Jetty ReactiveStreams HTTP Client – Jetty ReactiveStreams HTTP客户端

最后修改: 2019年 10月 11日

中文/混合/英文(键盘快捷键:t)

1. Overview

1.概述

In this tutorial, we’re going to learn how to use the Reactive HTTP client from Jetty. We’ll be demonstrating its usage with different Reactive libraries by creating small test cases.

在本教程中,我们将学习如何使用Jetty的Reactive HTTP客户端。我们将通过创建小的测试案例来演示它与不同的Reactive库的使用。

2. What Is Reactive HttpClient?

2.什么是Reactive HttpClient?

Jetty’s HttpClient allows us to perform blocking HTTP requests. When we’re dealing with a Reactive API however, we can’t use the standard HTTP client. To fill this gap, Jetty has created a wrapper around the HttpClient APIs so that it also supports the ReactiveStreams API.

Jetty的HttpClient允许我们执行阻塞式HTTP请求。然而,当我们处理一个Reactive API时,我们不能使用标准的HTTP客户端。为了填补这一空白,Jetty在HttpClient API周围创建了一个封装器,这样它也支持ReactiveStreams API。

The Reactive HttpClient is used to either consume or produce a stream of data over HTTP calls.

Reactive HttpClient是用来通过HTTP调用消费或产生数据流的。

The example we’re going to demonstrate here will have a Reactive HTTP client, which will communicate with a Jetty server using different Reactive libraries. We’ll also talk about the request and response events provided by Reactive HttpClient.

我们在这里要演示的例子将有一个Reactive HTTP客户端,它将使用不同的Reactive库与Jetty服务器通信。我们还将谈论由Reactive HttpClient提供的请求和响应事件。

We recommend reading our articles on Project Reactor, RxJava, and Spring WebFlux to get a better understanding of Reactive programming concepts and its terminologies.

我们建议阅读我们关于Project ReactorRxJavaSpring WebFlux的文章,以便更好地了解反应式编程概念及其术语。

3. Maven Dependencies

3.Maven的依赖性

Let’s start the example by adding dependencies for Reactive Streams, Project Reactor, RxJava, Spring WebFlux, and Jetty’s Reactive HTTPClient to our pom.xml. Along with these, we’ll be adding the dependency of Jetty Server as well for the server creation:

让我们在示例中首先将Reactive StreamsProject ReactorRxJavaSpring WebFluxJetty 的 Reactive HTTPClient的依赖项加入我们的pom.xml。除此之外,我们还将添加Jetty Server的依赖性,以便创建服务器。

<dependency>
    <groupId>org.eclipse.jetty</groupId>
    <artifactId>jetty-reactive-httpclient</artifactId>
    <version>1.0.3</version>
</dependency>
<dependency>
    <groupId>org.eclipse.jetty</groupId>
    <artifactId>jetty-server</artifactId>
    <version>9.4.19.v20190610</version>
</dependency>
<dependency>
    <groupId>org.reactivestreams</groupId>
    <artifactId>reactive-streams</artifactId>
    <version>1.0.3</version>
</dependency>
<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-core</artifactId>
    <version>3.2.12.RELEASE</version>
</dependency>
<dependency>
    <groupId>io.reactivex.rxjava2</groupId>
    <artifactId>rxjava</artifactId>
    <version>2.2.11</version>
</dependency>
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-webflux</artifactId>
    <version>5.1.9.RELEASE</version>
</dependency>

4. Creating the Server and the Client

4.创建服务器和客户端

Now let’s create a server and add a request handler that simply writes the request body to the response:

现在,让我们创建一个服务器,并添加一个请求处理程序,简单地将请求主体写入响应中。

public class RequestHandler extends AbstractHandler {
    @Override
    public void handle(String target, Request jettyRequest, HttpServletRequest request,
      HttpServletResponse response) throws IOException, ServletException {
        jettyRequest.setHandled(true);
        response.setContentType(request.getContentType());
        IO.copy(request.getInputStream(), response.getOutputStream());
    }
}

...

Server server = new Server(8080);
server.setHandler(new RequestHandler());
server.start();

And then we can write the HttpClient:

然后我们可以编写HttpClient

HttpClient httpClient = new HttpClient();
httpClient.start();

Now that we’ve created the client and server, let’s see how we can transform this blocking HTTP Client into a non-blocking one and create the request:

现在我们已经创建了客户端和服务器,让我们看看如何将这个阻塞的HTTP客户端转化为非阻塞的客户端并创建请求。

