Last modified: July 2, 2018


1. Overview


This tutorial shows how to create a simple Spring Boot Reactive Application that integrates with the RabbitMQ messaging server, a popular implementation of the AMQP messaging standard.

We cover both point-to-point and publish-subscribe scenarios, using a distributed setup that highlights the differences between the two patterns.


Note that we assume a basic knowledge of AMQP, RabbitMQ and Spring Boot, in particular, key concepts such as Exchanges, Queues, Topics and so forth. More information about those concepts can be found in the links below:

2. RabbitMQ Server Setup

Although we could install RabbitMQ locally, in practice we're more likely to use a dedicated installation with additional features such as high availability, monitoring, and security.


To simulate such an environment on our development machine, we'll use Docker to create a server for our application to use.


The following command will start a standalone RabbitMQ server:

$ docker run -d --name rabbitmq -p 5672:5672 rabbitmq:3

We don’t declare any persistent volume, so unread messages will be lost between restarts. The service will be available at port 5672 on the host.


We can check server logs with the docker logs command, which should produce an output such as this:

$ docker logs rabbitmq
2018-06-09 13:42:29.718 [info] <0.33.0>
  Application lager started on node rabbit@rabbit
// ... some lines omitted
2018-06-09 13:42:33.491 [info] <0.226.0>
 Starting RabbitMQ 3.7.5 on Erlang 20.3.5
 Copyright (C) 2007-2018 Pivotal Software, Inc.
 Licensed under the MPL.  See http://www.rabbitmq.com/

  ##  ##
  ##  ##      RabbitMQ 3.7.5. Copyright (C) 2007-2018 Pivotal Software, Inc.
  ##########  Licensed under the MPL.  See http://www.rabbitmq.com/
  ######  ##
  ##########  Logs: <stdout>

              Starting broker...
2018-06-09 13:42:33.494 [info] <0.226.0>
 node           : rabbit@rabbit
 home dir       : /var/lib/rabbitmq
 config file(s) : /etc/rabbitmq/rabbitmq.conf
 cookie hash    : CY9rzUYh03PK3k6DJie09g==
 log(s)         : <stdout>
 database dir   : /var/lib/rabbitmq/mnesia/rabbit@rabbit

// ... more log lines

Since the image includes the rabbitmqctl utility, we can use it to execute administrative tasks in the context of our running container.


For instance, we can get server status information with the following command:


$ docker exec rabbitmq rabbitmqctl status
Status of node rabbit@rabbit ...
          "Modules shared by rabbitmq-server and rabbitmq-erlang-client",
// ... other info omitted for brevity

Other useful commands include:


  • list_exchanges:  List all declared Exchanges
  • list_queues:  List all declared Queues, including the number of unread messages
  • list_bindings:  List all defined Bindings between exchanges and queues, including routing keys

3. Spring AMQP Project Setup

Once we have our RabbitMQ server up and running, we can move on to create our Spring project. This sample project will allow any REST client to post and/or receive messages to the messaging server, using the Spring AMQP module and the corresponding Spring Boot starter in order to communicate with it.

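The main dependencies we need to add to our pom.xml project file are:

<!-- versions are not shown here -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
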
The spring-boot-starter-amqp dependency brings in everything needed for AMQP support, whereas spring-boot-starter-webflux is the core dependency used to implement our reactive REST server.


Note: you can check the latest version of the Spring Boot Starter AMQP and Webflux modules on Maven Central.

4. Scenario 1: Point-to-Point Messaging


In this first scenario, we'll use a Direct Exchange, which is the logical entity in the broker that receives messages from clients.


A Direct Exchange routes each incoming message to the queue whose binding key exactly matches the message's routing key. In our setup that is one, and only one, queue, from which the message becomes available for consumption by clients. Multiple clients can subscribe to the same queue, but only one will receive a given message.
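
The "only one consumer receives a given message" behaviour can be illustrated without a broker. What follows is a minimal in-memory sketch, not RabbitMQ code: a BlockingQueue stands in for the broker queue, and the CompetingConsumersDemo class and its deliver method are hypothetical names introduced only for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;

// In-memory stand-in for one queue shared by several competing consumers.
class CompetingConsumersDemo {

    // Hands every message to exactly one consumer and returns what each consumer received.
    static List<List<String>> deliver(List<String> messages, int consumerCount) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(messages);
        List<List<String>> received = new ArrayList<>();
        List<Thread> workers = new ArrayList<>();
        for (int i = 0; i < consumerCount; i++) {
            List<String> inbox = new CopyOnWriteArrayList<>();
            received.add(inbox);
            Thread worker = new Thread(() -> {
                String message;
                // poll() removes the head atomically, so no two consumers get the same message
                while ((message = queue.poll()) != null) {
                    inbox.add(message);
                }
            });
            workers.add(worker);
            worker.start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return received;
    }
}
```

How the messages are split between the consumers depends on thread scheduling, but across all inboxes each message appears exactly once.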


4.1. Exchange and Queues Setup


In our scenario, we use a DestinationInfo object that encapsulates the exchange name and routing key. A map keyed by destination name will be used to store all available destinations.


The following @PostConstruct method will be responsible for this initial setup:


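@Autowired
private AmqpAdmin amqpAdmin;
@Autowired
private DestinationsConfig destinationsConfig;
@PostConstruct
public void setupQueueDestinations() {
    destinationsConfig.getQueues().forEach((key, destination) -> {
        // Declare a durable direct exchange and a durable queue named after the routing key, then bind them
        Exchange ex = ExchangeBuilder.directExchange(destination.getExchange()).durable(true).build();
        amqpAdmin.declareExchange(ex);
        Queue q = QueueBuilder.durable(destination.getRoutingKey()).build();
        amqpAdmin.declareQueue(q);
        Binding b = BindingBuilder.bind(q).to(ex).with(destination.getRoutingKey()).noargs();
        amqpAdmin.declareBinding(b);
    });
}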

This method uses the amqpAdmin bean created by Spring to declare Exchanges and Queues and to bind them together using a given routing key.


All destinations come from a DestinationsConfig bean, which is a @ConfigurationProperties class used in our example.


This class has a property that is populated with DestinationInfo objects built from mappings read from the application.yml configuration file.
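
As a rough illustration of the shape of these two classes, here is a plain-Java sketch. It assumes the accessor names getQueues(), getTopics(), getExchange() and getRoutingKey(), and it leaves out the Spring annotation so that it compiles on its own; in the application the config class would carry @ConfigurationProperties with a "destinations" prefix (an assumption based on the property names used in application.yml).

```java
import java.util.HashMap;
import java.util.Map;

// Holds the exchange name and routing key of a single destination.
class DestinationInfo {
    private String exchange;
    private String routingKey;

    public String getExchange() { return exchange; }
    public void setExchange(String exchange) { this.exchange = exchange; }
    public String getRoutingKey() { return routingKey; }
    public void setRoutingKey(String routingKey) { this.routingKey = routingKey; }
}

// Registry of destinations keyed by name; Spring would populate the maps from
// destinations.queues.<name>.* and destinations.topics.<name>.* (assumed layout).
class DestinationsConfig {
    private Map<String, DestinationInfo> queues = new HashMap<>();
    private Map<String, DestinationInfo> topics = new HashMap<>();

    public Map<String, DestinationInfo> getQueues() { return queues; }
    public void setQueues(Map<String, DestinationInfo> queues) { this.queues = queues; }
    public Map<String, DestinationInfo> getTopics() { return topics; }
    public void setTopics(Map<String, DestinationInfo> topics) { this.topics = topics; }
}
```

Looking up a destination is then a plain map access, which returns null for unknown names; the endpoints rely on that to answer with a 404.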


4.2. Producer Endpoint


Producers will send messages by sending an HTTP POST to the /queue/{name} location.

This is a reactive endpoint, so we use a Mono to return a simple acknowledgment:


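@RestController
public class SpringWebfluxAmqpApplication {
    // ... other members omitted
    @Autowired
    private AmqpTemplate amqpTemplate;

    @PostMapping(value = "/queue/{name}")
    public Mono<ResponseEntity<?>> sendMessageToQueue(
      @PathVariable String name, @RequestBody String payload) {

        // Look up the destination registered under this name
        DestinationInfo d = destinationsConfig.getQueues().get(name);
        if (d == null) {
            return Mono.just(ResponseEntity.notFound().build());
        }

        // Defer the send until the returned Mono is subscribed
        return Mono.fromCallable(() -> {
            amqpTemplate.convertAndSend(d.getExchange(), d.getRoutingKey(), payload);
            return ResponseEntity.accepted().build();
        });
    }
}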

We first check if the name parameter corresponds to a valid destination and if so, we use the autowired amqpTemplate instance to actually send out the payload – a simple String message – to RabbitMQ.

4.3. MessageListenerContainer Factory


In order to receive messages asynchronously, Spring AMQP uses a MessageListenerContainer abstraction that mediates the information flow between AMQP Queues and the listeners provided by an application.

Since we need a concrete implementation of this class in order to attach our message listeners, we define a factory that isolates the controller code from its actual implementation.


In our case, the factory method returns a new SimpleMessageListenerContainer every time we call its createMessageListenerContainer method:


@Component
public class MessageListenerContainerFactory {

    @Autowired
    private ConnectionFactory connectionFactory;

    public MessageListenerContainerFactory() {}

    public MessageListenerContainer createMessageListenerContainer(String queueName) {
        SimpleMessageListenerContainer mlc = new SimpleMessageListenerContainer(connectionFactory);
        // Attach the container to the queue it should consume from
        mlc.addQueueNames(queueName);
        return mlc;
    }
}

4.4. Consumer Endpoint


Consumers will access the same endpoint address used by producers (/queue/{name}) to get messages.


This endpoint returns a Flux of events, where each event corresponds to a received message:

@Autowired
private MessageListenerContainerFactory messageListenerContainerFactory;

@GetMapping(
  value = "/queue/{name}",
  produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<?> receiveMessagesFromQueue(@PathVariable String name) {

    DestinationInfo d = destinationsConfig.getQueues().get(name);
    if (d == null) {
        return Flux.just(ResponseEntity.notFound().build());
    }

    // The queue is named after the routing key
    MessageListenerContainer mlc = messageListenerContainerFactory
      .createMessageListenerContainer(d.getRoutingKey());

    Flux<String> f = Flux.<String> create(emitter -> {
        mlc.setupMessageListener((MessageListener) m -> {
            String payload = new String(m.getBody());
            emitter.next(payload);
        });
        // Start consuming on first demand, stop when the client goes away
        emitter.onRequest(v -> {
            mlc.start();
        });
        emitter.onDispose(() -> {
            mlc.stop();
        });
    });

    return Flux.interval(Duration.ofSeconds(5))
      .map(v -> "No news is good news")
      .mergeWith(f);
}

After the initial check on the destination name, the consumer endpoint creates a MessageListenerContainer using the MessageListenerContainerFactory and the queue name recovered from our registry.

Once we have our MessageListenerContainer, we create the message Flux using one of the Flux.create() builder methods.

In our particular case, we use the variant that takes a lambda with a FluxSink argument, which we then use to bridge Spring AMQP's listener-based async API to our reactive application.

We also attach two additional lambdas to the emitter's onRequest() and onDispose() callbacks so our MessageListenerContainer can allocate and release its internal resources following the Flux's lifecycle.


Finally, we merge the resulting Flux with another one created with interval(), which creates a new event every five seconds. Those dummy messages play an important function in our case: without them, we’d only detect a client disconnection upon receiving a message and failing to send it, which can take a long time depending on your particular use case.


4.5. Testing


With both our consumer and publisher endpoints set up, we can now run some tests with our sample application.


We need to define RabbitMQ's server connection details and at least one destination in our application.yml, which should look like this:

spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest

destinations:
  queues:
    NYSE:
      exchange: nyse
      routing-key: NYSE

The spring.rabbitmq.* properties define the basic properties required to connect to our RabbitMQ server running in a local Docker container. Please note that the host shown above is just an example and may be different in a particular setup.

Queues are defined using destinations.queues.<name>.*, where <name> is used as the destination name. Here we declared a single destination named “NYSE” that will send messages to the “nyse” exchange on RabbitMQ with an “NYSE” routing key.

Once we start the server via the command line or from our IDE, we can start sending and receiving messages. We'll use curl, a common utility available for Windows, macOS and Linux.


The following listing shows how to send a message to our destination and the expected response from the server:


$ curl -v -d "Test message" http://localhost:8080/queue/NYSE
* timeout on name lookup is not supported
*   Trying
* Connected to localhost ( port 8080 (#0)
> POST /queue/NYSE HTTP/1.1
> Host: localhost:8080
> User-Agent: curl/7.49.1
> Accept: */*
> Content-Length: 12
> Content-Type: application/x-www-form-urlencoded
* upload completely sent off: 12 out of 12 bytes
< HTTP/1.1 202 Accepted
< content-length: 0
* Connection #0 to host localhost left intact
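
The same request can also be issued from Java. The sketch below uses the JDK's java.net.http client (Java 11 or later); the QueueProducerClient class and its method names are hypothetical, and the text/plain content type is an assumption (curl -d sends a form content type instead).

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Hypothetical helper that posts a message to the gateway's /queue/{name} endpoint.
class QueueProducerClient {

    // Builds the POST request; baseUrl is e.g. "http://localhost:8080".
    static HttpRequest buildSendRequest(String baseUrl, String destination, String payload) {
        return HttpRequest.newBuilder()
            .uri(URI.create(baseUrl + "/queue/" + destination))
            .header("Content-Type", "text/plain")
            .POST(HttpRequest.BodyPublishers.ofString(payload))
            .build();
    }

    // Sends the message and returns the HTTP status code reported by the gateway.
    static int send(String baseUrl, String destination, String payload) throws Exception {
        HttpResponse<Void> response = HttpClient.newHttpClient()
            .send(buildSendRequest(baseUrl, destination, payload),
                  HttpResponse.BodyHandlers.discarding());
        return response.statusCode();
    }
}
```

With the application running, QueueProducerClient.send("http://localhost:8080", "NYSE", "Test message") should come back with the same 202 status that curl reports.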

After executing this command, we can verify that the message was received by RabbitMQ and is ready for consumption by issuing the following command:

$ docker exec rabbitmq rabbitmqctl list_queues
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
NYSE    1

Now we can read messages using curl:


$ curl -v http://localhost:8080/queue/NYSE
* timeout on name lookup is not supported
*   Trying
* Connected to localhost ( port 8080 (#0)
> GET /queue/NYSE HTTP/1.1
> Host: localhost:8080
> User-Agent: curl/7.49.1
> Accept: */*
< HTTP/1.1 200 OK
< transfer-encoding: chunked
< Content-Type: text/event-stream;charset=UTF-8
data:Test message

data:No news is good news...

... same message repeating every 5 secs

As we can see, first we get the previously stored message and then we start to receive our dummy message every 5 seconds.
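
A client that consumes this stream will usually want to separate real messages from the periodic dummy ones. The following is a small sketch of that filtering for lines already read from the response; the SseLines class is a hypothetical helper, and it assumes the heartbeat text is exactly the one used by the endpoint above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper that extracts message payloads from server-sent event lines.
class SseLines {
    static final String HEARTBEAT = "No news is good news";

    // Returns the payload of every "data:" line, skipping the periodic dummy message.
    static List<String> realMessages(List<String> lines) {
        List<String> payloads = new ArrayList<>();
        for (String line : lines) {
            if (!line.startsWith("data:")) {
                continue; // blank separator lines and other SSE fields are ignored
            }
            String payload = line.substring("data:".length());
            // The event-stream format allows one optional space after the colon
            if (payload.startsWith(" ")) {
                payload = payload.substring(1);
            }
            if (!payload.startsWith(HEARTBEAT)) {
                payloads.add(payload);
            }
        }
        return payloads;
    }
}
```

A side effect of this simple filter is that a genuine message starting with the heartbeat text would be dropped too, so a real gateway might prefer a distinct event name or an SSE comment line for keep-alives.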


If we run the command to list queues again, we can see that there are no messages stored:


$ docker exec rabbitmq rabbitmqctl list_queues

Timeout: 60.0 seconds ...
Listing queues for vhost / ...
NYSE    0

5. Scenario 2: Publish-Subscribe


Another common scenario for messaging applications is the Publish-Subscribe pattern, where a single message must be sent to multiple consumers.


RabbitMQ offers two types of exchanges that support those kinds of applications:  Fan-out and Topic.

The main difference between those two kinds is that the latter allows us to filter which messages to receive based on a routing key pattern (e.g. “alarm.mailserver.*”) supplied at registration time, whereas the former simply replicates incoming messages to all bound queues.
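
The kind of matching a Topic exchange applies to routing keys can be sketched in plain Java. This is an illustrative approximation of the AMQP topic rules (words separated by dots, * matching exactly one word, # matching zero or more words), not RabbitMQ's implementation; the TopicPattern class and its matches method are hypothetical names.

```java
// Hypothetical matcher approximating AMQP topic-exchange binding patterns.
class TopicPattern {

    // Returns true if the routing key satisfies the binding pattern.
    static boolean matches(String pattern, String routingKey) {
        return match(pattern.split("\\."), 0, routingKey.split("\\."), 0);
    }

    private static boolean match(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            return j == k.length; // pattern exhausted: the key must be exhausted too
        }
        if (p[i].equals("#")) {
            // '#' may absorb zero or more words of the routing key
            for (int next = j; next <= k.length; next++) {
                if (match(p, i + 1, k, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == k.length) {
            return false; // pattern still expects a word, but the key has none left
        }
        // '*' matches exactly one word; anything else must match literally
        return (p[i].equals("*") || p[i].equals(k[j])) && match(p, i + 1, k, j + 1);
    }
}
```

For example, the pattern alarm.mailserver.* accepts alarm.mailserver.disk but rejects both alarm.mailserver and alarm.mailserver.disk.full, while alarm.# accepts all three.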

RabbitMQ also supports Headers Exchanges, which allow for more complex message filtering, but their use is out of the scope of this article.

5.1. Destinations Setup


We define Pub/Sub destinations at startup time with another @PostConstruct method, as we did in the point-to-point scenario.

The only difference is that we only create the Exchanges, but no Queues –  those will be created on demand and bound to the Exchange later, as we want an exclusive Queue for each client:


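@PostConstruct
public void setupTopicDestinations() {
    // Only the topic exchanges are declared here; queues are created per subscriber later
    destinationsConfig.getTopics().forEach((key, destination) ->
        amqpAdmin.declareExchange(
            ExchangeBuilder.topicExchange(destination.getExchange()).durable(true).build()));
}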

5.2. Publisher Endpoint


Clients will use the publisher endpoint available at the /topic/{name} location in order to post messages that will be sent to all connected clients.


As in the previous scenario, we use a @PostMapping that returns a Mono with the status after sending the message:


@PostMapping(value = "/topic/{name}")
public Mono<ResponseEntity<?>> sendMessageToTopic(
  @PathVariable String name, @RequestBody String payload) {

    DestinationInfo d = destinationsConfig.getTopics().get(name);
    if (d == null) {
        return Mono.just(ResponseEntity.notFound().build());
    }

    // Publish to the topic exchange; every bound subscriber queue gets a copy
    return Mono.fromCallable(() -> {
        amqpTemplate.convertAndSend(
          d.getExchange(), d.getRoutingKey(), payload);
        return ResponseEntity.accepted().build();
    });
}

5.3. Subscriber Endpoint


Our subscriber endpoint will be located at /topic/{name}, producing a Flux of messages for connected clients.


Those messages include both the received messages and dummy messages generated every 5 seconds:


@GetMapping(
  value = "/topic/{name}",
  produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<?> receiveMessagesFromTopic(@PathVariable String name) {
    DestinationInfo d = destinationsConfig.getTopics().get(name);
    if (d == null) {
        return Flux.just(ResponseEntity.notFound().build());
    }
    // Each subscriber gets its own queue bound to the topic exchange
    Queue topicQueue = createTopicQueue(d);
    String qname = topicQueue.getName();
    MessageListenerContainer mlc = messageListenerContainerFactory.createMessageListenerContainer(qname);
    Flux<String> f = Flux.<String> create(emitter -> {
        mlc.setupMessageListener((MessageListener) m -> {
            String payload = new String(m.getBody());
            emitter.next(payload);
        });
        emitter.onRequest(v -> {
            mlc.start();
        });
        emitter.onDispose(() -> {
            // Remove the per-subscriber queue once the client disconnects
            amqpAdmin.deleteQueue(qname);
            mlc.stop();
        });
    });
    return Flux.interval(Duration.ofSeconds(5))
        .map(v -> "No news is good news")
        .mergeWith(f);
}

This code is basically the same as we’ve seen in the previous case, with only the following differences: first, we create a new Queue for every new subscriber.


We do that by a call to the createTopicQueue() method, which uses information from the DestinationInfo instance to create an exclusive, non-durable queue, that we then bind to the Exchange using the configured routing key:


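private Queue createTopicQueue(DestinationInfo destination) {
    Exchange ex = ExchangeBuilder.topicExchange(destination.getExchange()).durable(true).build();
    amqpAdmin.declareExchange(ex);
    // No name is given, so Spring AMQP generates one for the queue
    Queue q = QueueBuilder.nonDurable().build();
    amqpAdmin.declareQueue(q);
    Binding b = BindingBuilder.bind(q).to(ex).with(destination.getRoutingKey()).noargs();
    amqpAdmin.declareBinding(b);
    return q;
}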

Note that, despite the fact we declare the Exchange again, RabbitMQ won’t create a new one, since we have already declared it at startup time.

The second difference is in the lambda that we pass to the onDispose() method, which this time will also delete the Queue when the subscriber disconnects.
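
The lifecycle described here (one queue per subscriber, a copy of each message in every queue, and queue removal on disconnect) can be modelled in memory. The sketch below is only an analogy and does not talk to RabbitMQ; the InMemoryTopic class and all of its method names are hypothetical.

```java
import java.util.Map;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

// In-memory analogy of a topic exchange with one queue per subscriber.
class InMemoryTopic {
    private final Map<String, Queue<String>> subscriberQueues = new ConcurrentHashMap<>();

    // Plays the role of createTopicQueue(): a fresh queue with a generated name.
    String subscribe() {
        String queueName = "gen-" + UUID.randomUUID();
        subscriberQueues.put(queueName, new ConcurrentLinkedQueue<>());
        return queueName;
    }

    // Plays the role of the onDispose() callback: the subscriber's queue is deleted.
    void unsubscribe(String queueName) {
        subscriberQueues.remove(queueName);
    }

    // Every currently bound queue receives its own copy of the message.
    void publish(String message) {
        subscriberQueues.values().forEach(q -> q.add(message));
    }

    // Returns the next message for a subscriber, or null if none is waiting.
    String poll(String queueName) {
        Queue<String> q = subscriberQueues.get(queueName);
        return q == null ? null : q.poll();
    }

    int queueCount() {
        return subscriberQueues.size();
    }
}
```

Unlike the point-to-point case, both subscribers see every message published while they are connected, and nothing is kept for a subscriber once its queue has been removed.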


5.4. Testing


In order to test the Pub-Sub scenario, we must first define a topic destination in our application.yml like this:


destinations:
  ## ... queue destinations omitted
  topics:
    weather:
      exchange: alerts
      routing-key: WEATHER

Here, we’ve defined a topic endpoint that will be available at the /topic/weather location. This endpoint will be used to post messages to the “alerts” exchange on RabbitMQ with a “WEATHER” routing key.

After starting our server, we can verify that the exchange has been created using the rabbitmqctl command:


$ docker exec rabbitmq rabbitmqctl list_exchanges
Listing exchanges for vhost / ...
amq.topic       topic
amq.fanout      fanout
amq.match       headers
amq.headers     headers
amq.rabbitmq.trace      topic
amq.direct      direct
alerts  topic

Now, if we issue the list_bindings command, we can see that there are no queues related to the “alerts” exchange:

$ docker exec rabbitmq rabbitmqctl list_bindings
Listing bindings for vhost /...
        exchange        NYSE    queue   NYSE    []
nyse    exchange        NYSE    queue   NYSE    []

Let's start a couple of subscribers for our destination by opening two command shells and issuing the following command in each one:


$ curl -v http://localhost:8080/topic/weather
* timeout on name lookup is not supported
*   Trying
* Connected to localhost ( port 8080 (#0)
> GET /topic/weather HTTP/1.1
> Host: localhost:8080
> User-Agent: curl/7.49.1
> Accept: */*
< HTTP/1.1 200 OK
< transfer-encoding: chunked
< Content-Type: text/event-stream;charset=UTF-8
data:No news is good news...

# ... same message repeating indefinitely

Finally, we use curl once again to send some alerts to our subscribers:


$ curl -v -d "Hurricane approaching!" http://localhost:8080/topic/weather
* timeout on name lookup is not supported
*   Trying
* Connected to localhost ( port 8080 (#0)
> POST /topic/weather HTTP/1.1
> Host: localhost:8080
> User-Agent: curl/7.49.1
> Accept: */*
> Content-Length: 22
> Content-Type: application/x-www-form-urlencoded
* upload completely sent off: 22 out of 22 bytes
< HTTP/1.1 202 Accepted
< content-length: 0
* Connection #0 to host localhost left intact

Once we send the message, we can almost instantly see the message “Hurricane approaching!” on each subscriber's shell.

If we now check the available bindings, we can see that we have one queue for each subscriber:


$ docker exec rabbitmq rabbitmqctl list_bindings
Listing bindings for vhost /...
        exchange        IBOV    queue   IBOV    []
        exchange        NYSE    queue   NYSE    []
        exchange        spring.gen-i0m0pbyKQMqpz2_KFZCd0g       
  queue   spring.gen-i0m0pbyKQMqpz2_KFZCd0g       []
        exchange        spring.gen-wCHALTsIS1q11PQbARJ7eQ       
  queue   spring.gen-wCHALTsIS1q11PQbARJ7eQ       []
alerts  exchange        spring.gen-i0m0pbyKQMqpz2_KFZCd0g     
  queue   WEATHER []
alerts  exchange        spring.gen-wCHALTsIS1q11PQbARJ7eQ     
  queue   WEATHER []
ibov    exchange        IBOV    queue   IBOV    []
nyse    exchange        NYSE    queue   NYSE    []
quotes  exchange        NYSE    queue   NYSE    []

Once we hit Ctrl-C on a subscriber's shell, our gateway will eventually detect that the client has disconnected and will remove those bindings.


6. Conclusion


In this article, we’ve demonstrated how to create a simple reactive application that interacts with a RabbitMQ server using the spring-amqp module.

With just a few lines of code, we were able to create a functional HTTP-to-AMQP gateway that supports both Point-to-Point and Publish-Subscribe integration patterns. We can easily extend it with additional features, such as security, by adding standard Spring features.


The code shown in this article is available over on GitHub.
