1. Overview
1.概述
Spring Cloud AWS is a project that aims to simplify interacting with AWS services. Simple Queue Service (SQS) is an AWS solution for sending and receiving asynchronous messages in a scalable way.
Spring Cloud AWS 是一个旨在简化与 AWS 服务交互的项目。简单队列服务 (SQS) 是一种以可扩展方式发送和接收异步消息的 AWS 解决方案。
In this tutorial, we’ll reintroduce the Spring Cloud AWS SQS integration, which has been completely rewritten for Spring Cloud AWS 3.0.
在本教程中,我们将重新介绍Spring Cloud AWS SQS 集成,该集成已针对 Spring Cloud AWS 3.0 全面重写。
The framework provides familiar Spring abstractions for handling SQS queues, such as SqsTemplate and the @SqsListener annotation.
该框架为处理 SQS 队列提供了熟悉的 Spring 抽象,例如 SqsTemplate 和 @SqsListener 注解。
We’ll walk through an event-driven scenario with examples for sending and receiving messages, and show strategies for setting up integration tests with Testcontainers, a tool for managing disposable docker containers, and LocalStack, which simulates an AWS-like environment locally for testing our logic.
我们将通过 发送和接收消息的示例来介绍一个事件驱动的场景,并使用 Testcontainers 和 LocalStack 展示 设置集成测试的策略,Testcontainers 是一个管理一次性 docker 容器的工具,LocalStack 可在本地模拟类似 AWS 的环境来测试我们的逻辑。
2. Dependencies
2.依赖关系
The Spring Cloud AWS Bill of Materials (BOM) ensures compatible versions between projects. It declares versions for many dependencies, including Spring Boot, and should be used instead of Spring Boot’s own BOM.
Spring Cloud AWS 物料清单 (BOM) 可确保项目之间的版本兼容。它声明了包括 Spring Boot 在内的许多依赖项的版本,应取代 Spring Boot 自身的 BOM。
Here’s how to import it in our pom.xml file:
下面介绍如何将其导入 pom.xml 文件:
<dependencyManagement>
<dependencies>
<dependency>
<groupId>io.awspring.cloud</groupId>
<artifactId>spring-cloud-aws</artifactId>
<version>3.0.4</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
The main dependency we’ll need is SQS Starter, which contains all SQS-related classes for the project. The SQS integration has no dependency to Spring Boot and can be used standalone in any standard Java application:
我们需要的主要依赖关系是 SQS Starter,它包含项目中所有与 SQS 相关的类。SQS 集成不依赖于 Spring Boot,可在任何标准 Java 应用程序中独立使用:
<dependency>
<groupId>io.awspring.cloud</groupId>
<artifactId>spring-cloud-aws-starter-sqs</artifactId>
</dependency>
For Spring Boot applications such as the one we’re building in this tutorial, we should add the project’s Core Starter, as it allows us to leverage Spring Boot’s auto-configuration for SQS, and AWS configuration such as credentials and region:
对于 Spring Boot 应用程序(如本教程中构建的应用程序),我们应添加项目的 Core Starter,因为它允许我们利用 Spring Boot 的 SQS 自动配置以及 AWS 配置(如凭据和区域):
<dependency>
<groupId>io.awspring.cloud</groupId>
<artifactId>spring-cloud-aws-starter</artifactId>
</dependency>
3. Setting up a Local Test Environment
3.设置本地测试环境
In this section, we’ll walk through setting up LocalStack environment with Testcontainers to test our code in our local environment. Note that the examples in this tutorial can also be executed targeting AWS directly.
在本节中,我们将使用 Testcontainers 设置 LocalStack 环境,以便在本地环境中测试代码。请注意,本教程中的示例也可以直接针对 AWS 执行。
3.1. Dependencies
3.1 依赖性
For running LocalStack and TestContainers with JUnit 5, we’ll need two additional dependencies:
要使用 JUnit 5 运行 LocalStack 和 TestContainers,我们需要两个额外的依赖项:
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>localstack</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>junit-jupiter</artifactId>
<scope>test</scope>
</dependency>
Let’s also include the awaitility library to help us assert the asynchronous message consumption:
我们还可以使用awaitility 库来帮助我们断言异步消息的消耗:
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>
3.2. Configuration
3.2.配置
We’ll now create a class with the logic for managing our containers, which can be inherited by our test suites. Let’s name it BaseSqsIntegrationTests. For each test suite that extends this class, Testcontainers will create and start a new container, which is essential for isolating each suite’s data from one another.
现在,我们将创建一个类,其中包含用于管理容器的 逻辑,测试套件可以继承该类。我们将其命名为 BaseSqsIntegrationTests。对于扩展该类的每个测试套件,Testcontainers 都将创建并启动一个新容器,这对于将每个套件的数据相互隔离是至关重要的。
The @SpringBootTest annotation is necessary for the Spring Context to be initialized, and the @Testcontainers annotation associates our Testcontainers annotations with JUnit’s runtime so that containers start when the test suite runs and stop after tests are complete:
@SpringBootTest 注解是初始化 Spring Context 所必需的,而 @Testcontainers 注解则将我们的 Testcontainers 注解与 JUnit 的运行时关联起来,以便容器在测试套件运行时启动,并在测试完成后停止:
@SpringBootTest
@Testcontainers
public class BaseSqsIntegrationTest {
// Our test configuration will be added here
}
Let’s now declare the LocalStackContainer. The @Container annotation is also necessary for the framework to automatically manage the container’s lifecycle:
现在让我们声明 LocalStackContainer 。@Container 注解也是框架自动管理容器生命周期所必需的:
private static final String LOCAL_STACK_VERSION = "localstack/localstack:2.3.2";
@Container
static LocalStackContainer localstack = new LocalStackContainer(DockerImageName.parse(LOCAL_STACK_VERSION));
Finally, we’ll bind the properties the Spring Cloud AWS framework uses for auto-configuration with LocalStack. We’ll fetch the container port and hosts at runtime since Testcontainers will provide us with a random port, which is great for parallel testing. We can use the @DynamicPropertySource annotation for that:
最后,我们将把 Spring Cloud AWS 框架用于自动配置的属性与 LocalStack 绑定。我们将在运行时获取容器端口和主机,因为 Testcontainers 会为我们提供随机端口,这非常适合并行测试。为此,我们可以使用 @DynamicPropertySource 注解:
@DynamicPropertySource
static void overrideProperties(DynamicPropertyRegistry registry) {
registry.add("spring.cloud.aws.region.static", () -> localStack.getRegion());
registry.add("spring.cloud.aws.credentials.access-key", () -> localStack.getAccessKey());
registry.add("spring.cloud.aws.credentials.secret-key", () -> localStack.getSecretKey());
registry.add("spring.cloud.aws.sqs.endpoint", () -> localStack.getEndpointOverride(SQS)
.toString());
// ...other AWS services endpoints can be added here
}
This is all we need to implement a Spring Boot test with LocalStack, Testcontainers, and Spring Cloud AWS. We also need to make sure the Docker engine is running in our local environment before running the tests.
这就是我们使用 LocalStack、Testcontainers 和 Spring Cloud AWS 实施 Spring Boot 测试所需的全部内容。在运行测试之前,我们还需要确保 Docker 引擎在本地环境中运行。
4. Setting up the Queue Names
4.设置队列名称
We can set up the queue names by leveraging Spring Boot’s application.yml property mechanism.
我们可以通过 利用 Spring Boot 的 application.yml 属性机制来设置队列名称。
For this tutorial, we’ll create three queues:
在本教程中,我们将创建三个队列:
events:
queues:
user-created-by-name-queue: user_created_by_name_queue
user-created-record-queue: user_created_record_queue
user-created-event-type-queue: user_created_event_type_queue
Let’s create a POJO to represent these properties:
让我们创建一个 POJO 来表示这些属性:
@ConfigurationProperties(prefix = "events.queues")
public class EventQueuesProperties {
private String userCreatedByNameQueue;
private String userCreatedRecordQueue;
private String userCreatedEventTypeQueue;
// getters and setters
}
Finally, we need to use the @EnableConfigurationProperties annotation in a @Configuration annotated class, or the main Spring Application class, to let Spring Boot know we want to populate it with our application.yml properties:
最后,我们需要在@Configuration注解的类或 Spring Application 主类中使用 @EnableConfigurationProperties 注解,让 Spring Boot 知道我们要在其中填充 application.yml 属性:
@SpringBootApplication
@EnableConfigurationProperties(EventQueuesProperties.class)
public class SpringCloudAwsApplication {
public static void main(String[] args) {
SpringApplication.run(SpringCloudAwsApplication.class, args);
}
}
Now, we’re ready to inject either the values themselves or the POJO when we need the queue names.
现在,我们已经准备好在需要队列名称时注入值本身或 POJO。
By default, Spring Cloud AWS SQS will create the queues if they’re not found, which helps us quickly set up dev environments. In production, the application should not have permission to create queues, so it will fail to start if a queue is not found. The framework can also be configured to explicitly fail if a queue is not found instead.
默认情况下,Spring Cloud AWS SQS 会在未找到队列的情况下创建队列,这有助于我们快速建立开发环境。在生产环境中,应用程序不应拥有创建队列的权限,因此如果找不到队列,它将无法启动。也可以将框架配置为在未找到队列时显式启动失败。
5. Sending and Receiving Messages
5.收发信息
There are multiple ways of sending and receiving messages to and from SQS using Spring Cloud AWS. Here, we’ll cover the most common ones, using the SqsTemplate for sending messages and the @SqsListener annotation for receiving them.
使用 Spring Cloud AWS 向 SQS 收发消息有多种方法。在此,我们将介绍最常见的几种方法,使用 SqsTemplate 发送消息,使用 @SqsListener 注解接收消息。
5.1. Scenario
5.1.设想方案
In our scenario, we’ll simulate an event-driven application that responds to UserCreatedEvent by saving relevant information in its local repository.
在我们的应用场景中,我们将模拟一个事件驱动型应用程序,该应用程序通过在本地存储库中保存相关信息来响应UserCreatedEvent。
Let’s create a User entity:
让我们创建一个 User 实体:
public record User(String id, String name, String email) {
}
And let’s create a simple in-memory UserRepository:
让我们创建一个简单的内存 UserRepository :
@Repository
public class UserRepository {
private final Map<String, User> persistedUsers = new ConcurrentHashMap<>();
public void save(User userToSave) {
persistedUsers.put(userToSave.id(), userToSave);
}
public Optional<User> findById(String userId) {
return Optional.ofNullable(persistedUsers.get(userId));
}
public Optional<User> findByName(String name) {
return persistedUsers.values().stream()
.filter(user -> user.name().equals(name))
.findFirst();
}
}
And finally, let’s create a UserCreatedEvent Java Record class:
最后,让我们创建一个 UserCreatedEvent Java 记录类:
public record UserCreatedEvent(String id, String username, String email) {
}
5.2. Setup
5.2.设置
To test our scenarios, we’ll create a SpringCloudAwsSQSLiveTest class that extends the BaseSqsIntegrationTest file we created earlier. We’ll autowire three dependencies: the SqsTemplate that’s auto-configured by the framework, the UserRepository so we can assert our message processing worked, and our EventQueuesProperties POJO with the queue names:
为了测试我们的应用场景,我们将创建一个 SpringCloudAwsSQSLiveTest 类,该类将扩展我们之前创建的 BaseSqsIntegrationTest 文件。我们将自动连接三个依赖项:框架自动配置的 SqsTemplate 、UserRepository(以便我们可以断言消息处理工作正常)以及包含队列名称的 EventQueuesProperties POJO:
public class SpringCloudAwsSQSLiveTest extends BaseSqsIntegrationTest {
private static final Logger logger = LoggerFactory.getLogger(SpringCloudAwsSQSLiveTest.class);
@Autowired
private SqsTemplate sqsTemplate;
@Autowired
private UserRepository userRepository;
@Autowired
private EventQueuesProperties eventQueuesProperties;
// ...
}
To contain our listeners, let’s create a UserEventListeners class and declare it as a Spring @Component:
为了包含我们的监听器,让我们创建一个 UserEventListeners 类,并将其声明为 Spring @Component :
@Component
public class UserEventListeners {
private static final Logger logger = LoggerFactory.getLogger(UserEventListeners.class);
public static final String EVENT_TYPE_CUSTOM_HEADER = "eventType";
private final UserRepository userRepository;
public UserEventListeners(UserRepository userRepository) {
this.userRepository = userRepository;
}
// Our listeners will be added here
}
5.3. String Payloads
5.3.字符串有效载荷
In this first example, we’ll send a message with a String payload, receive it in our listener, and persist it to our repository. We’ll then poll the repository to make sure our application persists the data correctly.
在第一个示例中,我们将发送包含 String 有效载荷的消息,在监听器中接收该消息,并将其持久化到存储库中。然后,我们将轮询存储库,以确保我们的应用程序能正确地持久化数据。
First, let’s create a test for sending the message in our test class:
首先,让我们在测试类中创建一个发送消息的测试:
@Test
void givenAStringPayload_whenSend_shouldReceive() {
// given
var userName = "Albert";
// when
sqsTemplate.send(to -> to.queue(eventQueuesProperties.getUserCreatedByNameQueue())
.payload(userName));
logger.info("Message sent with payload {}", userName);
// then
await().atMost(Duration.ofSeconds(3))
.until(() -> userRepository.findByName(userName)
.isPresent());
}
We should see a log similar to:
我们应该看到类似的日志:
INFO [ main] c.b.s.c.a.sqs.SpringCloudAwsSQSLiveTest : Message sent with payload Albert
And then, note that the test fails because we don’t have the listener for this queue yet.
然后请注意,测试失败的原因是我们还没有该队列的监听器。
Let’s set up our listener to consume the message from this queue in our listener class and make the test pass:
让我们在监听器类中设置监听器从队列中接收消息,并使测试通过:
@SqsListener("${events.queues.user-created-by-name-queue}")
public void receiveStringMessage(String username) {
logger.info("Received message: {}", username);
userRepository.save(new User(UUID.randomUUID()
.toString(), username, null));
}
Now, when we run the test, we should see the result in the log:
现在,当我们运行测试时,应该能在日志中看到结果:
INFO [ntContainer#0-1] c.b.s.cloud.aws.sqs.UserEventListeners : Received message: Albert
And the test passes.
测试通过。
Note that we’re using Spring’s properties-resolving capabilities to fetch the queue name from the application.yml we created earlier.
请注意,我们正在使用 Spring 的属性解析功能,从我们之前创建的 application.yml 中获取队列名称。
5.4. POJO and Record Payloads
5.4.POJO 和记录有效载荷
Now that we’ve sent and received a String payload, let’s set up a scenario with a Java Record, the UserCreatedEvent we created earlier.
现在我们已经发送和接收了 String 有效负载,让我们用 Java 记录(即我们之前创建的 UserCreatedEvent )来设置一个场景。
First, let’s write our failing test:
首先,让我们编写失败测试:
@Test
void givenARecordPayload_whenSend_shouldReceive() {
// given
String userId = UUID.randomUUID()
.toString();
var payload = new UserCreatedEvent(userId, "John", "john@baeldung.com");
// when
sqsTemplate.send(to -> to.queue(eventQueuesProperties.getUserCreatedRecordQueue())
.payload(payload));
// then
logger.info("Message sent with payload: {}", payload);
await().atMost(Duration.ofSeconds(3))
.until(() -> userRepository.findById(userId)
.isPresent());
}
We should see a log similar to this before the test fails:
在测试失败前,我们应该能看到类似这样的日志:
INFO [ main] c.b.s.c.a.sqs.SpringCloudAwsSQSLiveTest : Message sent with payload: UserCreatedEvent[id=67f52cf6-c750-4200-9a02-345bda0516f8, username=John, email=john@baeldung.com]
Now, let’s create the corresponding listener to make the test pass:
现在,让我们创建相应的监听器,使测试通过:
@SqsListener("${events.queues.user-created-record-queue}")
public void receiveRecordMessage(UserCreatedEvent event) {
logger.info("Received message: {}", event);
userRepository.save(new User(event.id(), event.username(), event.email()));
}
We’ll see the output noting the message was received, and the test passes:
我们将看到输出结果,说明信息已收到,测试通过:
INFO [ntContainer#1-1] c.b.s.cloud.aws.sqs.UserEventListeners : Received message: UserCreatedEvent[id=2d66df3d-2dbd-4aed-8fc0-ddd08416ed12, username=John, email=john@baeldung.com]
The framework will auto-configure any ObjectMapper bean available in the Spring Context to handle the serialization and deserialization of messages. We can configure our own ObjectMapper and customize serialization in several ways, but that’s beyond the scope of this tutorial.
框架将自动配置 Spring 上下文中可用的任何 ObjectMapper Bean,以处理消息的序列化和反序列化。我们可以配置自己的 ObjectMapper 并以多种方式自定义序列化,但这超出了本教程的范围。
5.5. Spring Message and Headers
5.5.Spring信息和标头
In this last scenario, we’ll send a Record with a custom header and receive the message as a Spring Message instance, as well as both the custom header we added and a standard SQS header in the method signature. The framework automatically converts all SQS message attributes to message headers, including any provided by the user.
在最后一种情况中,我们将发送带有自定义标头的 Record,并以 Spring Message 实例的形式接收消息,同时在方法签名中包含我们添加的自定义标头和标准 SQS 标头。框架会自动将所有 SQS 消息属性转换为消息标头,包括用户提供的任何标头。
Let’s create the failing test first:
让我们先创建一个失败的测试:
@Test
void givenCustomHeaders_whenSend_shouldReceive() {
// given
String userId = UUID.randomUUID()
.toString();
var payload = new UserCreatedEvent(userId, "John", "john@baeldung.com");
var headers = Map.<String, Object> of(EVENT_TYPE_CUSTOM_HEADER, "UserCreatedEvent");
// when
sqsTemplate.send(to -> to.queue(eventQueuesProperties.getUserCreatedEventTypeQueue())
.payload(payload)
.headers(headers));
// then
logger.info("Sent message with payload {} and custom headers: {}", payload, headers);
await().atMost(Duration.ofSeconds(3))
.until(() -> userRepository.findById(userId)
.isPresent());
}
The test should generate a log similar to this before failing:
测试失败前应生成与此类似的日志:
INFO [ main] c.b.s.c.a.sqs.SpringCloudAwsSQSLiveTest : Sent message with payload UserCreatedEvent[id=575de854-82de-44e4-8dfe-8fdc9f6ae4a1, username=John, email=john@baeldung.com] and custom headers: {eventType=UserCreatedEvent}
Now, let’s add the corresponding listener to make the test pass:
现在,让我们添加相应的监听器,使测试通过:
@SqsListener("${events.queues.user-created-event-type-queue}")
public void customHeaderMessage(Message<UserCreatedEvent> message, @Header(EVENT_TYPE_CUSTOM_HEADER) String eventType,
@Header(SQS_APPROXIMATE_FIRST_RECEIVE_TIMESTAMP) Long firstReceive) {
logger.info("Received message {} with event type {}. First received at approximately {}.", message, eventType, firstReceive);
UserCreatedEvent payload = message.getPayload();
userRepository.save(new User(payload.id(), payload.username(), payload.email()));
}
And we’ll see the output when we re-run our test, indicating success:
重新运行测试时,我们将看到输出结果,表明测试成功:
INFO [ntContainer#2-1] c.b.s.cloud.aws.sqs.UserEventListeners : Received message GenericMessage [payload=UserCreatedEvent[id=575de854-82de-44e4-8dfe-8fdc9f6ae4a1, username=John, email=john@baeldung.com], headers=...
In this example, we’re receiving a Message with the deserialized UserCreatedEvent record as the payload and two headers. To ensure consistency throughout the project, we should use the SqsHeader class constants for retrieving SQS standard headers.
在这个示例中,我们接收到一个 Message 消息,其中包含作为有效载荷的反序列化 UserCreatedEvent 记录和两个头。为确保整个项目的一致性,我们应使用 SqsHeader 类常量来检索 SQS 标准头。
6. Conclusion
6.结论
In this article, we used an event-driven scenario to go through different examples for sending and receiving messages with Spring Cloud AWS SQS 3.0.
在本文中,我们使用了一个事件驱动的场景,通过不同的示例来介绍使用 Spring Cloud AWS SQS 3.0 发送和接收消息的方法。
We set up a local environment with LocalStack and TestContainers and configured the framework to use the proper local configuration for our integration tests.
我们使用 LocalStack 和 TestContainers 建立了本地环境,并配置了框架,以便为集成测试使用适当的本地配置。
As usual, the complete code used in this tutorial is available over on GitHub.
与往常一样,本教程中使用的完整代码可在 GitHub 上获取。