Request request = httpClient.newRequest("http://localhost:8080/"); 
ReactiveRequest reactiveRequest = ReactiveRequest.newBuilder(request).build();
Publisher<ReactiveResponse> publisher = reactiveRequest.response();

So here, the ReactiveRequest wrapper provided by the Jetty made our blocking HTTP Client reactive. Let’s proceed and see its usage with different reactive libraries.

所以在这里,Jetty提供的ReactiveRequest包装器使我们的阻塞式HTTP客户端成为了反应式。让我们继续看看它在不同的反应式库中的用法。

5. ReactiveStreams Usage

5.ReactiveStreams用法

Jetty’s HttpClient natively supports Reactive Streams, so let’s begin there.

Jetty的HttpClient相对支持Reactive Streams,所以我们从这里开始。

Now, Reactive Streams is just a set of interfaces, so, for our testing, let’s implement a simple blocking subscriber:

现在,Reactive Streams只是一组接口,所以,为了我们的测试,让我们实现一个简单的阻塞式订阅者。

public class BlockingSubscriber implements Subscriber<ReactiveResponse> {
    BlockingQueue<ReactiveResponse> sink = new LinkedBlockingQueue<>(1);

    @Override
    public void onSubscribe(Subscription subscription) { 
        subscription.request(1); 
    }
  
    @Override 
    public void onNext(ReactiveResponse response) { 
        sink.offer(response);
    } 
   
    @Override 
    public void onError(Throwable failure) { } 

    @Override 
    public void onComplete() { }

    public ReactiveResponse block() throws InterruptedException {
        return sink.poll(5, TimeUnit.SECONDS);
    }   
}

Note that we needed to call Subscription#request as per the JavaDoc which states that “No events will be sent by a Publisher until demand is signaled via this method.” 

请注意,我们需要调用Subscription#request ,根据JavaDoc的规定,“在通过此方法发出需求信号之前,Publisher不会发送事件。”

Also, note that we’ve added a safety mechanism so that our test can bail out if it hasn’t seen the value in 5 seconds.

另外,请注意,我们已经添加了一个安全机制,如果我们的测试在5秒钟内没有看到这个值,就可以跳出。

And now, we can quickly test our HTTP request:

现在,我们可以快速测试我们的HTTP请求。

BlockingSubscriber subscriber = new BlockingSubscriber();
publisher.subscribe(subscriber);
ReactiveResponse response = subscriber.block();
Assert.assertNotNull(response);
Assert.assertEquals(response.getStatus(), HttpStatus.OK_200);

6. Project Reactor Usage

6.项目反应器的使用

Let’s now see how we can use the Reactive HttpClient with the Project Reactor. The publisher creation is pretty much the same as in the previous section.

现在让我们看看我们如何使用Reactive HttpClient与Project Reactor。发布者的创建与上一节的内容基本相同。

After the publisher creation, let’s use the Mono class from Project Reactor to get a reactive response:

在创建发布者之后,让我们使用Project Reactor的Mono类来获得一个反应式响应。

ReactiveResponse response = Mono.from(publisher).block();

And then, we can test the resulting response:

然后,我们可以测试所产生的反应。

Assert.assertNotNull(response);
Assert.assertEquals(response.getStatus(), HttpStatus.OK_200);

6.1. Spring WebFlux Usage

6.1.Spring WebFlux的使用方法

The conversion of the blocking HTTP Client into a reactive one is easy when used with Spring WebFlux. The Spring WebFlux ships with a reactive client, WebClient,  that can be used with various HTTP Client libraries. We can use this as an alternative to using straight Project Reactor code.

当与Spring WebFlux一起使用时,将阻塞式HTTP客户端转换为反应式客户端很容易。Spring WebFlux提供了一个反应式客户端,WebClient,可以与各种HTTP客户端库一起使用。我们可以用它来替代直接使用项目Reactor代码。

So first, let’s wrap the Jetty’s HTTP Client using JettyClientHttpConnector to bond it with the WebClient:

因此,首先,让我们使用JettyClientHttpConnector来包装Jetty的HTTP客户端,将其与WebClient:结合起来。

ClientHttpConnector clientConnector = new JettyClientHttpConnector(httpClient);

And then pass this connector to the WebClient to perform the non-blocking HTTP requests:

然后将这个连接器传递给WebClient,以执行非阻塞的HTTP请求。

WebClient client = WebClient.builder().clientConnector(clientConnector).build();

Next, let’s do the actual HTTP call with the Reactive HTTP Client that we just created and test the result:

接下来,让我们用刚刚创建的反应式HTTP客户端进行实际的HTTP调用并测试结果。

