1. Overview
Message acknowledgment is a standard mechanism in messaging systems: it signals to the message broker that a message has been received and should not be delivered again. In Amazon SQS (Simple Queue Service), a message is acknowledged by deleting it from the queue.
In this tutorial, we’ll explore the three acknowledgment modes Spring Cloud AWS SQS v3 provides out-of-the-box: ON_SUCCESS, MANUAL, and ALWAYS.
We’ll use an event-driven scenario to illustrate our use cases, leveraging the environment and test setup from the Spring Cloud AWS SQS v3 introductory article.
2. Dependencies
We’ll first import the Spring Cloud AWS Bill of Materials to ensure all dependencies in our pom.xml are compatible with each other:
<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>io.awspring.cloud</groupId>
            <artifactId>spring-cloud-aws</artifactId>
            <version>3.1.0</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>
We’ll also add the Core and SQS starter dependencies:
<dependency>
    <groupId>io.awspring.cloud</groupId>
    <artifactId>spring-cloud-aws-starter-sqs</artifactId>
</dependency>
<dependency>
    <groupId>io.awspring.cloud</groupId>
    <artifactId>spring-cloud-aws-starter</artifactId>
</dependency>
Finally, we’ll add the dependencies required for our tests: LocalStack and Testcontainers with JUnit 5, the Awaitility library for verifying asynchronous message consumption, and AssertJ for the assertions:
<dependency>
    <groupId>org.testcontainers</groupId>
    <artifactId>localstack</artifactId>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.testcontainers</groupId>
    <artifactId>junit-jupiter</artifactId>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.awaitility</groupId>
    <artifactId>awaitility</artifactId>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.assertj</groupId>
    <artifactId>assertj-core</artifactId>
    <scope>test</scope>
</dependency>
3. Setting up the Local Test Environment
First, we’ll configure a LocalStack environment with Testcontainers for local testing:
@Testcontainers
public class BaseSqsLiveTest {

    private static final String LOCAL_STACK_VERSION = "localstack/localstack:2.3.2";

    @Container
    static LocalStackContainer localStack = new LocalStackContainer(DockerImageName.parse(LOCAL_STACK_VERSION));

    @DynamicPropertySource
    static void overrideProperties(DynamicPropertyRegistry registry) {
        registry.add("spring.cloud.aws.region.static", () -> localStack.getRegion());
        registry.add("spring.cloud.aws.credentials.access-key", () -> localStack.getAccessKey());
        registry.add("spring.cloud.aws.credentials.secret-key", () -> localStack.getSecretKey());
        registry.add("spring.cloud.aws.sqs.endpoint", () -> localStack.getEndpointOverride(SQS)
          .toString());
    }
}
Although this setup makes testing easy and repeatable, note that the code in this tutorial can also be used to target AWS directly.
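4. Setting up the Queue Names
Let’s start by defining the queue names, along with an acknowledgment mode property, in our application.yaml file: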
events:
  queues:
    order-processing-retry-queue: order_processing_retry_queue
    order-processing-async-queue: order_processing_async_queue
    order-processing-no-retries-queue: order_processing_no_retries_queue
  acknowledgment:
    order-processing-no-retries-queue: ALWAYS
One of our listeners will also use the acknowledgment property, which is set to ALWAYS.
Let’s also add a few productIds to the same file to use throughout our examples:
product:
  id:
    smartphone: 123e4567-e89b-12d3-a456-426614174000
    wireless-headphones: 123e4567-e89b-12d3-a456-426614174001
    laptop: 123e4567-e89b-12d3-a456-426614174002
    tablet: 123e4567-e89b-12d3-a456-426614174004
Next, we’ll create a @ConfigurationProperties class to hold the queue names:
@ConfigurationProperties(prefix = "events.queues")
public class EventsQueuesProperties {

    private String orderProcessingRetryQueue;
    private String orderProcessingAsyncQueue;
    private String orderProcessingNoRetriesQueue;

    // getters and setters
}
And another for the products:
@ConfigurationProperties("product.id")
public class ProductIdProperties {

    private UUID smartphone;
    private UUID wirelessHeadphones;
    private UUID laptop;

    // getters and setters
}
Lastly, we enable the configuration properties in a @Configuration class using @EnableConfigurationProperties:
@EnableConfigurationProperties({ EventsQueuesProperties.class, ProductIdProperties.class})
@Configuration
public class OrderProcessingConfiguration {
}
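The property classes above depend on Spring Boot's relaxed binding, which lets a kebab-case key such as order-processing-retry-queue populate a camelCase field such as orderProcessingRetryQueue. As a rough illustration of that naming correspondence only, here is a small plain-Java helper. KebabToCamel is a hypothetical name, and this is not how Spring Boot's binder is actually implemented:

```java
// Illustrative only: shows how a kebab-case key lines up with a camelCase field name.
// Spring Boot's real binder is far more general; KebabToCamel is a hypothetical helper.
class KebabToCamel {

    static String toCamelCase(String kebab) {
        StringBuilder result = new StringBuilder();
        boolean upperNext = false;
        for (char c : kebab.toCharArray()) {
            if (c == '-') {
                upperNext = true; // drop the hyphen and capitalize the next character
            } else if (upperNext) {
                result.append(Character.toUpperCase(c));
                upperNext = false;
            } else {
                result.append(c);
            }
        }
        return result.toString();
    }

    public static void main(String[] args) {
        System.out.println(toCamelCase("order-processing-retry-queue"));
        System.out.println(toCamelCase("wireless-headphones"));
    }
}
```

With this correspondence in mind, it's easy to check that every key in the YAML file has a matching field (and getter and setter) in the corresponding properties class.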
5. Acknowledgement on Successful Processing
5.1. Creating the Services
First, we’ll create the OrderService, which stores and retrieves order statuses using an in-memory Map:
@Service
public class OrderService {

    // In-memory storage keyed by order id
    Map<UUID, OrderStatus> ORDER_STATUS_STORAGE = new ConcurrentHashMap<>();

    public void updateOrderStatus(UUID orderId, OrderStatus status) {
        ORDER_STATUS_STORAGE.put(orderId, status);
    }

    public OrderStatus getOrderStatus(UUID orderId) {
        return ORDER_STATUS_STORAGE.getOrDefault(orderId, OrderStatus.UNKNOWN);
    }
}
Then, we’ll create the InventoryService. We’ll simulate storage using a Map, populating it using ProductIdProperties, which is autowired with values from our application.yaml file:
@Service
public class InventoryService implements InitializingBean {

    private ProductIdProperties productIdProperties;
    private Map<UUID, Integer> inventory;

    public InventoryService(ProductIdProperties productIdProperties) {
        this.productIdProperties = productIdProperties;
    }

    @Override
    public void afterPropertiesSet() {
        // Initial stock per product
        this.inventory = new ConcurrentHashMap<>(Map.of(productIdProperties.getSmartphone(), 10,
          productIdProperties.getWirelessHeadphones(), 15,
          productIdProperties.getLaptop(), 5));
    }
}
The InitializingBean interface provides afterPropertiesSet, which is a lifecycle method that Spring invokes after all dependencies for the bean have been resolved — in our case, the ProductIdProperties bean.
Let’s add a checkInventory method, which verifies whether the inventory has the requested quantity of the product. If the product doesn’t exist, it’ll throw a ProductNotFoundException, and if the product exists but not in sufficient quantity, it’ll throw an OutOfStockException. In the second scenario, we’ll also simulate a random replenishment so that upon a few retries, the processing will eventually succeed:
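public void checkInventory(UUID productId, int quantity) {
    Integer stock = inventory.get(productId);
    // Unknown product: fail explicitly instead of hitting a NullPointerException on unboxing.
    // Assumes ProductNotFoundException accepts a message, like OutOfStockException.
    if (stock == null) {
        throw new ProductNotFoundException("Product with id %s not found.".formatted(productId));
    }
    if (stock < quantity) {
        // Simulate a random replenishment so that a later retry can succeed
        inventory.put(productId, stock + (int) (Math.random() * 5));
        throw new OutOfStockException(
          "Product with id %s is out of stock. Quantity requested: %s ".formatted(productId, quantity));
    }
    inventory.put(productId, stock - quantity);
}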
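The listings in this tutorial refer to a few supporting types that aren't shown: OrderCreatedEvent, OrderStatus, OutOfStockException, and ProductNotFoundException. A minimal sketch of what they might look like follows. The exact shapes are assumptions inferred from how the types are used (accessor calls such as id() and productId(), the status constants that appear in the code, and exceptions built from a message), so the real project may define them differently:

```java
import java.util.UUID;

// Assumed shape: a record matches accessor calls such as id(), productId() and quantity().
record OrderCreatedEvent(UUID id, UUID productId, int quantity) {}

// Assumed constants: the ones referenced by the services, listeners and tests.
enum OrderStatus { UNKNOWN, RECEIVED, PROCESSING, PROCESSED }

// Assumed to be unchecked, since the listener methods declare no checked exceptions.
class OutOfStockException extends RuntimeException {
    OutOfStockException(String message) {
        super(message);
    }
}

class ProductNotFoundException extends RuntimeException {
    ProductNotFoundException(String message) {
        super(message);
    }
}

class SupportingTypesDemo {
    public static void main(String[] args) {
        var event = new OrderCreatedEvent(UUID.randomUUID(), UUID.randomUUID(), 10);
        System.out.println(event);
    }
}
```

Making the exceptions unchecked lets them propagate out of a listener method without a throws clause, which is what signals a processing failure to the framework.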
5.2. Creating the Listener
We’re all set to create our first listener. We’ll use the @Component annotation and inject the services through Spring’s constructor dependency injection mechanism:
@Component
public class OrderProcessingListeners {

    private static final Logger logger = LoggerFactory.getLogger(OrderProcessingListeners.class);

    private InventoryService inventoryService;
    private OrderService orderService;

    public OrderProcessingListeners(InventoryService inventoryService, OrderService orderService) {
        this.inventoryService = inventoryService;
        this.orderService = orderService;
    }
}
Next, let’s write the listener method:
@SqsListener(value = "${events.queues.order-processing-retry-queue}",
  id = "retry-order-processing-container", messageVisibilitySeconds = "1")
public void stockCheckRetry(OrderCreatedEvent orderCreatedEvent) {
    logger.info("Message received: {}", orderCreatedEvent);
    orderService.updateOrderStatus(orderCreatedEvent.id(), OrderStatus.PROCESSING);
    inventoryService.checkInventory(orderCreatedEvent.productId(), orderCreatedEvent.quantity());
    orderService.updateOrderStatus(orderCreatedEvent.id(), OrderStatus.PROCESSED);
    logger.info("Message processed successfully: {}", orderCreatedEvent);
}
The value property holds the queue name, which is resolved from the placeholder defined in application.yaml. Since ON_SUCCESS is the default acknowledgment mode, we don’t need to specify it in the annotation.
5.3. Setting up the Test Class
To assert the logic is working as expected, let’s create a test class:
@SpringBootTest
class OrderProcessingApplicationLiveTest extends BaseSqsLiveTest {

    private static final Logger logger = LoggerFactory.getLogger(OrderProcessingApplicationLiveTest.class);

    @Autowired
    private EventsQueuesProperties eventsQueuesProperties;

    @Autowired
    private ProductIdProperties productIdProperties;

    @Autowired
    private SqsTemplate sqsTemplate;

    @Autowired
    private OrderService orderService;

    @Autowired
    private MessageListenerContainerRegistry registry;
}
We’ll also add a method named assertQueueIsEmpty. In it we’ll use the autowired MessageListenerContainerRegistry to fetch the container, then stop the container to make sure it’s not consuming any messages. The registry contains all containers created by the @SqsListener annotation:
private void assertQueueIsEmpty(String queueName, String containerId) {
    logger.info("Stopping container {}", containerId);
    var container = Objects
      .requireNonNull(registry.getContainerById(containerId), () -> "could not find container " + containerId);
    container.stop();
    // ...
}
After the container stops, we’ll use the SqsTemplate to look for messages in the queue. If acknowledgment has succeeded, no messages should be returned. We’ll also set pollTimeout to a value greater than the visibility timeout so that, if the message hasn’t been deleted, it’ll be delivered again in the specified interval.
Here’s the continuation of the assertQueueIsEmpty method:
    // ...
    logger.info("Checking for messages in queue {}", queueName);
    var message = sqsTemplate.receive(from -> from.queue(queueName)
      .pollTimeout(Duration.ofSeconds(5)));
    assertThat(message).isEmpty();
    logger.info("No messages found in queue {}", queueName);
5.4. Testing
In this first test, we’ll send an OrderCreatedEvent to the queue, containing an Order for a product with a quantity greater than what’s in our inventory. When the exception goes through the listener method, it’ll signal to the framework that message processing failed and the message should be delivered again after the message visibility time window has elapsed.
To speed up testing, we set messageVisibilitySeconds to 1 in the annotation, but typically, this configuration is done in the queue itself and defaults to 30 seconds.
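To make the retry mechanics more concrete, here is a toy, single-threaded model of the visibility timeout: receiving a message hides it for a fixed interval, and only deleting it (the acknowledgment) prevents redelivery. ToyQueue and its methods are hypothetical names for this sketch. It is not the AWS SDK or Spring Cloud AWS, and time is passed in explicitly to keep the example deterministic:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// A toy model of SQS visibility semantics; not a real broker or client.
class ToyQueue {

    private static final class Entry {
        final String body;
        long visibleAtMillis; // the message can be received once the clock reaches this value

        Entry(String body) {
            this.body = body;
        }
    }

    private final List<Entry> entries = new ArrayList<>();
    private final long visibilityTimeoutMillis;

    ToyQueue(long visibilityTimeoutMillis) {
        this.visibilityTimeoutMillis = visibilityTimeoutMillis;
    }

    void send(String body) {
        entries.add(new Entry(body));
    }

    // Returns the first visible message and hides it for the visibility timeout.
    Optional<String> receive(long nowMillis) {
        for (Entry entry : entries) {
            if (entry.visibleAtMillis <= nowMillis) {
                entry.visibleAtMillis = nowMillis + visibilityTimeoutMillis;
                return Optional.of(entry.body);
            }
        }
        return Optional.empty();
    }

    // Acknowledgment: once deleted, the message is never delivered again.
    boolean delete(String body) {
        return entries.removeIf(entry -> entry.body.equals(body));
    }
}

class ToyQueueDemo {
    public static void main(String[] args) {
        ToyQueue queue = new ToyQueue(1000); // one second, like messageVisibilitySeconds = "1"
        queue.send("order-1");

        System.out.println(queue.receive(0));    // delivered; processing "fails", so no delete
        System.out.println(queue.receive(500));  // still hidden
        System.out.println(queue.receive(1000)); // visible again, so it is redelivered
        queue.delete("order-1");                 // processing succeeded: acknowledge
        System.out.println(queue.receive(5000)); // nothing left to deliver
    }
}
```

In ON_SUCCESS mode, the framework plays the role of the caller here: it deletes the message only when the listener method returns without throwing.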
We’ll create the event and send it using the auto-configured SqsTemplate that Spring Cloud AWS provides. Then, we’ll use Awaitility to wait for the order status to change to PROCESSED, and, lastly, we’ll assert that the queue is empty, meaning that the acknowledgment succeeded:
@Test
public void givenOnSuccessAcknowledgementMode_whenProcessingThrows_shouldRetry() {
    var orderId = UUID.randomUUID();
    var queueName = eventsQueuesProperties.getOrderProcessingRetryQueue();
    sqsTemplate.send(queueName, new OrderCreatedEvent(orderId, productIdProperties.getLaptop(), 10));
    Awaitility.await()
      .atMost(Duration.ofMinutes(1))
      .until(() -> orderService.getOrderStatus(orderId)
        .equals(OrderStatus.PROCESSED));
    assertQueueIsEmpty(queueName, "retry-order-processing-container");
}
Notice we’re passing the containerId specified in the @SqsListener annotation to the assertQueueIsEmpty method.
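For readers unfamiliar with Awaitility, the await().atMost(...).until(...) call in the test repeatedly evaluates a condition until it holds or the time limit passes. The plain-Java helper below sketches that idea. PollUntil is a hypothetical name and not Awaitility's implementation; one notable difference is that Awaitility throws a ConditionTimeoutException on timeout, while this sketch simply returns false:

```java
import java.time.Duration;
import java.util.function.BooleanSupplier;

// Hypothetical helper illustrating "wait at most X until a condition holds".
class PollUntil {

    static boolean await(Duration atMost, Duration pollInterval, BooleanSupplier condition) {
        long deadline = System.nanoTime() + atMost.toNanos();
        while (System.nanoTime() < deadline) {
            if (condition.getAsBoolean()) {
                return true;
            }
            try {
                Thread.sleep(pollInterval.toMillis());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt flag
                return false;
            }
        }
        return condition.getAsBoolean(); // one last check at the deadline
    }

    public static void main(String[] args) {
        long start = System.currentTimeMillis();
        boolean done = await(Duration.ofSeconds(5), Duration.ofMillis(50),
            () -> System.currentTimeMillis() - start >= 200);
        System.out.println("Condition met: " + done);
    }
}
```

Polling like this is what allows the test to tolerate an unknown number of retries before the order reaches the PROCESSED status.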
Now we can run the test. First, we’ll ensure Docker is running, and then we’ll execute the test. After the container initialization logs, we should see our application’s log message:
Message received: OrderCreatedEvent[id=83f27bf2-1bd4-460a-9006-d784ec7eff47, productId=123e4567-e89b-12d3-a456-426614174002, quantity=10]
Then, we should see one or more failures due to lack of stock:
Caused by: com.baeldung.spring.cloud.aws.sqs.acknowledgement.exception.OutOfStockException: Product with id 123e4567-e89b-12d3-a456-426614174002 is out of stock. Quantity requested: 10
And, since we added the replenishing logic, we should eventually see that the message processing succeeds:
Message processed successfully: OrderCreatedEvent[id=83f27bf2-1bd4-460a-9006-d784ec7eff47, productId=123e4567-e89b-12d3-a456-426614174002, quantity=10]
Lastly, we’ll ensure acknowledgment has succeeded:
INFO 2699 --- [main] a.s.a.OrderProcessingApplicationLiveTest : Stopping container retry-order-processing-container
INFO 2699 --- [main] a.c.s.l.AbstractMessageListenerContainer : Container retry-order-processing-container stopped
INFO 2699 --- [main] a.s.a.OrderProcessingApplicationLiveTest : Checking for messages in queue order_processing_retry_queue
INFO 2699 --- [main] a.s.a.OrderProcessingApplicationLiveTest : No messages found in queue order_processing_retry_queue
Note that “connection refused” errors might be thrown after the test completes — that’s because the Docker container stops before the framework can stop polling for messages. We can safely ignore these errors.
6. Manual Acknowledgement
The framework supports manual acknowledgment of messages, which is useful for scenarios where we need greater control over the acknowledgment process.
6.1. Creating the Listener
To illustrate this, we’ll create an asynchronous scenario where the InventoryService has a slow connection and we want to release the listener thread before it completes:
@SqsListener(value = "${events.queues.order-processing-async-queue}",
  acknowledgementMode = SqsListenerAcknowledgementMode.MANUAL,
  id = "async-order-processing-container", messageVisibilitySeconds = "3")
public void slowStockCheckAsynchronous(OrderCreatedEvent orderCreatedEvent, Acknowledgement acknowledgement) {
    logger.info("Message received: {}", orderCreatedEvent);
    orderService.updateOrderStatus(orderCreatedEvent.id(), OrderStatus.PROCESSING);
    // Run the slow check off the listener thread, then update the status and acknowledge
    CompletableFuture.runAsync(() -> inventoryService.slowCheckInventory(orderCreatedEvent.productId(), orderCreatedEvent.quantity()))
      .thenRun(() -> orderService.updateOrderStatus(orderCreatedEvent.id(), OrderStatus.PROCESSED))
      .thenCompose(voidFuture -> acknowledgement.acknowledgeAsync())
      .thenRun(() -> logger.info("Message for order {} acknowledged", orderCreatedEvent.id()));
    logger.info("Releasing processing thread.");
}
In this logic, we use Java’s CompletableFuture to run the inventory check asynchronously. We added the Acknowledgement object as a parameter of the listener method and set the annotation’s acknowledgementMode property to SqsListenerAcknowledgementMode.MANUAL. This property is a String and accepts property placeholders and SpEL. The Acknowledgement object is only available when the acknowledgment mode is set to MANUAL.
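The ordering guarantees of this chain can be tried out in isolation with plain JDK classes. In the sketch below, acknowledgeAsync() is a hypothetical stand-in for the framework's Acknowledgement, and the steps list exists only to record the order in which the stages run:

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;

// Plain-JDK sketch of the chaining pattern; no Spring Cloud AWS types are involved.
class AsyncAckSketch {

    final List<String> steps = new CopyOnWriteArrayList<>();

    // Stand-in for an asynchronous acknowledgment that completes on another thread.
    CompletableFuture<Void> acknowledgeAsync() {
        return CompletableFuture.runAsync(() -> steps.add("acknowledged"));
    }

    // Returns the future so callers can wait on it; the listener above discards it instead.
    CompletableFuture<Void> handle() {
        return CompletableFuture
            .runAsync(() -> steps.add("inventory checked"))
            .thenRun(() -> steps.add("status updated"))
            .thenCompose(ignored -> acknowledgeAsync()) // flattens the nested future
            .thenRun(() -> steps.add("logged"));
    }

    public static void main(String[] args) {
        AsyncAckSketch sketch = new AsyncAckSketch();
        sketch.handle().join(); // block only in this demo, to observe the final order
        System.out.println(sketch.steps);
    }
}
```

Because each stage runs only after the previous one completes normally, an exception in the inventory check would skip the remaining stages, including the acknowledgment. The message would then stay in the queue and become visible again once the visibility timeout elapses.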
Note that, in this example, we leverage Spring Boot auto-configuration, which provides sensible defaults, and the @SqsListener annotation properties to change between acknowledgment modes. An alternative would be declaring a SqsMessageListenerContainerFactory bean, which allows setting up more complex configurations.
6.2. Simulating a Slow Connection
Now, let’s add the slowCheckInventory method to the InventoryService class, simulating a slow connection using Thread.sleep:
public void slowCheckInventory(UUID productId, int quantity) {
    simulateBusyConnection();
    checkInventory(productId, quantity);
}

private void simulateBusyConnection() {
    try {
        Thread.sleep(2000);
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new RuntimeException(e);
    }
}
6.3. Testing
Next, let’s write our test:
@Test
public void givenManualAcknowledgementMode_whenManuallyAcknowledge_shouldAcknowledge() {
    var orderId = UUID.randomUUID();
    var queueName = eventsQueuesProperties.getOrderProcessingAsyncQueue();
    sqsTemplate.send(queueName, new OrderCreatedEvent(orderId, productIdProperties.getSmartphone(), 1));
    Awaitility.await()
      .atMost(Duration.ofMinutes(1))
      .until(() -> orderService.getOrderStatus(orderId)
        .equals(OrderStatus.PROCESSED));
    assertQueueIsEmpty(queueName, "async-order-processing-container");
}
This time, we’re requesting a quantity available in the inventory, so we should see no errors thrown.
When running the test, we’ll see a log message indicating the message was received:
INFO 2786 --- [ing-container-1] c.b.s.c.a.s.a.l.OrderProcessingListeners : Message received: OrderCreatedEvent[id=013740a3-0a45-478a-b085-fbd634fbe66d, productId=123e4567-e89b-12d3-a456-426614174000, quantity=1]
Then, we’ll see the thread release message:
INFO 2786 --- [ing-container-1] c.b.s.c.a.s.a.l.OrderProcessingListeners : Releasing processing thread.
This is because we’re processing and acknowledging the message asynchronously. After about two seconds, we should see the log that the message has been acknowledged:
INFO 2786 --- [onPool-worker-1] c.b.s.c.a.s.a.l.OrderProcessingListeners : Message for order 013740a3-0a45-478a-b085-fbd634fbe66d acknowledged
Finally, we’ll see the logs for stopping the container and asserting that the queue is empty:
INFO 2786 --- [main] a.s.a.OrderProcessingApplicationLiveTest : Stopping container async-order-processing-container
INFO 2786 --- [main] a.c.s.l.AbstractMessageListenerContainer : Container async-order-processing-container stopped
INFO 2786 --- [main] a.s.a.OrderProcessingApplicationLiveTest : Checking for messages in queue order_processing_async_queue
INFO 2786 --- [main] a.s.a.OrderProcessingApplicationLiveTest : No messages found in queue order_processing_async_queue
7. Acknowledgement on Both Success and Error
The last acknowledgment mode we’ll explore is ALWAYS, which causes the framework to acknowledge the message regardless of whether the listener method throws an error.
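The difference between the three modes can be summarized as a small decision function. The sketch below is a simplified model of the behavior described in this tutorial, with hypothetical names (AckMode, AckDecision); it is not the framework's internal implementation:

```java
// Simplified model: does the framework itself acknowledge the message after the listener returns?
enum AckMode { ON_SUCCESS, ALWAYS, MANUAL }

class AckDecision {

    // listenerThrew: whether the listener method ended with an exception.
    static boolean frameworkAcknowledges(AckMode mode, boolean listenerThrew) {
        switch (mode) {
            case ON_SUCCESS:
                return !listenerThrew; // failures leave the message in the queue for a retry
            case ALWAYS:
                return true;           // acknowledged on both success and error
            case MANUAL:
            default:
                return false;          // the application acknowledges explicitly
        }
    }

    public static void main(String[] args) {
        for (AckMode mode : AckMode.values()) {
            System.out.println(mode
                + " success=" + frameworkAcknowledges(mode, false)
                + " failure=" + frameworkAcknowledges(mode, true));
        }
    }
}
```

The only row that differs between ON_SUCCESS and ALWAYS is the failure case, which is exactly what the test in this section exercises.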
7.1. Creating the Listener
Let’s simulate a sales event during which our inventory is limited and we don’t want to reprocess any messages, regardless of any failures. We’ll set the acknowledgment mode to ALWAYS using the property we defined earlier in our application.yaml:
@SqsListener(value = "${events.queues.order-processing-no-retries-queue}",
  acknowledgementMode = "${events.acknowledgment.order-processing-no-retries-queue}",
  id = "no-retries-order-processing-container", messageVisibilitySeconds = "3")
public void stockCheckNoRetries(OrderCreatedEvent orderCreatedEvent) {
    logger.info("Message received: {}", orderCreatedEvent);
    orderService.updateOrderStatus(orderCreatedEvent.id(), OrderStatus.RECEIVED);
    inventoryService.checkInventory(orderCreatedEvent.productId(), orderCreatedEvent.quantity());
    logger.info("Message processed: {}", orderCreatedEvent);
}
7.2. Testing
In the test, we’ll create an order with a quantity greater than our stock:
@Test
public void givenAlwaysAcknowledgementMode_whenProcessThrows_shouldAcknowledge() {
    var orderId = UUID.randomUUID();
    var queueName = eventsQueuesProperties.getOrderProcessingNoRetriesQueue();
    sqsTemplate.send(queueName, new OrderCreatedEvent(orderId, productIdProperties.getWirelessHeadphones(), 20));
    Awaitility.await()
      .atMost(Duration.ofMinutes(1))
      .until(() -> orderService.getOrderStatus(orderId)
        .equals(OrderStatus.RECEIVED));
    assertQueueIsEmpty(queueName, "no-retries-order-processing-container");
}
Now, even when the OutOfStockException is thrown, the message is acknowledged and no retries are attempted for the message:
Message received: OrderCreatedEvent[id=7587f1a2-328f-4791-8559-ee8e85b25259, productId=123e4567-e89b-12d3-a456-426614174001, quantity=20]
Caused by: com.baeldung.spring.cloud.aws.sqs.acknowledgement.exception.OutOfStockException: Product with id 123e4567-e89b-12d3-a456-426614174001 is out of stock. Quantity requested: 20
INFO 2835 --- [main] a.s.a.OrderProcessingApplicationLiveTest : Stopping container no-retries-order-processing-container
INFO 2835 --- [main] a.c.s.l.AbstractMessageListenerContainer : Container no-retries-order-processing-container stopped
INFO 2835 --- [main] a.s.a.OrderProcessingApplicationLiveTest : Checking for messages in queue order_processing_no_retries_queue
INFO 2835 --- [main] a.s.a.OrderProcessingApplicationLiveTest : No messages found in queue order_processing_no_retries_queue
8. Conclusion
In this article, we used an event-driven scenario to showcase the three acknowledgment modes provided by Spring Cloud AWS v3 SQS integration: ON_SUCCESS (the default), MANUAL, and ALWAYS.
We leveraged the auto-configured settings and used the @SqsListener annotation properties to switch between modes. We also created live tests to assert the behavior using Testcontainers and LocalStack.