1. Overview
1.概述
In this tutorial, we’ll look at the use of message queues and publishers/subscribers. These are common patterns used in distributed systems for two or more services to communicate with one another.
在本教程中,我们将看看消息队列和发布者/订阅者的使用。这些是分布式系统中常用的模式,用于两个或多个服务之间的通信。
For this tutorial, all examples will be shown using the RabbitMQ message broker so first follow RabbitMQ’s tutorial to get up and running locally. For a deeper dive on RabbitMQ check out our other tutorial.
在本教程中,所有示例都将使用 RabbitMQ 消息代理进行展示,因此请首先遵循 RabbitMQ 的教程,以在本地启动和运行。要深入了解 RabbitMQ,请查看我们的其他教程。
Note: there are many alternatives to RabbitMQ that can be used for the same examples in this tutorial such as Kafka, Google Cloud Pub-Sub, and Amazon SQS to name but a few.
注意:有许多RabbitMQ的替代品可用于本教程中的相同示例,如Kafka、Google Cloud Pub-Sub和Amazon SQS等,仅举几例。
2. What Are Message Queues?
2.什么是消息队列?
Let’s start by looking at message queues. Message queues consist of a publishing service and multiple consumer services that communicate via a queue. This communication is typically one way where the publisher will issue commands to the consumers. The publishing service will typically put a message on a queue or exchange and a single consumer service will consume this message and perform an action based on this.
让我们先来看看消息队列。消息队列由一个发布服务和多个消费者服务组成,它们通过队列进行通信。这种通信通常是单向的,发布者将向消费者发布命令。发布服务通常会将一条消息放在队列或交换中,单个消费者服务将消费这条消息并在此基础上执行一个动作。
Consider the following exchange:
请考虑以下交流。
From this, we can see a Publisher service that is putting a message ‘m n+1′ onto the queue. In addition, we can also see multiple messages already in existence on the queue waiting to be consumed. On the right-hand side, we have 2 consuming services ‘A’ and ‘B’ that is listening to the queue for messages.
从这里,我们可以看到一个Publisher服务正在把一个消息’m n+1’放到队列中。此外,我们还可以看到队列中已经存在的多个消息等待被消费。在右侧,我们有两个消费服务’A’和’B’正在监听队列中的消息。
Let’s now consider the same exchange after some time:
现在让我们考虑一段时间后的相同交换。
First, we can see that the Publisher’s message has been pushed to the tail of the queue. Next, the important part to consider is the right-hand side of the image. We can see that consumer ‘A’ has read the message ‘m 1′ and, as such, it is no longer available in the queue for the other service ‘B’ to consume.
首先,我们可以看到Publisher的信息已经被推到了队列的尾部。接下来,要考虑的重要部分是图片的右侧。我们可以看到,消费者 “A “已经阅读了消息 “m 1″,因此,它在队列中不再可供其他服务 “B “使用了。
2.1. Where to Use Message Queues
2.1.在哪里使用消息队列
Message queues are often used where we want to delegate work from a service. In doing so, we want to ensure that the work is only executed one time.
消息队列经常被用于我们希望从服务中委托工作的地方。在这样做时,我们希望确保工作只执行一次。
Using message queues is popular in micro-service architectures and while developing cloud-based or serverless applications as it allows us to horizontally scale our app based on load.
在微服务架构中,以及在开发基于云或无服务器的应用程序时,使用消息队列很受欢迎,因为它允许我们根据负载水平地扩展我们的应用程序。
For example, if there are many messages on the queue waiting to be processed, we can spin up multiple consumer services which listen to the same message queue and handle the influx in messages. Once the messages have been handled, the services can then be turned off when traffic is minimal to save on running costs.
例如,如果队列中有许多等待处理的消息,我们可以启动多个消费者服务,听取同一消息队列并处理涌入的消息。一旦消息得到处理,当流量最小时,这些服务就可以关闭,以节省运行成本。
2.2. Example Using RabbitMQ
2.2.使用 RabbitMQ 的例子
Let’s go through an example for clarity. Our example will take the form of a pizza restaurant. Imagine people are able to order pizzas via an app and chefs at the pizzeria will pick up orders as they come in. In this example, the customer is our publisher and the chef(s) are our consumers.
让我们通过一个例子来了解清楚。我们的例子将采取一个比萨饼店的形式。想象一下,人们能够通过一个应用程序订购披萨,而披萨店的厨师会在他们进来时接单。在这个例子中,客户是我们的发布者,厨师是我们的消费者。
First, let’s define our queue:
首先,让我们定义我们的队列。
private static final String MESSAGE_QUEUE = "pizza-message-queue";
@Bean
public Queue queue() {
return new Queue(MESSAGE_QUEUE);
}
Using Spring AMQP, we have created a queue named “pizza-message-queue”. Next, let’s define our publisher that will post messages to our newly defined queue:
使用Spring AMQP,我们已经创建了一个名为 “pizza-message-queue “的队列。接下来,让我们定义我们的发布器,它将向我们新定义的队列发布消息。
public class Publisher {
private RabbitTemplate rabbitTemplate;
private String queue;
public Publisher(RabbitTemplate rabbitTemplate, String queue) {
this.rabbitTemplate = rabbitTemplate;
this.queue = queue;
}
@PostConstruct
public void postMessages() {
rabbitTemplate.convertAndSend(queue, "1 Pepperoni");
rabbitTemplate.convertAndSend(queue, "3 Margarita");
rabbitTemplate.convertAndSend(queue, "1 Ham and Pineapple (yuck)");
}
}
Spring AMQP will create a RabbitTemplate bean for us that has a connection to our RabbitMQ exchange to reduce configuration overhead. Our Publisher makes use of this by sending 3 messages to our queue.
Spring AMQP将为我们创建一个RabbitTemplate bean,它有一个与我们的RabbitMQ交换的连接以减少配置开销。我们的发布者通过向我们的队列发送3条消息来利用这一点。
Now that our pizza orders are in we need a separate consumer application. This will act as our chef in the example and read messages:
现在我们的比萨饼订单已经到了,我们需要一个单独的消费者应用程序。这将作为我们例子中的厨师,读取信息。
public class Consumer {
public void receiveOrder(String message) {
System.out.printf("Order received: %s%n", message);
}
}
Let’s now create a MessageListenerAdapter for our queue that will call our Consumer’s receive order method using reflection:
现在让我们为我们的队列创建一个MessageListenerAdapter ,它将使用反射调用我们的消费者的接收顺序方法。
@Bean
public SimpleMessageListenerContainer container(ConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames(MESSAGE_QUEUE);
container.setMessageListener(listenerAdapter);
return container;
}
@Bean
public MessageListenerAdapter listenerAdapter(Consumer consumer) {
return new MessageListenerAdapter(consumer, "receiveOrder");
}
Messages read from the queue will now be routed to the receiveOrder method of the Consumer class. To run this application, we can create as many Consumer applications as we wish to fulfill the incoming orders. For example, if 400 pizza orders were put on the queue then we may need more than 1 consumer ‘chef’, or orders will be slow. In this case, we might spin up 10 consumer instances to fulfill the orders in a timely manner.
从队列中读取的消息现在将被路由到Consumer类的receiveOrder方法。为了运行这个应用程序,我们可以创建尽可能多的消费者应用程序来完成传入的订单。例如,如果400个比萨饼订单被放在队列中,那么我们可能需要多于1个消费者 “厨师”,否则订单会很慢。在这种情况下,我们可能会启动10个消费者实例来及时完成订单。
3. What Is the Pub-Sub?
3.什么是Pub-Sub?
Now that we have covered message queues, let’s look into pub-sub. Conversely, to message queues, in a pub-sub architecture we want all our consuming (subscribing) applications to get at least 1 copy of the message that our publisher posts to an exchange.
现在我们已经介绍了消息队列,让我们来了解一下pub-sub。与消息队列相反,在pub-sub架构中,我们希望所有消费(订阅)应用程序都能获得发布者发布到交易所的消息的至少1份。
Consider the following exchange:
请考虑以下交流。
On the left we have a publisher sending a message “m n+1” to a Topic. This Topic will broadcast this message to its subscriptions. These subscriptions are bound to queues. Each queue has a listening subscriber service awaiting messages.
在左边,我们有一个发布者发送一个消息 “m n+1 “给一个主题。该主题将向其订阅者广播这一消息。这些订阅被绑定到队列中。每个队列都有一个监听的订阅者服务,等待消息。
Let’s now consider the same exchange after some time has passed:
现在让我们考虑一段时间后的相同交换。
Both the subscribing services are consuming “m 1” as both received a copy of this message. In addition, the Topic is distributing the new message “m n+1” to all of its subscribers.
两个订阅服务都在消费 “m 1″,因为它们都收到了该消息的副本。此外,主题正在将新消息 “m n+1 “分发给其所有的订阅者。
Pub sub should be used where we need a guarantee that each subscriber gets a copy of the message.
Pub sub应该用在我们需要保证每个订阅者得到一份信息的地方。
3.1. Example Using RabbitMQ
3.1.使用 RabbitMQ 的例子
Imagine we have a clothing website. This website is able to send push notifications to users to notify them of deals. Our system can send notifications via email or text alerts. In this scenario, the website is our publisher and the text and email alerting services are our subscribers.
想象一下,我们有一个服装网站。这个网站能够向用户发送推送通知,通知他们有优惠。我们的系统可以通过电子邮件或文本提醒来发送通知。在这种情况下,网站是我们的发布者,而文本和电子邮件提醒服务是我们的用户。
First, let’s define our topic exchange and bind 2 queues to it:
首先,让我们定义我们的主题交换,并将2个队列绑定到它。
private static final String PUB_SUB_TOPIC = "notification-topic";
private static final String PUB_SUB_EMAIL_QUEUE = "email-queue";
private static final String PUB_SUB_TEXT_QUEUE = "text-queue";
@Bean
public Queue emailQueue() {
return new Queue(PUB_SUB_EMAIL_QUEUE);
}
@Bean
public Queue textQueue() {
return new Queue(PUB_SUB_TEXT_QUEUE);
}
@Bean
public TopicExchange exchange() {
return new TopicExchange(PUB_SUB_TOPIC);
}
@Bean
public Binding emailBinding(Queue emailQueue, TopicExchange exchange) {
return BindingBuilder.bind(emailQueue).to(exchange).with("notification");
}
@Bean
public Binding textBinding(Queue textQueue, TopicExchange exchange) {
return BindingBuilder.bind(textQueue).to(exchange).with("notification");
}
We have now bound 2 queues using the routing key “notification” meaning any messages posted on the topic with this routing key will go to both queues. Updating the Publisher class that we created earlier, we can send some messages to our exchange:
我们现在已经使用路由键 “notification “绑定了2个队列,这意味着任何用这个路由键发布在主题上的消息都会进入两个队列。更新我们之前创建的Publisher 类,我们可以发送一些消息到我们的交易所。
rabbitTemplate.convertAndSend(topic, "notification", "New Deal on T-Shirts: 95% off!");
rabbitTemplate.convertAndSend(topic, "notification", "2 for 1 on all Jeans!");
4. Comparison
4.比较
Now that we’ve touched on both areas, let’s briefly compare both types of exchange.
现在我们已经触及了这两个领域,让我们简单比较一下这两种类型的交换。
As previously mentioned, both message queues and pub-sub architecture patterns are a great way to break up an application to make it more horizontally scalable.
如前所述,消息队列和pub-sub架构模式都是分割应用程序的好方法,可以使其更具横向扩展性。
Another benefit of using either pub-sub or message queues is that the communication is more durable than traditional synchronous modes of communication. For example, if app A communicates to app B via asynchronous HTTP call then if either of the applications goes down the data is lost and the request must be retried.
使用pub-sub或消息队列的另一个好处是通信比传统的同步通信模式更持久。例如,如果应用程序A通过异步HTTP调用与应用程序B进行通信,那么如果任何一个应用程序发生故障,数据就会丢失,必须重试该请求。
Using message queues if a consumer application instance goes down then another consumer will be able to handle the message instead. Using pub-sub, if a subscriber is down then once it has recovered the messages it has missed will be available for consumption in its subscribing queue.
使用消息队列,如果一个消费者应用程序实例发生故障,那么另一个消费者将能够处理该消息。使用pub-sub,如果一个订阅者发生故障,那么一旦它恢复了,它所错过的消息将可以在其订阅队列中消费。
Finally, context is key. Choosing whether to use pub-sub or message queue architecture comes down to defining exactly how you want the consuming service to behave. The most important factor to keep in mind is asking “Does it matter if every consumer gets every message?”
最后,背景是关键。选择使用pub-sub还是消息队列架构,归根结底是要准确定义您希望消费服务的行为方式。需要记住的最重要的因素是问:”如果每个消费者都获得每条消息,这是否重要?
5. Conclusion
5.总结
In this tutorial we’ve looked at pub-sub and message queues and some of the characteristics of each. All the code mentioned in this tutorial can be found over on GitHub.
在本教程中,我们已经了解了pub-sub和消息队列以及两者的一些特点。本教程中提到的所有代码都可以在GitHub上找到over。