String responseContent = client.post()
  .uri("http://localhost:8080/").contentType(MediaType.TEXT_PLAIN)
  .body(BodyInserters.fromPublisher(Mono.just("Hello World!"), String.class))
  .retrieve()
  .bodyToMono(String.class)
  .block();
Assert.assertNotNull(responseContent);
Assert.assertEquals("Hello World!", responseContent);

7. RxJava2 Usage

7.RxJava2的使用

Let’s now move on and see how the Reactive HTTP client is used with RxJava2

现在让我们继续,看看Reactive HTTP客户端是如何与RxJava2一起使用的

While we’re here, let’s mutate our example just a bit to now include a body in the request:

既然我们在这里,让我们把我们的例子变通一下,现在在请求中包括一个主体。

ReactiveRequest reactiveRequest = ReactiveRequest.newBuilder(request)
  .content(ReactiveRequest.Content
    .fromString("Hello World!", "text/plain", StandardCharsets.UTF_8))
  .build();
Publisher<String> publisher = reactiveRequest
  .response(ReactiveResponse.Content.asString());

The code ReactiveResponse.Content.asString() converts the response body to a string. It is also possible to discard the response using the ReactiveResponse.Content.discard() method if we’re only interested in the status of the request.

代码ReactiveResponse.Content.asString()将响应体转换为一个字符串。如果我们只对请求的状态感兴趣,也可以使用ReactiveResponse.Content.discard()方法来丢弃响应。

Now, we can see that getting a response using RxJava2 is actually quite similar to Project Reactor. Basically, we just use Single instead of Mono:

现在,我们可以看到,使用RxJava2获得响应实际上与Project Reactor很相似。基本上,我们只是用Single而不是Mono

String responseContent = Single.fromPublisher(publisher)
  .blockingGet();

Assert.assertEquals("Hello World!", responseContent);

8. Request and Response Events

8.请求和响应事件

The Reactive HTTP client emits a number of events during the execution. They are categorized as request events and response events. These events are helpful to peek into the lifecycle of a Reactive HTTP client.

反应式HTTP客户端在执行过程中会发出一系列的事件。它们被归类为请求事件和响应事件。这些事件有助于窥探Reactive HTTP客户端的生命周期。

This time, let’s make our reactive request slightly differently by using the HTTP Client instead of the request:

这一次,让我们通过使用HTTP客户端而不是请求,以稍微不同的方式提出我们的反应性请求。

ReactiveRequest request = ReactiveRequest.newBuilder(httpClient, "http://localhost:8080/")
  .content(ReactiveRequest.Content.fromString("Hello World!", "text/plain", UTF_8))
  .build();

And now let’s get a Publisher of HTTP request events:

现在让我们得到一个HTTP请求事件的Publisher

Publisher<ReactiveRequest.Event> requestEvents = request.requestEvents();

Now, let’s use RxJava once again. This time, we’ll create a list, that holds the event types, and populate it by subscribing to the request events as they happen:

现在,让我们再一次使用RxJava。这一次,我们将创建一个持有事件类型的列表,并通过订阅发生的请求事件来填充它。

List<Type> requestEventTypes = new ArrayList<>();

Flowable.fromPublisher(requestEvents)
  .map(ReactiveRequest.Event::getType).subscribe(requestEventTypes::add);
Single<ReactiveResponse> response = Single.fromPublisher(request.response());

Then, since we’re in a test, we can block our response and verify:

然后,由于我们是在一个测试中,我们可以阻止我们的响应并进行验证。

int actualStatus = response.blockingGet().getStatus();

Assert.assertEquals(6, requestEventTypes.size());
Assert.assertEquals(HttpStatus.OK_200, actualStatus);

Similarly, we can subscribe to the response events as well. Since they are similar to the request event subscription, we’ve added only the latter here. The complete implementation with both request and response events can be found in the GitHub repository, linked at the end of this article.

同样地,我们也可以订阅响应事件。由于它们与请求事件的订阅类似,我们在这里只添加了后者。完整的请求和响应事件的实现可以在GitHub仓库中找到,链接在本文的最后。

9. Conclusion

9.结论

In this tutorial, we’ve learned about the ReactiveStreams HttpClient provided by Jetty, its usage with the various Reactive libraries and the lifecycle events associated with a Reactive request.

在本教程中,我们已经了解了Jetty提供的ReactiveStreams HttpClient,它与各种Reactive库的使用以及与Reactive请求相关的生命周期事件。

All of the code snippets, mentioned in the article, can be found in our GitHub repository.

文章中提到的所有代码片段,都可以在我们的GitHub仓库中找到。