Example of Vertx and RxJava Integration – Vertx和RxJava集成的例子

最后修改: 2017年 9月 10日

中文/混合/英文(键盘快捷键:t)

1. Overview

1.概述

RxJava is a popular library for creating asynchronous and event-based programs, it takes inspiration from the main ideas brought forward by the Reactive Extensions initiative.

RxJava是一个用于创建异步和基于事件的程序的流行库,它从Reactive Extensions计划带来的主要想法中获得了灵感。

Vert.x, a project under Eclipse‘s umbrella, offers several components designed from the ground up to fully leverage the reactive paradigm.

Vert.xEclipse旗下的一个项目,它提供了几个从头设计的组件,以充分利用反应式范式。

Used together, they could prove to be a valid foundation for any Java program that needs to be reactive.

它们一起使用,可以证明是任何需要反应性的Java程序的有效基础。

In this article, we’ll load a file with a list of city names and we’ll print out, for each of them, how long is a day, from sunrise to sunset.

在这篇文章中,我们将加载一个包含城市名称列表的文件,并为每个城市打印出一天的时间,从日出到日落的时间。

We’ll use data published from the public www.metaweather.com REST API – to calculate the length of the daylight and RxJava with Vert.x to do it in a purely reactive way.

我们将使用从公共www.metaweather.comREST API发布的数据–来计算日光的长度,并使用RxJavaVert.x以纯反应的方式进行计算。

2. Maven Dependency

2.Maven的依赖性

Let’s start by importing vertx-rx-java2:

让我们从导入vertx-rx-java2开始。

<dependency>
    <groupId>io.vertx</groupId>
    <artifactId>vertx-rx-java2</artifactId>
    <version>3.5.0-Beta1</version>
</dependency>

At the time of writing, the integration between Vert.x and the newer RxJava 2 is available only as a beta release, that is, however, stable enough for the program we’re building.

在撰写本文时,Vert.x和较新的RxJava 2之间的整合仅作为一个测试版,不过,对于我们正在构建的程序来说,这已经足够稳定了。

Note that io.vertx:vertx-rx-java2 depends on io.reactivex.rxjava2:rxjava so there’s no need to import any RxJava related package explicitly.

注意io.vertx:vertx-rx-java2依赖于io.reactivex.rxjava2:rxjava,所以不需要明确导入任何RxJava相关包。

The latest version of the Vert.x integration with RxJava can be found on Maven Central.

最新版本的Vert.xRxJava的集成可以在Maven Central上找到。

3. Set Up

3.设置

As in every application that uses Vert.x, we’ll start creating the vertx object, the main entry point to all Vert.x features:

如同每个使用Vert.x的应用程序,我们将开始创建vertx对象,这是所有Vert.x功能的主要入口。

Vertx vertx = io.vertx.reactivex.core.Vertx.vertx();

The vertx-rx-java2 library provides two classes: io.vertx.core.Vertx and io.vertx.reactivex.core.Vertx. While the first is the usual entry point for applications that are uniquely based on Vert.x, the latter is the one we’ve to use to get the integration with RxJava.

vertx-rx-java2库提供两个类。io.vertx.core.Vertxio.vertx.reactivex.core.Vertx。前者是基于Vert.x的应用程序的通常入口,而后者则是我们用来与RxJava整合的。

We go on defining objects we’ll use later:

我们继续定义我们以后要使用的对象。

FileSystem fileSystem = vertx.fileSystem();
HttpClient httpClient = vertx.createHttpClient();

Vert.x‘s FileSystem gives access to the file system in a reactive way, while Vert.x‘s HttpClient does the same for HTTP.

Vert.xFileSystem以反应式的方式提供对文件系统的访问,而Vert.xHttpClientHTTP也是如此。

4. Reactive Chain

4.反应链

It’s easy in a reactive context to concatenate several simpler reactive operators to obtain a meaningful computation.

在反应式背景下,很容易将几个较简单的反应式运算符串联起来,得到一个有意义的计算。

Let’s do that for our example:

让我们为我们的例子这样做。

fileSystem
  .rxReadFile("cities.txt").toFlowable()
  .flatMap(buffer -> Flowable.fromArray(buffer.toString().split("\\r?\\n")))
  .flatMap(city -> searchByCityName(httpClient, city))
  .flatMap(HttpClientResponse::toFlowable)
  .map(extractingWoeid())
  .flatMap(cityId -> getDataByPlaceId(httpClient, cityId))
  .flatMap(toBufferFlowable())
  .map(Buffer::toJsonObject)
  .map(toCityAndDayLength())
  .subscribe(System.out::println, Throwable::printStackTrace);

