1. Introduction
1.绪论
Ratpack is a framework built on top of the Netty engine, which allows us to quickly build HTTP applications. We’ve already covered its basic usage in previous articles. This time, we’ll show how to use its streaming API to implement reactive applications.
Ratpack是一个建立在Netty引擎之上的框架,它允许我们快速构建HTTP应用程序。我们已经在以前的文章中介绍了它的基本用法。这一次,我们将展示如何使用其流式API来实现反应式应用程序。
2. A Quick Recap on Reactive Streams
2.关于反应式流的快速回顾
Before getting into the actual implementation, let’s first do a quick recap on what constitutes a Reactive Application. According to the original authors, such applications must have the following properties:
在进入实际实现之前,让我们首先快速回顾一下什么是反应式应用程序。根据原作者的说法,此类应用程序必须具备以下属性。
- Responsive
- Resilient
- Elastic
- Message Driven
So, how do Reactive Streams help us achieve any of those properties? Well, in this context, message-driven doesn’t necessarily imply the use of messaging middleware. Instead, what is actually required to address this point is asynchronous request processing and support for non-blocking backpressure.
那么,反应式流是如何帮助我们实现这些特性的呢?那么,在这种情况下,消息驱动并不一定意味着使用消息传递中间件。相反,解决这一点实际上需要的是异步请求处理和对非阻塞反压的支持。
Ratpack reactive support uses the Reactive Streams API standard for the JVM as the base for its implementation. As such, it allows interoperability with other compatible frameworks like Project Reactor and RxJava.
Ratpack的反应式支持使用JVM的Reactive Streams API标准作为其实现的基础。因此,它允许与Project Reactor和RxJava等其他兼容框架进行互操作。
3. Using Ratpacks’ Streams Class
3.使用Ratpacks的Streams类
Ratpack’s Streams class provides several utility methods to create Publisher instances, which we can then use to create data processing pipelines.
Ratpack的Streams类提供了几个实用方法来创建Publisher实例,然后我们可以用它来创建数据处理管道。。
A good starting point is the publish() method, which we can use to create a Publisher from any Iterable:
一个好的起点是publish()方法,我们可以用它来从任何Iterable创建一个Publisher。
Publisher<String> pub = Streams.publish(Arrays.asList("hello", "hello again"));
LoggingSubscriber<String> sub = new LoggingSubscriber<String>();
pub.subscribe(sub);
sub.block();
Here, LoggingSubscriber is a test implementation of the Subscriber interface that just logs every object emitted by the Publisher. It also includes a helper method block() that, as the name suggests, blocks the caller until the publisher emits all its objects or produces an error.
这里,LoggingSubscriber是Subscriber接口的一个测试实现,它只是记录由Publisher发出的每个对象。它还包括一个辅助方法block(),顾名思义,该方法会阻止调用者,直到发布者发出所有对象或产生错误。
Running the test case, we’ll see the expected sequence of events:
运行测试案例,我们会看到预期的事件序列。
onSubscribe: sub=7311908
onNext: sub=7311908, value=hello
onNext: sub=7311908, value=hello again
onComplete: sub=7311908
Another useful method is yield(). It has a single Function parameter that receives a YieldRequest object and returns the next object to emit:
另一个有用的方法是yield()。它有一个单一的Function参数,接收一个YieldRequest对象并返回下一个要发射的对象。
@Test
public void whenYield_thenSuccess() {
Publisher<String> pub = Streams.yield((t) -> {
return t.getRequestNum() < 5 ? "hello" : null;
});
LoggingSubscriber<String> sub = new LoggingSubscriber<String>();
pub.subscribe(sub);
sub.block();
assertEquals(5, sub.getReceived());
}
The YieldRequest parameter allows us to implement logic based on the number of objects emitted so far, using its getRequestNum() method. In our example, we use this information to define the end condition, which we signal by returning a null value.
YieldRequest参数允许我们使用其getRequestNum()方法,根据到目前为止发射的对象的数量来实现逻辑。在我们的例子中,我们使用这个信息来定义结束条件,我们通过返回一个null值来发出信号。
Now, let’s see how to create a Publisher for periodic events:
现在,让我们看看如何为定期事件创建一个Publisher。
@Test
public void whenPeriodic_thenSuccess() {
ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
Publisher<String> pub = Streams.periodically(executor, Duration.ofSeconds(1), (t) -> {
return t < 5 ? String.format("hello %d",t): null;
});
LoggingSubscriber<String> sub = new LoggingSubscriber<String>();
pub.subscribe(sub);
sub.block();
assertEquals(5, sub.getReceived());
}
The returned publisher uses the ScheduledExecutorService to call the producer function periodically until it returns a null value. The producer function receives an integer value that corresponds to the number of objects already emitted, which we use to terminate the stream.
返回的发布者使用ScheduledExecutorService来定期调用生产者函数,直到它返回一个null值。生产者函数收到一个整数值,对应于已经发射的对象的数量,我们用它来终止流。
4. Using TransformablePublisher
4.使用TransformablePublisher
Taking a closer look at Streams’ methods, we can see that they usually return a TransformablePublisher. This interface extends Publisher with several utility methods that, much like what we find in Project Reactor’s Flux and Mono, make it easier to create complex processing pipelines from individual steps.
仔细看看Streams的方法,我们可以看到它们通常会返回一个TransformablePublisher。这个接口扩展了Publisher的几个实用方法,就像我们在Project Reactor的Flux和Mono中发现的那样,使我们更容易从单个步骤创建复杂的处理管道。
As an example, let’s use the map method to transform a sequence of integers into strings:
作为一个例子,让我们使用map方法来将一串整数转化为字符串。
@Test
public void whenMap_thenSuccess() throws Exception {
TransformablePublisher<String> pub = Streams.yield( t -> {
return t.getRequestNum() < 5 ? t.getRequestNum() : null;
})
.map(v -> String.format("item %d", v));
ExecResult<List<String>> result = ExecHarness.yieldSingle((c) -> pub.toList());
assertTrue("should succeed", result.isSuccess());
assertEquals("should have 5 items",5,result.getValue().size());
}
Here, the actual execution happens inside a thread pool managed by the test utility class ExecHarness. Since yieldSingle() expects a Promise, we use toList() to adapt our publisher. This method collects all results produced by the subscriber and stores them in a List.
在这里,实际的执行发生在一个由测试工具类ExecHarness管理的线程池内。由于yieldSingle()期望一个Promise,我们使用toList()来调整我们的发布器。这个方法收集了由订阅者产生的所有结果,并将它们存储在一个List中。
As stated in the documentation, we must take care when using this method. Applying it to an unbounded publisher can quickly make the JVM running out of memory! To avoid this situation, we should keep its use mostly restricted to unit tests.
正如文档中所说,我们在使用这个方法时必须小心。将其应用于一个无界的发布者,会很快使JVM的内存耗尽!为了避免这种情况,我们应该把它的使用主要限制在单元测试中。
Besides map(), TransformablePublisher has several useful operators:
除了map(),TransformablePublisher有几个有用的操作符。
- filter(): filter upstream objects based on a Predicate
- take(): emits just the first n objects from the upstream Publisher
- wiretap(): adds an observation point where we can inspect data and events as they flow through the pipeline
- reduce(): reduce upstream objects to a single value
- transform(): injects a regular Publisher in the stream
5. Using buffer() with Non-Compliant Publishers
5.使用buffer()与不符合规定的出版商
In some scenarios, we must deal with a Publisher that sends more items to their subscribers than requested. To address those scenarios, Ratpack’s Streams offer a buffer() method, which keeps those extra items in memory until subscribers consume them.
在某些情况下,我们必须处理一个Publisher向他们的订阅者发送的项目比要求的多。为了解决这些情况,Ratpack的Streams提供了一个buffer()方法,它将这些额外的项目保留在内存中,直到订阅者消费它们。
To illustrate how this works, let’s create a simple non-compliant Publisher that ignored the number of requested items. Instead, it will always produce at least 5 items more than requested:
为了说明这一点,让我们创建一个简单的不合规的Publisher,它忽略了要求的项目的数量。相反,它将永远比要求的项目多生产至少5个。
private class NonCompliantPublisher implements Publisher<Integer> {
@Override
public void subscribe(Subscriber<? super Integer> subscriber) {
log.info("subscribe");
subscriber.onSubscribe(new NonCompliantSubscription(subscriber));
}
private class NonCompliantSubscription implements Subscription {
private Subscriber<? super Integer> subscriber;
private int recurseLevel = 0;
public NonCompliantSubscription(Subscriber<? super Integer> subscriber) {
this.subscriber = subscriber;
}
@Override
public void request(long n) {
log.info("request: n={}", n);
if ( recurseLevel > 0 ) {
return;
}
recurseLevel++;
for (int i = 0 ; i < (n + 5) ; i ++ ) {
subscriber.onNext(i);
}
subscriber.onComplete();
}
@Override
public void cancel() {
}
}
}
First, let’s test this publisher using our LoggingSubscriber. We’ll use the take() operator so it will receive just the first item
首先,让我们使用我们的LoggingSubscriber来测试这个发布器。我们将使用take()操作符,所以它将只接收第一个项目
@Test
public void whenNonCompliantPublisherWithoutBuffer_thenSuccess() throws Exception {
TransformablePublisher<Integer> pub = Streams.transformable(new NonCompliantPublisher())
.wiretap(new LoggingAction(""))
.take(1);
LoggingSubscriber<Integer> sub = new LoggingSubscriber<>();
pub.subscribe(sub);
sub.block();
}
Running this test, we see that despite receiving a cancel() request, our non-compliant publisher keeps producing new items:
运行这个测试,我们看到,尽管收到了cancel()请求,但我们的不合规的发布者仍在生产新项目:。
RatpackStreamsUnitTest - : event=StreamEvent[DataEvent{subscriptionId=0, data=0}]
LoggingSubscriber - onNext: sub=583189145, value=0
RatpackStreamsUnitTest - : event=StreamEvent[RequestEvent{requestAmount=1, subscriptionId=0}]
NonCompliantPublisher - request: n=1
RatpackStreamsUnitTest - : event=StreamEvent[CancelEvent{subscriptionId=0}]
LoggingSubscriber - onComplete: sub=583189145
RatpackStreamsUnitTest - : event=StreamEvent[DataEvent{subscriptionId=0, data=1}]
... more expurious data event
RatpackStreamsUnitTest - : event=StreamEvent[CompletionEvent{subscriptionId=0}]
LoggingSubscriber - onComplete: sub=583189145
Now, let’s add a buffer() step in this stream. We’ll add two wiretap steps to log events before it, so its effect becomes more apparent:
现在,让我们在这个流中添加一个buffer()步骤。我们将在它之前添加两个wiretap步骤来记录事件,这样它的效果就会更加明显。
@Test
public void whenNonCompliantPublisherWithBuffer_thenSuccess() throws Exception {
TransformablePublisher<Integer> pub = Streams.transformable(new NonCompliantPublisher())
.wiretap(new LoggingAction("before buffer"))
.buffer()
.wiretap(new LoggingAction("after buffer"))
.take(1);
LoggingSubscriber<Integer> sub = new LoggingSubscriber<>();
pub.subscribe(sub);
sub.block();
}
This time, running this code produces a different log sequence:
这一次,运行这段代码产生了不同的日志序列。
LoggingSubscriber - onSubscribe: sub=675852144
RatpackStreamsUnitTest - after buffer: event=StreamEvent[RequestEvent{requestAmount=1, subscriptionId=0}]
NonCompliantPublisher - subscribe
RatpackStreamsUnitTest - before buffer: event=StreamEvent[RequestEvent{requestAmount=1, subscriptionId=0}]
NonCompliantPublisher - request: n=1
RatpackStreamsUnitTest - before buffer: event=StreamEvent[DataEvent{subscriptionId=0, data=0}]
... more data events
RatpackStreamsUnitTest - before buffer: event=StreamEvent[CompletionEvent{subscriptionId=0}]
RatpackStreamsUnitTest - after buffer: event=StreamEvent[DataEvent{subscriptionId=0, data=0}]
LoggingSubscriber - onNext: sub=675852144, value=0
RatpackStreamsUnitTest - after buffer: event=StreamEvent[RequestEvent{requestAmount=1, subscriptionId=0}]
RatpackStreamsUnitTest - after buffer: event=StreamEvent[CancelEvent{subscriptionId=0}]
RatpackStreamsUnitTest - before buffer: event=StreamEvent[CancelEvent{subscriptionId=0}]
LoggingSubscriber - onComplete: sub=67585214
The “before buffer” messages show that our non-compliant publisher was able to send all values after the first call to request. Nevertheless, downstream values were still sent one by one, respecting the amount requested by the LoggingSubscriber.
“缓冲区前 “信息显示,我们的不合规发布者能够在第一次调用request后发送所有值。尽管如此,下游的值仍然被一个一个地发送,尊重LoggingSubscriber所要求的数量。
6. Using batch() with Slow Subscribers
6.使用batch()与慢速订阅者的关系
Another scenario that can decrease an application’s throughput is when downstream subscribers request data in small amounts. Our LoggingSubscriber is a good example: it requests just a single item at a time.
另一种可能降低应用程序吞吐量的情况是,当下游订阅者少量请求数据时。我们的LoggingSubscriber就是一个很好的例子:它一次只请求一个项目。
In real-world applications, this can lead to a lot of context switches, which will hurt the overall performance. A better approach is to request a larger number of items at a time. The batch() method allows an upstream publisher to use more efficient request sizes while allowing downstream subscribers to use smaller request sizes.
在现实世界的应用中,这可能会导致大量的上下文切换,这将损害整体性能。更好的方法是一次请求大量的项目。batch()方法允许上游发布者使用更有效的请求大小,同时允许下游订阅者使用更小的请求大小。
Let’s see how this works in practice. As before, we’ll start with a stream without batch:
让我们看看这在实践中是如何运作的。像以前一样,我们将从一个没有batch的流开始。
@Test
public void whenCompliantPublisherWithoutBatch_thenSuccess() throws Exception {
TransformablePublisher<Integer> pub = Streams.transformable(new CompliantPublisher(10))
.wiretap(new LoggingAction(""));
LoggingSubscriber<Integer> sub = new LoggingSubscriber<>();
pub.subscribe(sub);
sub.block();
}
Here, CompliantPublisher is just a test Publisher that produces integers up to, but excluding, the value passed to the constructor. Let’s run it to see the non-batched behavior:
这里,CompliantPublisher只是一个测试Publisher,它产生整数,但不包括传递给构造函数的值。让我们运行它来看看非批处理的行为。
CompliantPublisher - subscribe
LoggingSubscriber - onSubscribe: sub=-779393331
RatpackStreamsUnitTest - : event=StreamEvent[RequestEvent{requestAmount=1, subscriptionId=0}]
CompliantPublisher - request: requested=1, available=10
RatpackStreamsUnitTest - : event=StreamEvent[DataEvent{subscriptionId=0, data=0}]
LoggingSubscriber - onNext: sub=-779393331, value=0
... more data events omitted
CompliantPublisher - request: requested=1, available=1
RatpackStreamsUnitTest - : event=StreamEvent[CompletionEvent{subscriptionId=0}]
LoggingSubscriber - onComplete: sub=-779393331
The output shows that the producer emits values one by one. Now, let’s add step batch() to our pipeline, so the upstream publisher produces up to five items at a time:
输出显示,生产者一个接一个地发出了数值。现在,让我们把步骤batch()添加到我们的管道中,这样上游发布者一次最多生产五个项目。
@Test
public void whenCompliantPublisherWithBatch_thenSuccess() throws Exception {
TransformablePublisher<Integer> pub = Streams.transformable(new CompliantPublisher(10))
.wiretap(new LoggingAction("before batch"))
.batch(5, Action.noop())
.wiretap(new LoggingAction("after batch"));
LoggingSubscriber<Integer> sub = new LoggingSubscriber<>();
pub.subscribe(sub);
sub.block();
}
The batch() method takes two arguments: the number of items requested on each request() call and an Action to handle discarded items, that is, items requested but not consumed. This situation can arise if there’s an error or a downstream subscriber calls cancel(). Let’s see the resulting execution log:
batch() 方法需要两个参数:每次request()调用时请求的项目数量和一个Action来处理丢弃的项目,也就是请求但没有消耗的项目。如果出现错误或下游订阅者调用cancel(),就会出现这种情况。让我们看看结果的执行日志。
LoggingSubscriber - onSubscribe: sub=-1936924690
RatpackStreamsUnitTest - after batch: event=StreamEvent[RequestEvent{requestAmount=1, subscriptionId=0}]
CompliantPublisher - subscribe
RatpackStreamsUnitTest - before batch: event=StreamEvent[RequestEvent{requestAmount=5, subscriptionId=0}]
CompliantPublisher - request: requested=5, available=10
RatpackStreamsUnitTest - before batch: event=StreamEvent[DataEvent{subscriptionId=0, data=0}]
... first batch data events omitted
RatpackStreamsUnitTest - before batch: event=StreamEvent[RequestEvent{requestAmount=5, subscriptionId=0}]
CompliantPublisher - request: requested=5, available=6
RatpackStreamsUnitTest - before batch: event=StreamEvent[DataEvent{subscriptionId=0, data=5}]
... second batch data events omitted
RatpackStreamsUnitTest - before batch: event=StreamEvent[RequestEvent{requestAmount=5, subscriptionId=0}]
CompliantPublisher - request: requested=5, available=1
RatpackStreamsUnitTest - before batch: event=StreamEvent[CompletionEvent{subscriptionId=0}]
RatpackStreamsUnitTest - after batch: event=StreamEvent[DataEvent{subscriptionId=0, data=0}]
LoggingSubscriber - onNext: sub=-1936924690, value=0
RatpackStreamsUnitTest - after batch: event=StreamEvent[RequestEvent{requestAmount=1, subscriptionId=0}]
RatpackStreamsUnitTest - after batch: event=StreamEvent[DataEvent{subscriptionId=0, data=1}]
... downstream data events omitted
LoggingSubscriber - onComplete: sub=-1936924690
We can see that now the publisher gets requests for five items each time. Notice that, in this test scenario, we see two requests to the producer even before the logging subscriber gets the first item. The reason is that, in this test scenario, we have a single-threaded execution, so batch() continues to buffer items until it gets the onComplete() signal.
我们可以看到,现在发布者每次都会得到五个项目的请求。注意,在这个测试场景中,我们看到两个对生产者的请求,甚至在日志订阅者得到第一个项目之前。原因是,在这个测试场景中,我们有一个单线程的执行,所以batch()继续缓冲项目,直到它得到onComplete()信号。
7. Using Streams in Web Applications
7.在网络应用中使用流
Ratpack supports using reactive streams in conjunction with its asynchronous web framework.
Ratpack支持在其异步网络框架中使用反应式流。
7.1. Receiving a Data Stream
7.1.接收一个数据流
For incoming data, the Request object available through the handler’s Context provides the getBodyStream() method, which returns a TransformablePublisher of ByteBuf objects.
对于传入的数据,通过处理程序的Context可用的Request对象提供了getBodyStream()方法,它返回一个TransformablePublisher的ByteBuf对象。
From this publisher, we can build our processing pipeline:
从这个发布者,我们可以建立我们的处理管道。
@Bean
public Action<Chain> uploadFile() {
return chain -> chain.post("upload", ctx -> {
TransformablePublisher<? extends ByteBuf> pub = ctx.getRequest().getBodyStream();
pub.subscribe(new Subscriber<ByteBuf>() {
private Subscription sub;
@Override
public void onSubscribe(Subscription sub) {
this.sub = sub;
sub.request(1);
}
@Override
public void onNext(ByteBuf t) {
try {
... do something useful with received data
sub.request(1);
}
finally {
// DO NOT FORGET to RELEASE !
t.release();
}
}
@Override
public void onError(Throwable t) {
ctx.getResponse().status(500);
}
@Override
public void onComplete() {
ctx.getResponse().status(202);
}
});
});
}
There are a couple of details to consider when implementing the subscribers. First, we must ensure that we call ByteBuf’s release() method at some point. Failing to do so will lead to memory leakage. Second, any asynchronous processing must use Ratpack’s primitives only. Those include Promise, Blocking, and similar constructs.
在实现订阅者时,有几个细节需要考虑。首先,我们必须确保在某个时刻调用ByteBuf的release()方法。如果不这样做,将导致内存泄漏。其次,任何异步处理必须只使用Ratpack的原语。这包括Promise, Blocking,和类似的结构。
7.2. Sending a Data Stream
7.2.发送一个数据流
The most direct way to send data stream is to use Response.sendStream(). This method takes a ByteBuf publisher argument and sends data to the client, applying backpressure as required to avoid overflowing it:
发送数据流的最直接方法是使用Response.sendStream()。这个方法需要一个ByteBuf发布者参数,并向客户端发送数据,根据需要应用背压以避免溢出。
@Bean
public Action<Chain> download() {
return chain -> chain.get("download", ctx -> {
ctx.getResponse().sendStream(new RandomBytesPublisher(1024,512));
});
}
As simple as it is, there’s a downside when using this method: it won’t set by itself any header, including Content-Length, which might be an issue for clients:
尽管它很简单,但使用这种方法时也有一个缺点。它不会自己设置任何头,包括Content-Length,这对客户来说可能是个问题:。
$ curl -v --output data.bin http://localhost:5050/download
... request messages omitted
< HTTP/1.1 200 OK
< transfer-encoding: chunked
... download progress messages omitted
Alternatively, a better method is to use the handle’s Context render() method, passing a ResponseChunks object. In this case, the response will use the “chunked’ transfer encoding method. The most straightforward way to create a ResponseChunks instance is through one of the static methods available in this class:
另外,一个更好的方法是使用句柄的Context render()方法,传递一个ResponseChunks对象。在这种情况下,响应将使用”chunked‘传输编码方法。创建ResponseChunks实例的最直接的方法是通过这个类中的一个静态方法。
@Bean
public Action<Chain> downloadChunks() {
return chain -> chain.get("downloadChunks", ctx -> {
ctx.render(ResponseChunks.bufferChunks("application/octetstream",
new RandomBytesPublisher(1024,512)));
});
}
With this change, the response now includes the content-type header:
有了这个变化,响应现在包括content-type头。
$ curl -v --output data.bin http://localhost:5050/downloadChunks
... request messages omitted
< HTTP/1.1 200 OK
< transfer-encoding: chunked
< content-type: application/octetstream
<
... progress messages omitted
7.3. Using Server-Side Events
7.3.使用服务器端事件
Support for Server-Side Events (SSE) also uses the render() method. In this case, however, we use ServerSentEvents to adapt items coming from a Producer to Event objects that include some metadata along with an event payload:
对服务器端事件(SSE)的支持也使用render()方法。然而,在这种情况下,我们使用ServerSentEvents来将来自Producer的项目调整为Event对象,其中包括一些元数据以及事件有效载荷。
@Bean
public Action<Chain> quotes() {
ServerSentEvents sse = ServerSentEvents.serverSentEvents(quotesService.newTicker(), (evt) -> {
evt
.id(Long.toString(idSeq.incrementAndGet()))
.event("quote")
.data( q -> q.toString());
});
return chain -> chain.get("quotes", ctx -> ctx.render(sse));
}
Here, QuotesService is just a sample service that creates a Publisher that produces random quotes at regular intervals. The second argument is a function that prepares the event for sending. This includes adding an id, an event type, and the payload itself.
这里,QuotesService只是一个样本服务,它创建了一个Publisher,在固定的时间段产生随机报价。第二个参数是一个为发送事件做准备的函数。这包括添加一个id、一个事件类型和有效载荷本身。
We can use curl to test this method, yielding an output that shows a sequence of random quotes, along with event metadata:
我们可以使用curl来测试这个方法,产生一个输出,显示一连串的随机引语,以及事件元数据。
$ curl -v http://localhost:5050/quotes
... request messages omitted
< HTTP/1.1 200 OK
< content-type: text/event-stream;charset=UTF-8
< transfer-encoding: chunked
... other response headers omitted
id: 10
event: quote
data: Quote [ts=2021-10-11T01:20:52.081Z, symbol=ORCL, value=53.0]
... more quotes
7.4. Broadcasting Websocket Data
7.4.广播Websocket数据
We can pipe data from any Publisher to a WebSocket connection using Websockets.websocketBroadcast():
我们可以使用Websockets.websocketBroadcast()/a>将数据从任何Publisher管道到WebSocket连接:。
@Bean
public Action<Chain> quotesWS() {
Publisher<String> pub = Streams.transformable(quotesService.newTicker())
.map(Quote::toString);
return chain -> chain.get("quotes-ws", ctx -> WebSockets.websocketBroadcast(ctx, pub));
}
Here, we use the same QuotesService we’ve seen before as the event source for broadcasting quotes to clients. Let’s use curl again to simulate a WebSocket client:
在这里,我们使用我们之前见过的同样的QuotesService作为事件源,向客户广播报价。让我们再次使用curl来模拟一个WebSocket客户端。
$ curl --include -v \
--no-buffer \
--header "Connection: Upgrade" \
--header "Upgrade: websocket" \
--header "Sec-WebSocket-Key: SGVsbG8sIHdvcmxkIQ==" \
--header "Sec-WebSocket-Version: 13" \
http://localhost:5050/quotes-ws
... request messages omitted
< HTTP/1.1 101 Switching Protocols
HTTP/1.1 101 Switching Protocols
< upgrade: websocket
upgrade: websocket
< connection: upgrade
connection: upgrade
< sec-websocket-accept: qGEgH3En71di5rrssAZTmtRTyFk=
sec-websocket-accept: qGEgH3En71di5rrssAZTmtRTyFk=
<
<Quote [ts=2021-10-11T01:39:42.915Z, symbol=ORCL, value=63.0]
... more quotes omitted
8. Conclusion
8.结语
In this article, we’ve explored Ratpack’s support for reactive streams and how to apply it in different scenarios.
在这篇文章中,我们已经探讨了Ratpack对反应式流的支持,以及如何在不同的场景中应用它。
As usual, the full source code of the examples can be found over on GitHub.
像往常一样,这些例子的完整源代码可以在GitHub上找到。