RabbitMQ Message Dispatching with Spring AMQP – 用Spring AMQP进行RabbitMQ消息调度

最后修改: 2017年 4月 22日

中文/混合/英文(键盘快捷键:t)

1. Introduction

1.绪论

In this tutorial, we’ll explore the concept of fanout and topic exchanges with Spring AMQP and RabbitMQ.

在本教程中,我们将探讨fanout的概念和Spring AMQPRabbitMQ的主题交换。

At a high level, fanout exchanges will broadcast the same message to all bound queues, while topic exchanges use a routing key for passing messages to a particular bound queue or queues.

在高层次上,扇出交换向所有绑定的队列广播同一消息,而主题交换使用路由键来将消息传递到一个或多个特定的绑定队列

Prior reading of Messaging With Spring AMQP is recommended for this tutorial.

本教程建议事先阅读Messaging With Spring AMQP>。

2. Setting Up a Fanout Exchange

2.设置扇形交换

Let’s set up one fanout exchange with two queues bound to it. When we send a message to this exchange both queues will receive the message. Our fanout exchange ignores any routing key included with the message.

让我们建立一个有两个队列绑定的扇出交换。当我们向这个交换中心发送一个消息时,两个队列都会收到这个消息。我们的扇出交换忽略了消息中包含的任何路由密钥。

Spring AMQP allows us to aggregate all the declarations of queues, exchanges, and bindings in a Declarables object:

Spring AMQP允许我们在一个Declarables对象中聚合所有队列、交换和绑定的声明:

@Bean
public Declarables fanoutBindings() {
    Queue fanoutQueue1 = new Queue("fanout.queue1", false);
    Queue fanoutQueue2 = new Queue("fanout.queue2", false);
    FanoutExchange fanoutExchange = new FanoutExchange("fanout.exchange");

    return new Declarables(
      fanoutQueue1,
      fanoutQueue2,
      fanoutExchange,
      BindingBuilder.bind(fanoutQueue1).to(fanoutExchange),
      BindingBuilder.bind(fanoutQueue2).to(fanoutExchange));
}

3. Setting Up a Topic Exchange

3.设置主题交流

Now, we’ll also set up a topic exchange with two queues, each with a different binding pattern:

现在,我们还将设置一个有两个队列的主题交换,每个队列都有不同的绑定模式。

@Bean
public Declarables topicBindings() {
    Queue topicQueue1 = new Queue(topicQueue1Name, false);
    Queue topicQueue2 = new Queue(topicQueue2Name, false);

    TopicExchange topicExchange = new TopicExchange(topicExchangeName);

    return new Declarables(
      topicQueue1,
      topicQueue2,
      topicExchange,
      BindingBuilder
        .bind(topicQueue1)
        .to(topicExchange).with("*.important.*"),
      BindingBuilder
        .bind(topicQueue2)
        .to(topicExchange).with("#.error"));
}

A topic exchange allows us to bind queues to it with different key patterns. This is very flexible and allows us to bind multiple queues with the same pattern or even multiple patterns to the same queue.

一个主题交换允许我们以不同的关键模式将队列绑定到它。这非常灵活,允许我们以相同的模式绑定多个队列,甚至将多个模式绑定到同一个队列。

When the message’s routing key matches the pattern, it will be placed in the queue. If a queue has multiple bindings which match the message’s routing key, only one copy of the message is placed on the queue.

当消息的路由键与模式相匹配时,它将被放在队列中。如果一个队列有多个与消息的路由键相匹配的绑定,则该消息只有一份被放在队列中。

