1. Introduction
1.绪论
In this tutorial, we’ll see different ways of limiting the number of requests per second with Spring 5 WebClient.
在本教程中,我们将看到用Spring 5 WebClient限制每秒请求数的不同方法。
While we usually want to take advantage of its non-blocking nature, some scenarios might force us to add delays. We’ll learn about some of these scenarios while using a few Project Reactor features to control a stream of requests to a server.
虽然我们通常希望利用其非阻塞的特性,但有些场景可能会迫使我们增加延迟。我们将在使用一些Project Reactor功能来控制流向服务器的请求时,了解其中的一些情况。
2. Initial Setup
2.初始设置
A typical case where we’d need to limit our requests per second is to avoid overwhelming the server. Also, some web services have a maximum number of requests allowed per hour. Likewise, some control the number of concurrent requests per client.
我们需要限制每秒请求数的典型情况是避免服务器不堪重负。此外,一些网络服务有一个每小时允许的最大请求数。同样地,有些服务还控制每个客户的并发请求数。
2.1. Writing a Simple Web Service
2.1.编写一个简单的网络服务
To explore this scenario, we’ll start with a simple @RestController that serves random numbers from a fixed range:
为了探索这种情况,我们将从一个简单的@RestController开始,从一个固定范围内提供随机数。
@RestController
@RequestMapping("/random")
public class RandomController {
@GetMapping
Integer getRandom() {
return new Random().nextInt(50));
}
}
Next, we’ll simulate an expensive operation and limit the number of concurrent requests.
接下来,我们将模拟一个昂贵的操作,并限制并发请求的数量。。
2.2. Rate Limiting Our Server
2.2.速率限制我们的服务器
Before seeing solutions, let’s change our service to simulate a more realistic scenario.
在看到解决方案之前,让我们改变我们的服务来模拟一个更现实的场景。
Firstly, we’ll limit the number of concurrent requests our server can take, throwing an exception when the limit is reached.
首先,我们将限制我们的服务器所能接受的并发请求的数量,当达到限制时抛出一个异常。
Secondly, we’ll add a delay to process our response, simulating an expensive operation. While there are more robust solutions available, we’ll do this just for illustration purposes:
其次,我们将添加一个延迟来处理我们的响应,模拟一个昂贵的操作。虽然有更强大的解决方案可用,我们将这样做只是为了说明情况。
public class Concurrency {
public static final int MAX_CONCURRENT = 5;
static final AtomicInteger CONCURRENT_REQUESTS = new HashMap<>();
public static int protect(IntSupplier supplier) {
try {
if (CONCURRENT_REQUESTS.incrementAndGet() > MAX_CONCURRENT) {
throw new UnsupportedOperationException("max concurrent requests reached");
}
TimeUnit.SECONDS.sleep(2);
return supplier.getAsInt();
} finally {
CONCURRENT_REQUESTS.decrementAndGet();
}
}
}
Finally, let’s change our endpoint to use it:
最后,让我们改变我们的端点来使用它。
@GetMapping
Integer getRandom() {
return Concurrency.protect(() -> new Random().nextInt(50));
}
Now, our endpoint refuses to process requests when we’re over MAX_CONCURRENT requests, returning an error to the client.
现在,当我们超过MAX_CONCURRENT请求时,我们的端点拒绝处理请求,向客户端返回一个错误。
2.3. Writing a Simple Client
2.3.编写一个简单的客户端
All examples will follow this pattern to generate a Flux of n requests and make a GET request to our service:
所有的例子都将遵循这个模式,生成一个n请求的Flux,并向我们的服务提出GET请求。
Flux.range(1, n)
.flatMap(i -> {
// GET request
});
To reduce the boilerplate, let’s implement the request part in a method we can reuse in all examples. We’ll receive a WebClient, call get(), and retrieve() the response body with generics using ParameterizedTypeReference:
为了减少模板,让我们在一个可以在所有示例中重复使用的方法中实现请求部分。我们将接收一个WebClient,调用get(),然后retrieve()响应体,使用generics::ParameterizedTypeReference。
public interface RandomConsumer {
static <T> Mono<T> get(WebClient client) {
return client.get()
.retrieve()
.bodyToMono(new ParameterizedTypeReference<T>() {});
}
}
Now we’re ready to see some approaches.
现在我们准备看看一些方法。
3. Delaying With zipWith(Flux.Interval())
3.用zipWith(Flux.Interval())进行延迟
Our first example combines our requests with a fixed delay using zipWith():
我们的第一个例子使用zipWith()将我们的请求与一个固定的延迟相结合。
public class ZipWithInterval {
public static Flux<Integer> fetch(
WebClient client, int requests, int delay) {
return Flux.range(1, requests)
.zipWith(Flux.interval(Duration.ofMillis(delay)))
.flatMap(i -> RandomConsumer.get(client));
}
}
As a result, this delays each request by delay milliseconds. We should note that this delay applies before sending the request.
因此,这使每个请求延迟了delay毫秒。我们应该注意,这个延迟是在发送请求之前适用的。
4. Delaying With Flux.delayElements()
4.用Flux.delayElements()进行延迟
Flux has a more straightforward way to delay its elements:
Flux有一个更直接的方法来延迟其元素。
public class DelayElements {
public static Flux<Integer> fetch(
WebClient client, int requests, int delay) {
return Flux.range(1, requests)
.delayElements(Duration.ofMillis(delay))
.flatMap(i -> RandomConsumer.get(client));
}
}
With delayElements(), the delay applies directly to Subscriber.onNext() signals. In other words, it delays each element from Flux.range(). Therefore, the function passed into flatMap() will be affected, taking longer to start. For instance, if the delay value is 1000, there will be a one second delay before our request starts.
使用delayElements(),延迟直接应用于Subscriber.onNext()信号。换句话说,它延迟了来自Flux.range()的每个元素。因此,传入flatMap()的函数将受到影响,需要更长的时间来启动。例如,如果delay值为1000,那么在我们的请求开始前将会有一秒钟的延迟。
4.1. Adapting Our Solution
4.1.改造我们的解决方案
Consequently, if we don’t provide a long enough delay, we’ll get an error:
因此,如果我们没有提供足够长的延迟,我们会得到一个错误。
@Test
void givenSmallDelay_whenDelayElements_thenExceptionThrown() {
int delay = 100;
int requests = 10;
assertThrows(InternalServerError.class, () -> {
DelayElements.fetch(client, requests, delay)
.blockLast();
});
}
That’s because we’re waiting 100 milliseconds per request, but each request takes two seconds to complete on the server side. So, rapidly, our concurrent requests limit is reached, and we get a 500 error.
这是因为我们正在等待每个请求100毫秒,但每个请求在服务器端需要两秒才能完成。因此,很快就达到了我们的并发请求限制,我们得到一个500错误。
We can get away with the request limit if we add enough delay. But then, we’d have another problem – we’d wait for more time than necessary.
如果我们增加足够的延迟,我们可以不受请求限制。但那样的话,我们就会有另一个问题–我们的等待时间会超过必要的时间。。
Depending on our use case, waiting too much might significantly impact performance. So, next, let’s check a more appropriate way to handle this since we know the limitations of our server.
根据我们的用例,过多的等待可能会严重影响性能。因此,接下来,让我们检查一下更合适的处理方式,因为我们知道我们服务器的限制。
5. Concurrency Control With flatMap()
5.用flatMap()进行并发控制
Given the limitations of our service, our best option is to send at most Concurrency.MAX_CONCURRENT requests in parallel. To do this, we can add one more argument to flatMap() for the maximum number of parallel processing:
考虑到我们服务的限制,我们最好的选择是最多并行发送Concurrency.MAX_CONCURRENT请求。为了做到这一点,我们可以给flatMap()增加一个参数,以获得最大的并行处理数量:。
public class LimitConcurrency {
public static Flux<Integer> fetch(
WebClient client, int requests, int concurrency) {
return Flux.range(1, requests)
.flatMap(i -> RandomConsumer.get(client), concurrency);
}
}
This parameter guarantees the maximum number of concurrent requests doesn’t exceed concurrency and that our processing won’t be delayed more than necessary:
该参数保证最大并发请求数不超过并发数,并且我们的处理不会被推迟到超过必要的时间:。
@Test
void givenLimitInsideServerRange_whenLimitedConcurrency_thenNoExceptionThrown() {
int limit = Concurrency.MAX_CONCURRENT;
int requests = 10;
assertDoesNotThrow(() -> {
LimitConcurrency.fetch(client, TOTAL_REQUESTS, limit)
.blockLast();
});
}
Still, a few other options are worth discussing, depending on our scenario and preference. Let’s go over some of them.
不过,其他一些选择还是值得讨论的,这取决于我们的情景和偏好。让我们来看看其中一些。
6. Using Resilience4j RateLimiter
6.使用Resilience4j的RateLimiter
Resilience4j is a versatile library designed for dealing with fault tolerance in applications. We’ll use it to limit the number of concurrent requests within an interval and include a timeout.
Resilience4j是一个多功能的库,旨在处理应用程序中的容错问题。我们将使用它来限制一个区间内的并发请求的数量,并包含一个超时。
Let’s start by adding the resilience4j-reactor and resilience4j-ratelimiter dependencies:
让我们首先添加resilience4j-reactor和resilience4j-ratelimiter依赖项。
<dependency>
<groupId>io.github.resilience4j</groupId>
<artifactId>resilience4j-reactor</artifactId>
<version>1.7.1</version>
</dependency>
<dependency>
<groupId>io.github.resilience4j</groupId>
<artifactId>resilience4j-ratelimiter</artifactId>
<version>1.7.1</version>
</dependency>
Then we build our rate limiter with RateLimiter.of(), providing a name, an interval for sending new requests, a concurrency limit, and a timeout:
然后我们用RateLimiter.of()建立我们的速率限制器,提供一个名称、一个发送新请求的间隔、一个并发限制和一个超时。
public class Resilience4jRateLimit {
public static Flux<Integer> fetch(
WebClient client, int requests, int concurrency, int interval) {
RateLimiter limiter = RateLimiter.of("my-rate-limiter", RateLimiterConfig.custom()
.limitRefreshPeriod(Duration.ofMillis(interval))
.limitForPeriod(concurrency)
.timeoutDuration(Duration.ofMillis(interval * concurrency))
.build());
// ...
}
}
Now we include it in our Flux with transformDeferred(), so it controls our GET requests rate:
现在我们用transformDeferred()将其包含在我们的Flux中,所以它控制我们的GET请求率:。
return Flux.range(1, requests)
.flatMap(i -> RandomConsumer.get(client)
.transformDeferred(RateLimiterOperator.of(limiter))
);
We should notice we can still have a problem if we define our interval as too low. But, this approach is helpful if we need to share a rate limiter specification with other operations.
我们应该注意到,如果我们定义的间隔太低,还是会有问题。但是,如果我们需要与其他操作共享一个速率限制器规范,这种方法是有帮助的。
7. Precise Throttling With Guava
7.用番石榴精确节流
Guava has a general-purpose rate limiter that works well for our scenario. Furthermore, since it uses the token bucket algorithm, it’ll only block when necessary instead of every time, unlike Flux.delayElements().
Guava有一个通用的速率限制器,可以很好地满足我们的场景需求。此外,由于它使用了令牌桶算法,它只会在必要的时候阻塞,而不是每次都阻塞,不像Flux.delayElements().那样。
First, we need to add guava to our pom.xml:
首先,我们需要将guava添加到我们的pom.xml。
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>31.1-jre</version>
</dependency>
To use it, we call RateLimiter.create() and pass it the maximum number of requests per second we want to send. Then, we call acquire() on the limiter before sending our request to throttle execution when necessary:
要使用它,我们调用RateLimiter.create()并将我们想要发送的每秒最大请求数传递给它。然后,我们在发送请求之前,在limiter上调用acquire(),以便在必要时节制执行。
public class GuavaRateLimit {
public static Flux<Integer> fetch(
WebClient client, int requests, int requestsPerSecond) {
RateLimiter limiter = RateLimiter.create(requestsPerSecond);
return Flux.range(1, requests)
.flatMap(i -> {
limiter.acquire();
return RandomConsumer.get(client);
});
}
}
This solution works excellently due to its simplicity – it doesn’t make our code block longer than necessary. For instance, if, for some reason, a request takes longer than expected, the next won’t wait to execute. But, this is the case only if we’re inside the range set for requestsPerSecond.
这一解决方案由于其简单性而运作良好–它不会使我们的代码块超过必要的时间。例如,如果由于某种原因,一个请求的时间比预期的要长,那么下一个请求就不会等待执行。但是,只有当我们在为requestsPerSecond设置的范围内时才会出现这种情况。
8. Conclusion
8.结语
In this article, we saw a few available approaches to rate limit our WebClient. After that, we simulated a controlled web service to see how it affected our code and tests. Moreover, we used Project Reactor and a few libraries to help us achieve the same goal differently.
在这篇文章中,我们看到了一些可用的方法来限制我们的WebClient的速率。之后,我们模拟了一个受控的Web服务,看看它是如何影响我们的代码和测试的。此外,我们使用Project Reactor和一些库来帮助我们以不同的方式实现相同的目标。
And as always, the source code is available over on GitHub.
一如既往,源代码可在GitHub上获得。