1. Introduction
1.绪论
By default in Spring AMQP, a failed message is re-queued for another round of consumption. Consequently, an infinite consumption loop may occur, causing an unstable situation and a waste of resources.
在Spring AMQP中,默认情况下,失败的消息会被重新排队以进行另一轮的消费。因此,可能会出现一个无限的消费循环,造成不稳定的情况和资源的浪费。
While using a Dead Letter Queue is a standard way to deal with failed messages, we may want to retry the message consumption and return the system to a normal state.
虽然使用死信队列是处理失败消息的标准方式,但我们可能想重试消息消耗,并将系统恢复到正常状态。
In this tutorial, we’ll present two different ways of implementing a retry strategy named Exponential Backoff.
在本教程中,我们将介绍实现重试策略的两种不同方式,名为Exponential Backoff。
2. Prerequisites
2.前提条件
Throughout this tutorial, we’ll use RabbitMQ, a popular AMQP implementation. Consequently, we may refer to this Spring AMQP article for further instructions on how to configure and use RabbitMQ with Spring.
在本教程中,我们将使用RabbitMQ,一个流行的 AMQP 实现。因此,我们可以参考这篇Spring AMQP文章,以进一步了解如何在 Spring 中配置和使用 RabbitMQ。
For the sake of simplicity, we’ll also use a docker image for our RabbitMQ instance, though any RabbitMQ instance listening on port 5672 will do.
为了简单起见,我们还将为我们的 RabbitMQ 实例使用一个 docker 镜像,尽管 任何在端口 5672 上监听的 RabbitMQ 实例都可以。
Let’s start a RabbitMQ docker container:
让我们启动一个 RabbitMQ docker 容器。
docker run -p 5672:5672 -p 15672:15672 --name rabbit rabbitmq:3-management
In order to implement our examples, we need to add a dependency on spring-boot-starter-amqp. The latest version is available on Maven Central:
为了实现我们的例子,我们需要添加对spring-boot-starter-amqp的依赖。最新版本可在Maven中心获得。
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>2.2.4.RELEASE</version>
</dependency>
</dependencies>
3. A Blocking Way
3.阻止的方式
Our first way will use Spring Retry fixtures. We’ll create a simple queue and a consumer configured to wait for some time between retries of the failed message.
我们的第一个方法将使用Spring Retry固定装置。我们将创建一个简单的队列和一个消费者,配置为在失败消息的重试之间等待一段时间。
First, let’s create our queue:
首先,让我们创建我们的队列。
@Bean
public Queue blockingQueue() {
return QueueBuilder.nonDurable("blocking-queue").build();
}
Secondly, let’s configure a backoff strategy in RetryOperationsInterceptor and wire it in a custom RabbitListenerContainerFactory:
其次,让我们在RetryOperationsInterceptor中配置一个回退策略,并在自定义的RabbitListenerContainerFactory中连接它。
@Bean
public RetryOperationsInterceptor retryInterceptor() {
return RetryInterceptorBuilder.stateless()
.backOffOptions(1000, 3.0, 10000)
.maxAttempts(5)
.recoverer(observableRecoverer())
.build();
}
@Bean
public SimpleRabbitListenerContainerFactory retryContainerFactory(
ConnectionFactory connectionFactory, RetryOperationsInterceptor retryInterceptor) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
Advice[] adviceChain = { retryInterceptor };
factory.setAdviceChain(adviceChain);
return factory;
}
As shown above, we’re configuring an initial interval of 1000ms and a multiplier of 3.0, up to a maximum wait time of 10000ms. In addition, after five attempts the message will be dropped.
如上图所示,我们配置的初始间隔为1000ms,乘数为3.0,最大等待时间为10000ms。此外,在五次尝试后,该消息将被放弃。
Let’s add our consumer and force a failed message by throwing an exception:
让我们添加我们的消费者,并通过抛出一个异常来强制发送一个失败的消息。
@RabbitListener(queues = "blocking-queue", containerFactory = "retryContainerFactory")
public void consumeBlocking(String payload) throws Exception {
logger.info("Processing message from blocking-queue: {}", payload);
throw new Exception("exception occured!");
}
Finally, let’s create a test and send two messages to our queue:
最后,让我们创建一个测试,向我们的队列发送两条消息。
@Test
public void whenSendToBlockingQueue_thenAllMessagesProcessed() throws Exception {
int nb = 2;
CountDownLatch latch = new CountDownLatch(nb);
observableRecoverer.setObserver(() -> latch.countDown());
for (int i = 1; i <= nb; i++) {
rabbitTemplate.convertAndSend("blocking-queue", "blocking message " + i);
}
latch.await();
}
Keep in mind that the CountdownLatch is only used as a test fixture.
请记住,CountdownLatch只是作为一个测试夹具使用。
Let’s run the test and check our log output:
让我们运行测试并检查我们的日志输出。
2020-02-18 21:17:55.638 INFO : Processing message from blocking-queue: blocking message 1
2020-02-18 21:17:56.641 INFO : Processing message from blocking-queue: blocking message 1
2020-02-18 21:17:59.644 INFO : Processing message from blocking-queue: blocking message 1
2020-02-18 21:18:08.654 INFO : Processing message from blocking-queue: blocking message 1
2020-02-18 21:18:18.657 INFO : Processing message from blocking-queue: blocking message 1
2020-02-18 21:18:18.875 ERROR : java.lang.Exception: exception occured!
2020-02-18 21:18:18.858 INFO : Processing message from blocking-queue: blocking message 2
2020-02-18 21:18:19.860 INFO : Processing message from blocking-queue: blocking message 2
2020-02-18 21:18:22.863 INFO : Processing message from blocking-queue: blocking message 2
2020-02-18 21:18:31.867 INFO : Processing message from blocking-queue: blocking message 2
2020-02-18 21:18:41.871 INFO : Processing message from blocking-queue: blocking message 2
2020-02-18 21:18:41.875 ERROR : java.lang.Exception: exception occured!
As can be seen, this log correctly shows the exponential wait time between each retry. While our backoff strategy works, our consumer is blocked until the retries have been exhausted. A trivial improvement is to make our consumer execute concurrently by setting the concurrency attribute of @RabbitListener:
可以看出,该日志正确显示了每次重试之间的指数级等待时间。虽然我们的回退策略奏效,但是我们的消费者被阻塞了,直到重试次数用尽。一个微不足道的改进是通过设置@RabbitListener的concurrency属性使我们的消费者并发地执行。
@RabbitListener(queues = "blocking-queue", containerFactory = "retryContainerFactory", concurrency = "2")
However, a retried message still blocks a consumer instance. Therefore, the application can suffer from latency issues.
然而,重试的消息仍然会阻塞消费者实例。因此,应用程序可能会受到延迟问题的影响。
In the next section, we’ll present a non-blocking way to implement a similar strategy.
在下一节,我们将介绍一种非阻塞的方式来实现类似的策略。
4. A Non-blocking Way
4.无阻塞的方式
An alternative way involves a number of retry queues coupled with message expiration. As a matter of fact, when a message expires it ends up in a dead letter queue. In other words, if the DLQ consumer sends back the message to its original queue, we’re essentially doing a retry loop.
另一种方式涉及到一些重试队列,再加上消息过期。事实上,当一个消息过期时,它最终会进入一个死信队列。换句话说,如果DLQ消费者将消息送回它原来的队列,我们基本上是在做一个重试循环。
As a result, the number of retry queues used is the number of attempts that will occur.
因此,使用的重试队列的数量就是将发生的尝试次数。
First, let’s create the dead letter queue for our retry queues:
首先,让我们为我们的重试队列创建死信队列。
@Bean
public Queue retryWaitEndedQueue() {
return QueueBuilder.nonDurable("retry-wait-ended-queue").build();
}
Let’s add a consumer on the retry dead letter queue. This consumer’s sole responsibility is sending back the message to its original queue:
让我们在重试死信队列上添加一个消费者。这个消费者的唯一责任是把消息送回它原来的队列。
@RabbitListener(queues = "retry-wait-ended-queue", containerFactory = "defaultContainerFactory")
public void consumeRetryWaitEndedMessage(String payload, Message message, Channel channel) throws Exception{
MessageProperties props = message.getMessageProperties();
rabbitTemplate().convertAndSend(props.getHeader("x-original-exchange"),
props.getHeader("x-original-routing-key"), message);
}
Secondly, let’s create a wrapper object for our retry queues. This object will hold the exponential backoff configuration:
其次,让我们为我们的重试队列创建一个包装器对象。这个对象将保存指数退避的配置。
public class RetryQueues {
private Queue[] queues;
private long initialInterval;
private double factor;
private long maxWait;
// constructor, getters and setters
Thirdly, let’s define three retry queues:
第三,让我们定义三个重试队列。
@Bean
public Queue retryQueue1() {
return QueueBuilder.nonDurable("retry-queue-1")
.deadLetterExchange("")
.deadLetterRoutingKey("retry-wait-ended-queue")
.build();
}
@Bean
public Queue retryQueue2() {
return QueueBuilder.nonDurable("retry-queue-2")
.deadLetterExchange("")
.deadLetterRoutingKey("retry-wait-ended-queue")
.build();
}
@Bean
public Queue retryQueue3() {
return QueueBuilder.nonDurable("retry-queue-3")
.deadLetterExchange("")
.deadLetterRoutingKey("retry-wait-ended-queue")
.build();
}
@Bean
public RetryQueues retryQueues() {
return new RetryQueues(1000, 3.0, 10000, retryQueue1(), retryQueue2(), retryQueue3());
}
Then, we need an interceptor to handle the message consumption:
然后,我们需要一个拦截器来处理消息的消耗。
public class RetryQueuesInterceptor implements MethodInterceptor {
// fields and constructor
@Override
public Object invoke(MethodInvocation invocation) throws Throwable {
return tryConsume(invocation, this::ack, (messageAndChannel, e) -> {
try {
int retryCount = tryGetRetryCountOrFail(messageAndChannel, e);
sendToNextRetryQueue(messageAndChannel, retryCount);
} catch (Throwable t) {
// ...
throw new RuntimeException(t);
}
});
}
In the case of the consumer returning successfully, we simply acknowledge the message.
在消费者成功返回的情况下,我们只是确认该信息。
However, if the consumer throws an exception and there are attempts left, we send the message to the next retry queue:
然而,如果消费者抛出一个异常,并且还有一些尝试,我们就将消息发送到下一个重试队列:。
private void sendToNextRetryQueue(MessageAndChannel mac, int retryCount) throws Exception {
String retryQueueName = retryQueues.getQueueName(retryCount);
rabbitTemplate.convertAndSend(retryQueueName, mac.message, m -> {
MessageProperties props = m.getMessageProperties();
props.setExpiration(String.valueOf(retryQueues.getTimeToWait(retryCount)));
props.setHeader("x-retried-count", String.valueOf(retryCount + 1));
props.setHeader("x-original-exchange", props.getReceivedExchange());
props.setHeader("x-original-routing-key", props.getReceivedRoutingKey());
return m;
});
mac.channel.basicReject(mac.message.getMessageProperties()
.getDeliveryTag(), false);
}
Again, let’s wire our interceptor in a custom RabbitListenerContainerFactory:
同样,让我们在一个自定义的RabbitListenerContainerFactory中连接我们的拦截器。
@Bean
public SimpleRabbitListenerContainerFactory retryQueuesContainerFactory(
ConnectionFactory connectionFactory, RetryQueuesInterceptor retryInterceptor) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
Advice[] adviceChain = { retryInterceptor };
factory.setAdviceChain(adviceChain);
return factory;
}
Finally, we define our main queue and a consumer which simulates a failed message:
最后,我们定义了我们的主队列和一个模拟失败消息的消费者。
@Bean
public Queue nonBlockingQueue() {
return QueueBuilder.nonDurable("non-blocking-queue")
.build();
}
@RabbitListener(queues = "non-blocking-queue", containerFactory = "retryQueuesContainerFactory",
ackMode = "MANUAL")
public void consumeNonBlocking(String payload) throws Exception {
logger.info("Processing message from non-blocking-queue: {}", payload);
throw new Exception("Error occured!");
}
Let’s create another test and send two messages:
让我们再创建一个测试,并发送两条信息。
@Test
public void whenSendToNonBlockingQueue_thenAllMessageProcessed() throws Exception {
int nb = 2;
CountDownLatch latch = new CountDownLatch(nb);
retryQueues.setObserver(() -> latch.countDown());
for (int i = 1; i <= nb; i++) {
rabbitTemplate.convertAndSend("non-blocking-queue", "non-blocking message " + i);
}
latch.await();
}
Then, let’s launch our test and check the log:
然后,让我们启动我们的测试并检查日志。
2020-02-19 10:31:40.640 INFO : Processing message from non-blocking-queue: non blocking message 1
2020-02-19 10:31:40.656 INFO : Processing message from non-blocking-queue: non blocking message 2
2020-02-19 10:31:41.620 INFO : Processing message from non-blocking-queue: non blocking message 1
2020-02-19 10:31:41.623 INFO : Processing message from non-blocking-queue: non blocking message 2
2020-02-19 10:31:44.415 INFO : Processing message from non-blocking-queue: non blocking message 1
2020-02-19 10:31:44.420 INFO : Processing message from non-blocking-queue: non blocking message 2
2020-02-19 10:31:52.751 INFO : Processing message from non-blocking-queue: non blocking message 1
2020-02-19 10:31:52.774 ERROR : java.lang.Exception: Error occured!
2020-02-19 10:31:52.829 INFO : Processing message from non-blocking-queue: non blocking message 2
2020-02-19 10:31:52.841 ERROR : java.lang.Exception: Error occured!
Again, we see an exponential wait time between each retry. However, instead of blocking until every attempt is made, the messages are processed concurrently.
同样,我们看到每次重试之间的等待时间是指数级的。然而,不是阻塞直到每一次尝试,而是同时处理这些消息。
While this setup is quite flexible and helps alleviate latency issues, there is a common pitfall. Indeed, RabbitMQ removes an expired message only when it reaches the head of the queue. Therefore, if a message has a greater expiration period, it will block all other messages in the queue. For this reason, a reply queue must only contain messages having the same expiration value.
虽然这种设置相当灵活,并有助于缓解延迟问题,但也有一个常见的隐患。事实上,RabbitMQ只有在过期消息到达队列的头部时才会将其删除。因此,如果一个消息的过期时间较长,它将阻塞队列中的所有其他消息。由于这个原因,一个回复队列必须只包含具有相同过期值的消息。
4. Conclusion
4.总结
As shown above, event-based systems can implement an exponential backoff strategy to improve resiliency. While implementing such solutions can be trivial, it’s important to realize that a certain solution can be well adapted to a small system, but cause latency issues in high-throughput ecosystems.
如上所示,基于事件的系统可以实施指数退避策略来提高弹性。虽然实施这样的解决方案可能是微不足道的,但重要的是要认识到,某种解决方案可以很好地适应小系统,但在高吞吐量的生态系统中会造成延迟问题。
The source code is available over on GitHub.
源代码可在GitHub上获得,。