1. Overview
1.概述
In this tutorial, we’ll explore how to use Amazon’s SQS (Simple Queue Service) using the Java SDK.
在本教程中,我们将探讨如何使用Java SDK来使用Amazon的SQS(简单队列服务)。
2. Prerequisites
2.先决条件
The Maven dependencies, AWS account settings, and client connection needed to use the Amazon AWS SDK for SQS are the same as in this article here.
使用SQS的Amazon AWS SDK所需的Maven依赖项、AWS账户设置和客户端连接与本文中的内容相同。
Assuming we’ve created an instance of AWSCredentials, as described in the previous article, we can go ahead and create our SQS client:
假设我们已经创建了一个AWSCredentials的实例,如上一篇文章所述,我们可以继续创建我们的SQS客户端。
AmazonSQS sqs = AmazonSQSClientBuilder.standard()
.withCredentials(new AWSStaticCredentialsProvider(credentials))
.withRegion(Regions.US_EAST_1)
.build();
3. Creating Queues
3.创建队列
Once we’ve set up our SQS client, creating queues is fairly straightforward.
一旦我们建立了我们的SQS客户端,创建队列是相当简单的。
3.1. Creating a Standard Queue
3.1.创建一个标准队列
Let’s see how we can create a Standard Queue. To do this, we’ll need to create an instance of CreateQueueRequest:
让我们看看我们如何创建一个标准队列。要做到这一点,我们需要创建一个CreateQueueRequest的实例:。
CreateQueueRequest createStandardQueueRequest = new CreateQueueRequest("baeldung-queue");
String standardQueueUrl = sqs.createQueue(createStandardQueueRequest).getQueueUrl();
3.2. Creating a FIFO Queue
3.2.创建一个FIFO队列
Creating a FIFO is similar to creating a Standard Queue. We’ll still use an instance of CreateQueueRequest, as we did previously. Only this time, we’ll have to pass in queue attributes, and set the FifoQueue attribute to true:
创建一个FIFO与创建一个标准队列类似。我们仍将使用CreateQueueRequest的实例,就像我们之前做的那样。只是这一次,我们必须传入队列属性,并将FifoQueue属性设置为true:。
Map<String, String> queueAttributes = new HashMap<>();
queueAttributes.put("FifoQueue", "true");
queueAttributes.put("ContentBasedDeduplication", "true");
CreateQueueRequest createFifoQueueRequest = new CreateQueueRequest(
"baeldung-queue.fifo").withAttributes(queueAttributes);
String fifoQueueUrl = sqs.createQueue(createFifoQueueRequest)
.getQueueUrl();
4. Posting Messages to Queues
4.向队列发布消息
Once we’ve got our queues set up, we can start sending messages.
一旦我们建立了我们的队列,我们就可以开始发送消息。
4.1. Posting a Message to a Standard Queue
4.1.将消息发布到标准队列
To send messages to a standard queue, we’ll have to create an instance of SendMessageRequest.
为了向标准队列发送消息,我们将必须创建一个SendMessageRequest的实例。
Then we attach a map of message attributes to this request:
然后我们给这个请求附加一个消息属性图。
Map<String, MessageAttributeValue> messageAttributes = new HashMap<>();
messageAttributes.put("AttributeOne", new MessageAttributeValue()
.withStringValue("This is an attribute")
.withDataType("String"));
SendMessageRequest sendMessageStandardQueue = new SendMessageRequest()
.withQueueUrl(standardQueueUrl)
.withMessageBody("A simple message.")
.withDelaySeconds(30)
.withMessageAttributes(messageAttributes);
sqs.sendMessage(sendMessageStandardQueue);
The withDelaySeconds() specifies after how long the message should arrive on the queue.
withDelaySeconds()指定了消息应该在多长时间后到达队列中。
4.2. Posting a Message to a FIFO Queue
4.2.向FIFO队列发布消息
The only difference, in this case, is that we’ll have to specify the group to which the message belongs:
在这种情况下,唯一的区别是,我们必须指定该邮件所属的组:。
SendMessageRequest sendMessageFifoQueue = new SendMessageRequest()
.withQueueUrl(fifoQueueUrl)
.withMessageBody("Another simple message.")
.withMessageGroupId("baeldung-group-1")
.withMessageAttributes(messageAttributes);
As you can see in the code example above, we specify the group by using withMessageGroupId().
正如你在上面的代码例子中看到的,我们通过使用withMessageGroupId()来指定组。
4.3. Posting Multiple Messages to a Queue
4.3.将多个消息发布到队列中
We can also post multiple messages to a queue, using a single request. We’ll create a list of SendMessageBatchRequestEntry which we’ll send using an instance of SendMessageBatchRequest:
我们还可以使用一个请求,将多个消息发布到队列中。我们将创建一个SendMessageBatchRequestEntry的列表,我们将使用SendMessageBatchRequest的实例发送。
List <SendMessageBatchRequestEntry> messageEntries = new ArrayList<>();
messageEntries.add(new SendMessageBatchRequestEntry()
.withId("id-1")
.withMessageBody("batch-1")
.withMessageGroupId("baeldung-group-1"));
messageEntries.add(new SendMessageBatchRequestEntry()
.withId("id-2")
.withMessageBody("batch-2")
.withMessageGroupId("baeldung-group-1"));
SendMessageBatchRequest sendMessageBatchRequest
= new SendMessageBatchRequest(fifoQueueUrl, messageEntries);
sqs.sendMessageBatch(sendMessageBatchRequest);
5. Reading Messages from Queues
5.从队列中读取消息
We can receive messages from our queues by invoking the receiveMessage() method on an instance of ReceiveMessageRequest:
我们可以通过在ReceiveMessageRequest的实例上调用receiveMessage()方法,从我们的队列中接收消息:。
ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(fifoQueueUrl)
.withWaitTimeSeconds(10)
.withMaxNumberOfMessages(10);
List<Message> sqsMessages = sqs.receiveMessage(receiveMessageRequest).getMessages();
Using withMaxNumberOfMessages(), we specify how many messages to get from the queue — although it should be noted that the maximum is 10.
使用withMaxNumberOfMessages(),我们指定从队列中获取多少条消息–尽管应该注意,最大限度是10。
The method withWaitTimeSeconds() enables long-polling. Long polling is a way to limit the number of receive message requests we send to SQS.
方法withWaitTimeSeconds()启用长时间轮询。长时间轮询是一种限制我们发送给SQS的接收消息请求数量的方法。。
Simply put, this means that we’ll wait up to the specified number of seconds to retrieve a message. If there are no messages in the queue for that duration, then the request will return empty. If a message arrives on the queue during that time, it will be returned.
简单地说,这意味着我们将等待到指定的秒数来检索一个消息。如果在这段时间内队列中没有消息,那么请求将返回空消息。如果在这段时间内有消息到达队列中,它将被返回。
We can get the attributes and body of a given message:
我们可以获得一个给定的消息的属性和正文:。
sqsMessages.get(0).getAttributes();
sqsMessages.get(0).getBody();
6. Deleting a Message from a Queue
6.从队列中删除一个消息
To delete a message, we’ll use a DeleteMessageRequest:
要删除一条信息,我们将使用一个DeleteMessageRequest。
sqs.deleteMessage(new DeleteMessageRequest()
.withQueueUrl(fifoQueueUrl)
.withReceiptHandle(sqsMessages.get(0).getReceiptHandle()));
7. Dead Letter Queues
7.死信排队
A dead letter queue must be of the same type as its base queue — it must be FIFO if the base queue is FIFO, and standard if the base queue is standard. For this example, we’ll use a standard queue.
死信队列必须与它的基本队列具有相同的类型 – 如果基本队列是 FIFO,它必须是 FIFO,如果基本队列是标准的,它必须是标准的。对于这个例子,我们将使用一个标准队列。
The first thing we need to do is to create what will become our dead letter queue:
我们需要做的第一件事是,创建将成为我们的死信队列:。
String deadLetterQueueUrl = sqs.createQueue("baeldung-dead-letter-queue").getQueueUrl();
Next, we’ll get our newly created queue’s ARN (Amazon Resource Name):
接下来,我们将获取我们新创建的队列的ARN(亚马逊资源名称): 。
GetQueueAttributesResult deadLetterQueueAttributes = sqs.getQueueAttributes(
new GetQueueAttributesRequest(deadLetterQueueUrl)
.withAttributeNames("QueueArn"));
String deadLetterQueueARN = deadLetterQueueAttributes.getAttributes()
.get("QueueArn");
Finally, we set this newly created queue to be our original standard queue’s dead letter queue:
最后,我们将这个新创建的队列设置为我们原来的标准队列的死字队列:。
SetQueueAttributesRequest queueAttributesRequest = new SetQueueAttributesRequest()
.withQueueUrl(standardQueueUrl)
.addAttributesEntry("RedrivePolicy",
"{\"maxReceiveCount\":\"2\", "
+ "\"deadLetterTargetArn\":\"" + deadLetterQueueARN + "\"}");
sqs.setQueueAttributes(queueAttributesRequest);
The JSON packet we set in the addAttributesEntry() method when building our SetQueueAttributesRequest instance contains the information we need: the maxReceiveCount is 2, which means that if a message is received this many times, it’s assumed to haven’t been processed correctly, and is sent to our dead letter queue.
在构建我们的SetQueueAttributesRequest 实例时,我们在addAttributesEntry()方法中设置的JSON包包含了我们需要的信息:maxReceiveCount是2,这意味着如果一个消息被收到这么多次,它被认为没有被正确处理,并被发送到我们的死信队列。
The deadLetterTargetArn attribute points our standard queue to our newly created dead letter queue.
deadLetterTargetArn属性将我们的标准队列指向我们新创建的死信队列。
8. Monitoring
8.监测
We can check how many messages are currently in a given queue, and how many are in flight with the SDK. First, we’ll need to create a GetQueueAttributesRequest.
我们可以通过SDK检查某个队列中当前有多少条消息,以及有多少条是飞行中的。首先,我们需要创建一个GetQueueAttributesRequest。
From there we’ll check the state of the queue:
从那里我们将检查队列的状态。
GetQueueAttributesRequest getQueueAttributesRequest
= new GetQueueAttributesRequest(standardQueueUrl)
.withAttributeNames("All");
GetQueueAttributesResult getQueueAttributesResult
= sqs.getQueueAttributes(getQueueAttributesRequest);
System.out.println(String.format("The number of messages on the queue: %s",
getQueueAttributesResult.getAttributes()
.get("ApproximateNumberOfMessages")));
System.out.println(String.format("The number of messages in flight: %s",
getQueueAttributesResult.getAttributes()
.get("ApproximateNumberOfMessagesNotVisible")));
More in-depth monitoring can be achieved using Amazon Cloud Watch.
可以使用Amazon Cloud Watch实现更深入的监控。
9. Conclusion
9.结论
In this article, we’ve seen how to manage SQS queues using the AWS Java SDK.
在这篇文章中,我们已经看到了如何使用AWS Java SDK来管理SQS队列。
As usual, all code samples used in the article can be found over on GitHub.
像往常一样,文章中使用的所有代码样本都可以在GitHub上找到over。