Our binding patterns can use an asterisk (“*”) to match a word in a specific position or a pound sign (“#”) to match zero or more words.

我们的绑定模式可以使用星号(”*”)来匹配特定位置的单词,或者使用磅符号(”#”)来匹配零个或多个单词。

So, our topicQueue1 will receive messages which have routing keys having a three-word pattern with the middle word being “important” – for example: “user.important.error” or “blog.important.notification”.

因此,我们的topicQueue1将接收具有三个字模式的路由键的消息,中间的字是 “重要”–例如。“user.important.error”“blog.important.notification”

And, our topicQueue2 will receive messages which have routing keys ending in the word error; matching examples are “error”, “user.important.error” or “blog.post.save.error”.

而且,我们的topicQueue2将接收具有以error一词结尾的路由键的消息;匹配的例子有“error”“user.important.error”“blog.post.save.error”。

4. Setting Up a Producer

4.建立一个生产者

We’ll use the convertAndSend method of the RabbitTemplate to send our sample messages:

我们将使用RabbitTemplateconvertAndSend方法来发送我们的示例消息。

    String message = " payload is broadcast";
    return args -> {
        rabbitTemplate.convertAndSend(FANOUT_EXCHANGE_NAME, "", "fanout" + message);
        rabbitTemplate.convertAndSend(TOPIC_EXCHANGE_NAME, ROUTING_KEY_USER_IMPORTANT_WARN, 
            "topic important warn" + message);
        rabbitTemplate.convertAndSend(TOPIC_EXCHANGE_NAME, ROUTING_KEY_USER_IMPORTANT_ERROR, 
            "topic important error" + message);
    };

The RabbitTemplate provides many overloaded convertAndSend() methods for different exchange types.

RabbitTemplate为不同的交换类型提供了许多重载的convertAndSend()方法

When we send a message to a fanout exchange, the routing key is ignored, and the message is passed to all bound queues.

当我们向扇形交换发送消息时,路由键被忽略,消息被传递给所有绑定的队列。

When we send a message to the topic exchange, we need to pass a routing key. Based on this routing key the message will be delivered to specific queues.

当我们向主题交换发送消息时,我们需要传递一个路由密钥。根据这个路由密钥,消息将被传递到特定的队列中。

5. Configuring Consumers

5.配置消费者

Finally, let’s set up four consumers – one for each queue – to pick up the messages produced:

最后,让我们设置四个消费者–每个队列一个–来接收产生的消息。

    @RabbitListener(queues = {FANOUT_QUEUE_1_NAME})
    public void receiveMessageFromFanout1(String message) {
        System.out.println("Received fanout 1 message: " + message);
    }

    @RabbitListener(queues = {FANOUT_QUEUE_2_NAME})
    public void receiveMessageFromFanout2(String message) {
        System.out.println("Received fanout 2 message: " + message);
    }

    @RabbitListener(queues = {TOPIC_QUEUE_1_NAME})
    public void receiveMessageFromTopic1(String message) {
        System.out.println("Received topic 1 (" + BINDING_PATTERN_IMPORTANT + ") message: " + message);
    }

    @RabbitListener(queues = {TOPIC_QUEUE_2_NAME})
    public void receiveMessageFromTopic2(String message) {
        System.out.println("Received topic 2 (" + BINDING_PATTERN_ERROR + ") message: " + message);
    }

We configure consumers using the @RabbitListener annotation. The only argument passed here is the queues’ name. Consumers are not aware here of exchanges or routing keys.

我们使用@RabbitListener注解来配置消费者。这里传递的唯一参数是队列的名称。消费者在这里不知道交换或路由键。

6. Running the Example

6.运行实例

Our sample project is a Spring Boot application, and so it will initialize the application together with a connection to RabbitMQ and set up all queues, exchanges, and bindings.

我们的示例项目是一个 Spring Boot 应用程序,因此它将与 RabbitMQ 的连接一起初始化应用程序,并设置所有队列、交换和绑定。

By default, our application expects a RabbitMQ instance running on the localhost on port 5672. We can modify this and other defaults in application.yaml.

默认情况下,我们的应用程序希望有一个运行在端口为 5672 的 localhost 上的 RabbitMQ 实例。我们可以在 application.yaml 中修改这个和其他默认值。

Our project exposes HTTP endpoint on the URI – /broadcast – that accepts POSTs with a message in the request body.

我们的项目在URI上公开了HTTP端点–/broadcast–它接受请求体中带有消息的POST。

When we send a request to this URI with body “Test” we should see something similar to this in the output:

当我们向这个URI发送一个主体为 “Test “的请求时,我们应该在输出中看到与此类似的东西。

Received fanout 1 message: fanout payload is broadcast
Received topic 1 (*.important.*) message: topic important warn payload is broadcast
Received topic 2 (#.error) message: topic important error payload is broadcast
Received fanout 2 message: fanout payload is broadcast
Received topic 1 (*.important.*) message: topic important error payload is broadcast

The order in which we will see these messages is, of course, not guaranteed.

当然,我们将看到这些信息的顺序并不保证。

7. Conclusion

7.结语

In this quick tutorial, we covered fanout and topic exchanges with Spring AMQP and RabbitMQ.

在这个快速教程中,我们涵盖了Spring AMQP和RabbitMQ的扇出和主题交换。

The complete source code and all code snippets for this tutorial are available on the GitHub repository.

本教程的完整源代码和所有代码片段均可在GitHub资源库上找到。