1. Introduction
1.绪论
Spring WebFlux provides Reactive Programming to web applications. The asynchronous and non-blocking nature of Reactive design improves performance and memory usage. Project Reactor provides those capabilities to efficiently manage data streams.
Spring WebFlux为 Web 应用程序提供了反应式编程。反应式设计的异步和非阻塞性质提高了性能和内存使用。Project Reactor提供了这些功能以有效地管理数据流。
However, backpressure is a common problem in these kinds of applications. In this tutorial, we’ll explain what it is and how to apply a backpressure mechanism in Spring WebFlux to mitigate it.
然而,背压是这类应用中常见的问题。在本教程中,我们将解释什么是反压,以及如何在Spring WebFlux中应用反压机制来缓解它。
2. Backpressure in Reactive Streams
2.反应性水流中的背压
Due to the non-blocking nature of Reactive Programming, the server doesn’t send the complete stream at once. It can push the data concurrently as soon as it is available. Thus, the client waits less time to receive and process the events. But, there are issues to overcome.
由于反应式编程的非阻塞性,服务器不会一次性发送完整的流。它可以在数据可用时立即并发地推送数据。因此,客户端等待接收和处理事件的时间较短。但是,也有一些问题需要克服。
Backpressure in software systems is the capability to overload the traffic communication. In other words, emitters of information overwhelm consumers with data they are not able to process.
软件系统中的背压是指流量通信过载的能力。换句话说,信息的发射者用他们无法处理的数据压倒了消费者。
Eventually, people also apply this term as the mechanism to control and handle it. It is the protective actions taken by systems to control downstream forces.
最终,人们也将这个词作为控制和处理的机制来应用。它是系统为控制下游力量而采取的保护行动。
2.1. What Is Backpressure?
2.1.什么是背压?
In Reactive Streams, backpressure also defines how to regulate the transmission of stream elements. In other words, control how many elements the recipient can consume.
在反应流中,背压也定义了如何调节流元素的传输。换句话说,控制接收者可以消费多少元素。
Let’s use an example to clearly describe what it is:
让我们用一个例子来清楚地描述它是什么。
- The system contains three services: the Publisher, the Consumer, and the Graphical User Interface (GUI)
- The Publisher sends 10000 events per second to the Consumer
- The Consumer processes them and sends the result to the GUI
- The GUI displays the results to the users
- The Consumer can only handle 7500 events per second
At this speed rate, the consumer cannot manage the events (backpressure). Consequently, the system would collapse and the users would not see the results.
在这种速度下,消费者无法管理这些事件(背压)。因此,系统将崩溃,而用户将看不到结果。
2.2. Using Backpressure to Prevent Systemic Failures
2.2.利用背压来防止系统性故障的发生
The recommendation here would be to apply some sort of backpressure strategy to prevent systemic failures. The objective is to efficiently manage the extra events received:
这里的建议是应用某种背压策略来防止系统性故障。其目的是有效地管理收到的额外事件。
- Controlling the data stream sent would be the first option. Basically, the publisher needs to slow down the pace of the events. Therefore, the consumer is not overloaded. Unfortunately, this is not always possible and we would need to find other available options
- Buffering the extra amount of data is the second choice. With this approach, the consumer stores temporarily the remaining events until it can process them. The main drawback here is to unbind the buffer causing memory crashing
- Dropping the extra events losing track of them. Even this solution is far from ideal, with this technique the system would not collapse
2.3. Controlling Backpressure
2.3.控制背压
We’ll focus on controlling the events emitted by the publisher. Basically, there are three strategies to follow:
我们将专注于控制由发布者发出的事件。基本上,有三种策略可以遵循。
- Send new events only when the subscriber requests them. This is a pull strategy to gather elements at the emitter request
- Limiting the number of events to receive at the client-side. Working as a limited push strategy the publisher only can send a maximum amount of items to the client at once
- Canceling the data streaming when the consumer cannot process more events. In this case, the receiver can abort the transmission at any given time and subscribe to the stream later again
3. Handling Backpressure in Spring WebFlux
3.在Spring WebFlux中处理背压
Spring WebFlux provides an asynchronous non-blocking flow of reactive streams. The responsible for backpressure within Spring WebFlux is the Project Reactor. It internally uses Flux functionalities to apply the mechanisms to control the events produced by the emitter.
Spring WebFlux提供了反应式流的异步非阻塞流动。在 Spring WebFlux 中负责背压的是项目反应器。它在内部使用Flux 功能来应用机制,以控制由发射器产生的事件。
WebFlux uses TCP flow control to regulate the backpressure in bytes. But it does not handle the logical elements the consumer can receive. Let’s see the interaction flow happening under the hood:
WebFlux使用TCP流量控制来调节字节数的背压。但它并不处理消费者可以接收的逻辑元素。让我们看看引擎盖下发生的交互流。
- WebFlux framework is responsible for the conversion of events to bytes in order to transfer/receive them through TCP
- It may happen that the consumer starts and long-running job before requesting the next logical element
- While the receiver is processing the events, WebFlux enqueue bytes without acknowledgment because there is no demand for new events
- Due to the nature of the TCP protocol, if there are new events the publisher will continue sending them to the network
In conclusion, the diagram above shows that the demand in logical elements could be different for the consumer and the publisher. Spring WebFlux does not ideally manage backpressure between the services interacting as a whole system. It handles it with the consumer independently and then with the publisher in the same way. But it is not taking into account the logical demand between the two services.
总之,上图显示,消费者和发布者在逻辑元素方面的需求可能是不同的。Spring WebFlux并没有理想地管理作为一个整体系统交互的服务之间的背压。它对消费者进行独立处理,然后以同样的方式对发布者进行处理。但它没有考虑到两个服务之间的逻辑需求。
So, Spring WebFlux does not handle backpressure as we can expect. Let’s see in the next section how to implement a backpressure mechanism in Spring WebFlux!
因此,Spring WebFlux并没有像我们期望的那样处理背压。让我们在下一节中看看如何在Spring WebFlux中实现背压机制吧!
4. Implementing Backpressure Mechanism with Spring WebFlux
4.用Spring WebFlux实现背压机制
We’ll use the Flux implementation to handle the control of the events received. Therefore, we’ll expose the request and response body with backpressure support on the read and the write side. Then, the producer would slow down or stop until the consumer’s capacity frees up. Let’s see how to do it!
我们将使用Flux实现来处理对所接收事件的控制。因此,我们将在读取和写入方面暴露出支持背压的请求和响应体。然后,生产者将放慢速度或停止,直到消费者的容量释放出来。让我们看看如何做到这一点!
4.1. Dependencies
4.1. 依赖性
To implement the examples, we’ll simply add the Spring WebFlux starter and Reactor test dependencies to our pom.xml:
为了实现这些示例,我们只需将Spring WebFlux启动器和Reactor测试依赖项添加到我们的pom.xml。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<scope>test</scope>
</dependency>
4.2. Request
4.2. 要求
The first option is to give the consumer control over the events it can process. Thus, the publisher waits until the receiver requests new events. In summary, the client subscribes to the Flux and then process the events based on its demand:
第一个方案是让消费者控制它可以处理的事件。因此,发布者会等待,直到接收者请求新的事件。总之,客户端订阅了Flux,然后根据其需求处理事件。
@Test
public void whenRequestingChunks10_thenMessagesAreReceived() {
Flux request = Flux.range(1, 50);
request.subscribe(
System.out::println,
err -> err.printStackTrace(),
() -> System.out.println("All 50 items have been successfully processed!!!"),
subscription -> {
for (int i = 0; i < 5; i++) {
System.out.println("Requesting the next 10 elements!!!");
subscription.request(10);
}
}
);
StepVerifier.create(request)
.expectSubscription()
.thenRequest(10)
.expectNext(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
.thenRequest(10)
.expectNext(11, 12, 13, 14, 15, 16, 17, 18, 19, 20)
.thenRequest(10)
.expectNext(21, 22, 23, 24, 25, 26, 27 , 28, 29 ,30)
.thenRequest(10)
.expectNext(31, 32, 33, 34, 35, 36, 37 , 38, 39 ,40)
.thenRequest(10)
.expectNext(41, 42, 43, 44, 45, 46, 47 , 48, 49 ,50)
.verifyComplete();
With this approach, the emitter never overwhelms the receiver. In other words, the client is under control to process the events it needs.
通过这种方法,发射器永远不会压倒接收器。换句话说,客户端是在控制下处理它所需要的事件。
We’ll test the producer behavior with respect to backpressure with StepVerifier. We’ll expect the next n items only when the thenRequest(n) is called.
我们将用StepVerifier测试生产者在背压方面的行为。我们将只在thenRequest(n)被调用时期待下一个n项。
4.3. Limit
4.3. 限制
The second option is to use the limitRange() operator from Project Reactor. It allows setting the number of items to prefetch at once. One interesting feature is that the limit applies even when the subscriber requests more events to process. The emitter splits the events into chunks avoiding consuming more than the limit on each request:
第二个选择是使用Project Reactor的limitRange()操作符。它允许设置一次预取的项目数量。一个有趣的特点是,即使订阅者要求处理更多的事件,该限制也适用。发射器会将事件分成几块,避免每次请求的消耗量超过限制。
@Test
public void whenLimitRateSet_thenSplitIntoChunks() throws InterruptedException {
Flux<Integer> limit = Flux.range(1, 25);
limit.limitRate(10);
limit.subscribe(
value -> System.out.println(value),
err -> err.printStackTrace(),
() -> System.out.println("Finished!!"),
subscription -> subscription.request(15)
);
StepVerifier.create(limit)
.expectSubscription()
.thenRequest(15)
.expectNext(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
.expectNext(11, 12, 13, 14, 15)
.thenRequest(10)
.expectNext(16, 17, 18, 19, 20, 21, 22, 23, 24, 25)
.verifyComplete();
}
4.4. Cancel
4.4.取消
Finally, the consumer can cancel the events to receive at any moment. For this example, we’ll use another approach. Project Reactor allows implementing our own Subscriber or extend the BaseSubscriber. So let’s see how the receiver can abort the reception of new events at any time overriding the mentioned class:
最后,消费者可以在任何时候取消要接收的事件。对于这个例子,我们将使用另一种方法。Project Reactor允许实现我们自己的Subscriber或者扩展BaseSubscriber。因此,让我们看看接收者如何在任何时候重写所述的类来中止接收新事件。
@Test
public void whenCancel_thenSubscriptionFinished() {
Flux<Integer> cancel = Flux.range(1, 10).log();
cancel.subscribe(new BaseSubscriber<Integer>() {
@Override
protected void hookOnNext(Integer value) {
request(3);
System.out.println(value);
cancel();
}
});
StepVerifier.create(cancel)
.expectNext(1, 2, 3)
.thenCancel()
.verify();
}
5. Conclusion
5.总结
In this tutorial, we showed what backpressure is in Reactive Programming and how to avoid it. Spring WebFlux supports backpressure through Project Reactor. Therefore, it can provide availability, robustness, and stability when the publisher overwhelms the consumer with too many events. In summary, it can prevent systemic failures due to high demand.
在本教程中,我们展示了什么是反应式编程中的背压以及如何避免它。Spring WebFlux通过Project Reactor支持背压。因此,当发布者用过多的事件压倒消费者时,它可以提供可用性、健壮性和稳定性。总之,它可以防止因高需求而导致的系统性故障。
As always, the code is available over on GitHub.
像往常一样,代码可在GitHub上获得。