Let’s now explore how each logical chunk of code works.

现在让我们来探讨一下每个逻辑块的代码是如何工作的。

5. City Names

5.城市名称

The first step is to read a file containing a list of city names, one name per line:

第一步是读取一个包含城市名称列表的文件,每行一个名称。

fileSystem
 .rxReadFile("cities.txt").toFlowable()
 .flatMap(buffer -> Flowable.fromArray(buffer.toString().split("\\r?\\n")))

The method rxReadFile() reactively reads a file and returns a RxJava‘s Single<Buffer>. So we got the integration we’re looking for: the asynchronicity of Vert.x in a data structure from RxJava.

方法 rxReadFile()反应性地读取一个文件并返回RxJavaSingle<Buffer>。因此,我们得到了我们要找的整合:Vert.x的异步性在RxJava的一个数据结构中。

There’s only one file, so we’ll get a single emission of a Buffer with the full content of the file. We convert that input into a RxJava‘s Flowable and we flat-map the lines of the file to have a Flowable that emits an event for each city name instead.

只有一个文件,所以我们会得到一个包含文件全部内容的Buffer的单一排放。我们将该输入转换为RxJavaFlowable,并对文件的行进行平面映射,从而得到一个Flowable,为每个城市名称发射一个事件。

6. JSON City Descriptor

6.JSON城市描述符

Having the city name, the next step is to use the Metaweather REST API to get the identifier code for that city. This identifier will then be used to get the sunrise and sunset times for the city. Let’s continue the chain of invocations:

有了城市名称,下一步是使用Metaweather REST API来获得该城市的识别码。然后这个标识符将被用来获取该城市的日出和日落时间。让我们继续这个调用链。

Let’s continue the chain of invocations:

让我们继续这一连串的调用。

.flatMap(city -> searchByCityName(httpClient, city))
.flatMap(HttpClientResponse::toFlowable)

The searchByCityName() method uses the HttpClient we created in the first step – to invoke the REST service that gives the identifier of a city. Then with the second flatMap(), we get the Buffer containing the response.

searchByCityName()方法使用我们在第一步创建的HttpClient–调用REST服务,提供城市的标识符。然后通过第二个flatMap(),我们得到了包含响应的Buffer

Let’s complete this step writing the searchByCityName()‘s body:

让我们完成这一步,编写searchByCityName()的主体。

Flowable<HttpClientResponse> searchByCityName(HttpClient httpClient, String cityName) {
    HttpClientRequest req = httpClient.get(
        new RequestOptions()
          .setHost("www.metaweather.com")
          .setPort(443)
          .setSsl(true)
          .setURI(format("/api/location/search/?query=%s", cityName)));
    return req
      .toFlowable()
      .doOnSubscribe(subscription -> req.end());
}

Vert.x‘s HttpClient returns a RxJava‘s Flowable that emits the reactive HTTP response. This is turn emits the body of the response split in Buffers.

Vert.xHttpClient返回一个RxJavaFlowable,它发射了反应式的HTTP响应。这又是在Buffers中分离出响应的主体。

We created a new reactive request to the proper URL but we noted that Vert.x requires the HttpClientRequest.end() method to be invoked to signaling that the request can be sent and it also requires at least one subscription before the end() could be successfully invoked.

我们向适当的URL创建了一个新的反应式请求,但我们注意到Vert.x需要调用HttpClientRequest.end()方法,以示请求可以被发送,它还要求在end()能够成功调用之前至少有一个订阅。

A solution to accomplish that is to use RxJava‘s doOnSubscribe() to invoke end() as soon as a consumer subscribes.

实现这一目标的解决方案是使用RxJavadoOnSubscribe()来在消费者订阅后立即调用end()

7. City Identifiers

7.城市标识符

We now just need to get the value of the woeid property of the returned JSON object, which uniquely identifies the city through a custom method:

我们现在只需要获得返回的JSON对象的woeid属性的值,该属性通过一个自定义方法唯一地标识了该城市。

.map(extractingWoeid())

The extractingWoeid() method returns a function that extracts the city identifier from the JSON contained in the REST service response:

extractingWoeid()方法返回一个函数,从REST服务响应中包含的JSON中提取城市标识符。

private static Function<Buffer, Long> extractingWoeid() {
    return cityBuffer -> cityBuffer
      .toJsonArray()
      .getJsonObject(0)
      .getLong("woeid");
}

Note that we can use the handy toJson…() methods provided by Buffer to quickly get access to the properties we need.

请注意,我们可以使用toJson…()方法,该方法由Buffer提供,以快速访问我们需要的属性。

8. City Details

8.城市详细信息

Let’s continue the reactive chain to retrieve the details we need from the REST API:

让我们继续反应链,从REST API检索我们需要的细节。

.flatMap(cityId -> getDataByPlaceId(httpClient, cityId))
.flatMap(toBufferFlowable())

Let’s detail the getDataByPlaceId() method:

让我们详细介绍一下getDataByPlaceId()方法。

static Flowable<HttpClientResponse> getDataByPlaceId(
  HttpClient httpClient, long placeId) {
 
    return autoPerformingReq(
      httpClient,
      format("/api/location/%s/", placeId));
}

Here, we used the same approach we’ve put in place in the previous step. getDataByPlaceId() returns a Flowable<HttpClientResponse>. The HttpClientResponse, in turn, will emit the API response in chunks if it is longer that few bytes.

在这里,我们使用了我们在上一步中的相同方法。getDataByPlaceId()返回一个Flowable<HttpClientResponse>/em>。HttpClientResponse,反过来,如果API响应长于几个字节,它将分块发出。

With the toBufferFlowable() method we reduce the response chunks into a single one to have access to the full JSON object:

通过toBufferFlowable()方法,我们将响应块减少到一个单一的响应块中,以便访问完整的JSON对象。

static Function<HttpClientResponse, Publisher<? extends Buffer>>
  toBufferFlowable() {
    return response -> response
      .toObservable()
      .reduce(
        Buffer.buffer(),
        Buffer::appendBuffer).toFlowable();
}

9. Sunset and Sunrise Times

9.日落和日出时间

Let’s keep on adding to the reactive chain, retrieving the information we’re interested in from the JSON object:

让我们继续添加到反应链,从JSON对象中检索我们感兴趣的信息。

.map(toCityAndDayLength())

Let’s write the toCityAndDayLength() method:

我们来写一下toCityAndDayLength()方法。

static Function<JsonObject, CityAndDayLength> toCityAndDayLength() {
    return json -> {
        ZonedDateTime sunRise = ZonedDateTime.parse(json.getString("sun_rise"));
        ZonedDateTime sunSet = ZonedDateTime.parse(json.getString("sun_set"));
        String cityName = json.getString("title");
        return new CityAndDayLength(
          cityName, sunSet.toEpochSecond() - sunRise.toEpochSecond());
    };
}

It returns a function that maps the information contained in a JSON to create a POJO that simply calculates the time in hours between sunrise and sunset.

它返回一个函数,映射包含在JSON中的信息,创建一个POJO,简单地计算日出和日落之间的时间(小时)。

10. Subscription

10.认购

The reactive chain is completed. We can now subscribe to the resulting Flowable with a handler that prints out the emitted instances of CityAndDayLength, or the stack trace in case of errors:

反应链已经完成。我们现在可以用一个处理程序来订阅产生的Flowable,该处理程序可以打印出CityAndDayLength的发射实例,或者在出现错误时打印出堆栈跟踪。

.subscribe(
  System.out::println, 
  Throwable::printStackTrace)

When we run the application we could see a result like that, depending on the city contained in the list and the date in which the application runs:

当我们运行应用程序时,我们可以看到这样的结果,这取决于列表中包含的城市和应用程序的运行日期。

In Chicago there are 13.3 hours of light.
In Milan there are 13.5 hours of light.
In Cairo there are 12.9 hours of light.
In Moscow there are 14.1 hours of light.
In Santiago there are 11.3 hours of light.
In Auckland there are 11.2 hours of light.

The cities could appear in a different order than those specified in the file because all the requests to the HTTP API are executed asynchronously.

这些城市可能以不同于文件中指定的顺序出现,因为对HTTP API的所有请求都是异步执行的。

11. Conclusion

11.结论

In this article, we saw how easy it is to mix Vert.x reactive modules with the operators and logical constructs provided by RxJava.

在这篇文章中,我们看到了将Vert.x反应式模块与RxJava提供的操作符和逻辑结构混合起来是多么容易。

The reactive chain we built, albeit long, showed how it makes a complex scenario fairly easy to write.

我们建立的反应链,尽管很长,但显示了它是如何使一个复杂的场景相当容易编写的。

As always, the complete source code is available over on GitHub.

一如既往,完整的源代码可在GitHub上获得