Spring AMQP in Reactive Applications

Last modified: July 2, 2018

1. Overview

This tutorial shows how to create a simple Spring Boot Reactive Application that integrates with the RabbitMQ messaging server, a popular implementation of the AMQP messaging standard.

We cover both point-to-point and publish-subscribe scenarios, using a distributed setup that highlights the differences between the two patterns.

Note that we assume a basic knowledge of AMQP, RabbitMQ and Spring Boot, in particular, key concepts such as Exchanges, Queues, Topics and so forth. More information about those concepts can be found in the links below:

2. RabbitMQ Server Setup

Although we could set up RabbitMQ locally, in practice we’re more likely to use a dedicated installation with additional features such as high availability, monitoring, security, etc.

In order to simulate such an environment on our development machine, we’ll use Docker to create a server that our application will use.

The following command will start a standalone RabbitMQ server:

$ docker run -d --name rabbitmq -p 5672:5672 rabbitmq:3

We don’t declare any persistent volume, so unread messages will be lost between restarts. The service will be available at port 5672 on the host.

We can check server logs with the docker logs command, which should produce an output such as this:

$ docker logs rabbitmq
2018-06-09 13:42:29.718 [info] <0.33.0>
  Application lager started on node rabbit@rabbit
// ... some lines omitted
2018-06-09 13:42:33.491 [info] <0.226.0>
 Starting RabbitMQ 3.7.5 on Erlang 20.3.5
 Copyright (C) 2007-2018 Pivotal Software, Inc.
 Licensed under the MPL.  See http://www.rabbitmq.com/

  ##  ##
  ##  ##      RabbitMQ 3.7.5. Copyright (C) 2007-2018 Pivotal Software, Inc.
  ##########  Licensed under the MPL.  See http://www.rabbitmq.com/
  ######  ##
  ##########  Logs: <stdout>

              Starting broker...
2018-06-09 13:42:33.494 [info] <0.226.0>
 node           : rabbit@rabbit
 home dir       : /var/lib/rabbitmq
 config file(s) : /etc/rabbitmq/rabbitmq.conf
 cookie hash    : CY9rzUYh03PK3k6DJie09g==
 log(s)         : <stdout>
 database dir   : /var/lib/rabbitmq/mnesia/rabbit@rabbit

// ... more log lines

Since the image includes the rabbitmqctl utility, we can use it to execute administrative tasks in the context of our running container.

For instance, we can get server status information with the following command:

$ docker exec rabbitmq rabbitmqctl status
Status of node rabbit@rabbit ...
[{pid,299},
 {running_applications,
     [{rabbit,"RabbitMQ","3.7.5"},
      {rabbit_common,
          "Modules shared by rabbitmq-server and rabbitmq-erlang-client",
          "3.7.5"},
// ... other info omitted for brevity

Other useful commands include:

  • list_exchanges:  List all declared Exchanges
  • list_queues:  List all declared Queues, including the number of unread messages
  • list_bindings:  List all defined Bindings between exchanges and queues, also including routing keys

3. Spring AMQP Project Setup

Once we have our RabbitMQ server up and running, we can move on to create our Spring project. This sample project will allow any REST client to post messages to and/or receive messages from the messaging server, using the Spring AMQP module and the corresponding Spring Boot starter to communicate with it.

The main dependencies we need to add to our pom.xml project file are:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
    <version>2.0.3.RELEASE</version>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
    <version>2.0.2.RELEASE</version> 
</dependency>

The spring-boot-starter-amqp dependency brings in everything AMQP-related, whereas spring-boot-starter-webflux is the core dependency used to implement our reactive REST server.

Note: you can check the latest versions of the Spring Boot Starter AMQP and WebFlux modules on Maven Central.

4. Scenario 1: Point-to-Point Messaging

In this first scenario, we’ll use a Direct Exchange, which is the logical entity in the broker that receives messages from clients.

A Direct Exchange will route all incoming messages to one – and only one – queue, from which they will be available for consumption by clients. Multiple clients can subscribe to the same queue, but only one of them will receive a given message.

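
Before diving into the code, the point-to-point semantics can be made concrete with a tiny plain-Java model (our own illustrative sketch, not RabbitMQ or Spring AMQP code): a message published to the queue goes to exactly one of the subscribed consumers, here chosen round-robin.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Illustrative model of point-to-point delivery: each published message
// is handed to exactly one consumer, round-robin among subscribers.
class PointToPointQueue {
    private final Queue<String> pending = new ArrayDeque<>();
    private final List<List<String>> consumers = new ArrayList<>();
    private int next = 0; // round-robin cursor

    List<String> subscribe() {
        List<String> inbox = new ArrayList<>();
        consumers.add(inbox);
        return inbox;
    }

    void publish(String message) {
        if (consumers.isEmpty()) {
            pending.add(message); // no consumers yet: message stays queued
        } else {
            consumers.get(next).add(message); // only one consumer receives it
            next = (next + 1) % consumers.size();
        }
    }
}
```

With two subscribers, publishing "m1" and "m2" leaves one message in each inbox: every message ends up with exactly one consumer, which is the behavior we’ll observe against RabbitMQ below.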
4.1. Exchange and Queues Setup

In our scenario, we use a DestinationInfo object that encapsulates the exchange name and routing key. A map keyed by destination name will be used to store all available destinations.

The following @PostConstruct method will be responsible for this initial setup:

@Autowired
private AmqpAdmin amqpAdmin;
    
@Autowired
private DestinationsConfig destinationsConfig;

@PostConstruct
public void setupQueueDestinations() {
    destinationsConfig.getQueues()
        .forEach((key, destination) -> {
            Exchange ex = ExchangeBuilder.directExchange(
              destination.getExchange())
              .durable(true)
              .build();
            amqpAdmin.declareExchange(ex);
            Queue q = QueueBuilder.durable(
              destination.getRoutingKey())
              .build();
            amqpAdmin.declareQueue(q);
            Binding b = BindingBuilder.bind(q)
              .to(ex)
              .with(destination.getRoutingKey())
              .noargs();
            amqpAdmin.declareBinding(b);
        });
}

This method uses the amqpAdmin bean created by Spring to declare Exchanges and Queues and bind them together using a given routing key.

All destinations come from a DestinationsConfig bean, which is a @ConfigurationProperties class used in our example.

This class has a property that is populated with DestinationInfo objects built from mappings read from the application.yml configuration file.

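
The article doesn’t list these classes in full; a minimal sketch of how they might look is below (plain-Java conjecture based on the description; in the real application, DestinationsConfig would carry @ConfigurationProperties(prefix = "destinations") so Spring Boot binds it from application.yml):

```java
import java.util.HashMap;
import java.util.Map;

// Holds the exchange name and routing key for one destination.
class DestinationInfo {
    private String exchange;
    private String routingKey;

    public String getExchange() { return exchange; }
    public void setExchange(String exchange) { this.exchange = exchange; }
    public String getRoutingKey() { return routingKey; }
    public void setRoutingKey(String routingKey) { this.routingKey = routingKey; }
}

// Maps destination names to DestinationInfo entries; bound from YAML via
// @ConfigurationProperties(prefix = "destinations") in the real application.
class DestinationsConfig {
    private Map<String, DestinationInfo> queues = new HashMap<>();
    private Map<String, DestinationInfo> topics = new HashMap<>();

    public Map<String, DestinationInfo> getQueues() { return queues; }
    public void setQueues(Map<String, DestinationInfo> queues) { this.queues = queues; }
    public Map<String, DestinationInfo> getTopics() { return topics; }
    public void setTopics(Map<String, DestinationInfo> topics) { this.topics = topics; }
}
```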
4.2. Producer Endpoint

Producers will send messages by sending an HTTP POST to the /queue/{name} location.

This is a reactive endpoint, so we use a Mono to return a simple acknowledgment:

@SpringBootApplication
@EnableConfigurationProperties(DestinationsConfig.class)
@RestController
public class SpringWebfluxAmqpApplication {
 
    // ... other members omitted
 
    @Autowired
    private AmqpTemplate amqpTemplate;

    @PostMapping(value = "/queue/{name}")
    public Mono<ResponseEntity<?>> sendMessageToQueue(
      @PathVariable String name, @RequestBody String payload) {

        DestinationInfo d = destinationsConfig
          .getQueues().get(name);
        if (d == null) {
            return Mono.just(
              ResponseEntity.notFound().build());
        }
    
        return Mono.fromCallable(() -> {
            amqpTemplate.convertAndSend(
              d.getExchange(), 
              d.getRoutingKey(), 
              payload);  
            return ResponseEntity.accepted().build();
        });
    }

We first check if the name parameter corresponds to a valid destination and if so, we use the autowired amqpTemplate instance to actually send out the payload – a simple String message – to RabbitMQ.

4.3. MessageListenerContainer Factory

In order to receive messages asynchronously, Spring AMQP uses the MessageListenerContainer abstraction, which mediates the information flow between AMQP Queues and the listeners provided by an application.

Since we need a concrete implementation of this class in order to attach our message listeners, we define a factory that isolates the controller code from its actual implementation.

In our case, the factory method returns a new SimpleMessageListenerContainer every time we call its createMessageListenerContainer method:

@Component
public class MessageListenerContainerFactory {

    @Autowired
    private ConnectionFactory connectionFactory;

    public MessageListenerContainerFactory() {}

    public MessageListenerContainer createMessageListenerContainer(String queueName) {
        SimpleMessageListenerContainer mlc = new SimpleMessageListenerContainer(connectionFactory);
        mlc.addQueueNames(queueName);
        return mlc;
    }
}

4.4. Consumer Endpoint

Consumers will access the same endpoint address used by producers (/queue/{name}) to get messages.

This endpoint returns a Flux of events, where each event corresponds to a received message:

@Autowired
private MessageListenerContainerFactory messageListenerContainerFactory;

@GetMapping(
  value = "/queue/{name}",
  produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<?> receiveMessagesFromQueue(@PathVariable String name) {

    DestinationInfo d = destinationsConfig
      .getQueues()
      .get(name);
    if (d == null) {
        return Flux.just(ResponseEntity.notFound()
          .build());
    }

    MessageListenerContainer mlc = messageListenerContainerFactory
      .createMessageListenerContainer(d.getRoutingKey());

    Flux<String> f = Flux.<String> create(emitter -> {
        mlc.setupMessageListener((MessageListener) m -> {
            String payload = new String(m.getBody());
            emitter.next(payload);
        });
        emitter.onRequest(v -> {
            mlc.start();
        });
        emitter.onDispose(() -> {
            mlc.stop();
        });
      });

    return Flux.interval(Duration.ofSeconds(5))
      .map(v -> "No news is good news")
      .mergeWith(f);
}

After the initial check on the destination name, the consumer endpoint creates a MessageListenerContainer using the MessageListenerContainerFactory and the queue name recovered from our registry.

Once we have our MessageListenerContainer, we create the message Flux using one of its create() builder methods.

In our particular case, we use one that takes a lambda accepting a FluxSink argument, which we then use to bridge Spring AMQP's listener-based async API to our reactive application.

We also attach two additional lambdas to the emitter's onRequest() and onDispose() callbacks, so our MessageListenerContainer can allocate and release its internal resources following the Flux's lifecycle.

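
What Flux.create gives us here is the classic push-to-pull bridge. Stripped of Reactor, the same idea can be sketched in plain Java with a BlockingQueue (a simplified analog of our own, ignoring backpressure and cancellation):

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

// Stand-in for a callback-based source such as a MessageListenerContainer.
class CallbackSource {
    private Consumer<String> listener;
    void setListener(Consumer<String> l) { this.listener = l; }
    void emit(String msg) { if (listener != null) listener.accept(msg); }
}

// Bridge: the callback pushes messages into a queue; clients pull them out.
class ListenerBridge {
    private final BlockingQueue<String> buffer = new LinkedBlockingQueue<>();

    ListenerBridge(CallbackSource source) {
        source.setListener(buffer::add); // push side (plays the emitter.next role)
    }

    String take() {                       // pull side (plays the subscriber role)
        try {
            return buffer.take();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }
}
```

Flux.create plays the role of ListenerBridge here, with the added benefits of backpressure (onRequest) and resource cleanup (onDispose).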
Finally, we merge the resulting Flux with another one created with interval(), which emits a new event every five seconds. Those dummy messages play an important role in our case: without them, we'd only detect a client disconnection upon receiving a message and failing to send it, which can take a long time depending on your particular use case.

4.5. Testing

With both our consumer and publisher endpoints setup, we can now do some tests with our sample application.

We need to define RabbitMQ's server connection details and at least one destination in our application.yml, which should look like this:

spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    
destinations:
  queues:
    NYSE:
      exchange: nyse
      routing-key: NYSE

The spring.rabbitmq.* properties define the basic properties required to connect to our RabbitMQ server running in a local Docker container. Please note that the host shown above is just an example and may be different in a particular setup.

Queues are defined using destinations.queues.<name>.*, where <name> is used as the destination name. Here we declared a single destination named “NYSE” that will send messages to the “nyse” exchange on RabbitMQ with an “NYSE” routing key.

Once we start the server via the command line or from our IDE, we can start sending and receiving messages. We'll use the curl utility, a common tool available for Windows, Mac and Linux.

The following listing shows how to send a message to our destination and the expected response from the server:

$ curl -v -d "Test message" http://localhost:8080/queue/NYSE
* timeout on name lookup is not supported
*   Trying 127.0.0.1...
* Connected to localhost (127.0.0.1) port 8080 (#0)
> POST /queue/NYSE HTTP/1.1
> Host: localhost:8080
> User-Agent: curl/7.49.1
> Accept: */*
> Content-Length: 12
> Content-Type: application/x-www-form-urlencoded
>
* upload completely sent off: 12 out of 12 bytes
< HTTP/1.1 202 Accepted
< content-length: 0
<
* Connection #0 to host localhost left intact

After executing this command, we can verify that the message was received by RabbitMQ and is ready for consumption by issuing the following command:

$ docker exec rabbitmq rabbitmqctl list_queues
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
NYSE    1

Now we can read messages with curl with the following command:

$ curl -v http://localhost:8080/queue/NYSE
* timeout on name lookup is not supported
*   Trying 127.0.0.1...
* Connected to localhost (127.0.0.1) port 8080 (#0)
> GET /queue/NYSE HTTP/1.1
> Host: localhost:8080
> User-Agent: curl/7.49.1
> Accept: */*
>
< HTTP/1.1 200 OK
< transfer-encoding: chunked
< Content-Type: text/event-stream;charset=UTF-8
<
data:Test message

data:No news is good news...

... same message repeating every 5 secs

As we can see, first we get the previously stored message and then we start to receive our dummy message every 5 seconds.

If we run the command to list queues again, we can see that there are no messages stored:

$ docker exec rabbitmq rabbitmqctl list_queues
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
NYSE    0

5. Scenario 2: Publish-Subscribe

Another common scenario for messaging applications is the Publish-Subscribe pattern, where a single message must be sent to multiple consumers.

RabbitMQ offers two types of exchanges that support those kinds of applications: Fanout and Topic.

The main difference between those two kinds is that the latter allows us to filter which messages to receive based on a routing key pattern (e.g. “alarm.mailserver.*”) supplied at registration time, whereas the former simply replicates incoming messages to all bound queues.

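
To illustrate the pattern rules (a dot separates words, * matches exactly one word, # matches zero or more words), here is a small matcher of our own – not RabbitMQ's actual implementation; among other simplifications, it doesn't treat a trailing ".#" as optional the way the broker does:

```java
import java.util.regex.Pattern;

// Simplified illustration of AMQP topic pattern matching:
// '*' matches exactly one dot-separated word, '#' matches zero or more.
class TopicMatcher {
    static boolean matches(String pattern, String routingKey) {
        String regex = Pattern.quote(pattern)
            .replace("*", "\\E[^.]+\\Q")  // one word
            .replace("#", "\\E.*\\Q");    // any sequence of words
        return routingKey.matches(regex);
    }
}
```

For example, "alarm.mailserver.*" matches "alarm.mailserver.smtp" but not "alarm.mailserver.smtp.failure", while "alarm.#" matches both.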
RabbitMQ also supports Header Exchanges, which allow for more complex message filtering, but their use is out of the scope of this article.

5.1. Destinations Setup

We define Pub/Sub destinations at startup time with another @PostConstruct method, as we did in the point-to-point scenario.

The only difference is that we only create the Exchanges, but no Queues – those will be created on demand and bound to the Exchange later, as we want an exclusive Queue for each client:

@PostConstruct
public void setupTopicDestinations() {
    destinationsConfig.getTopics()
      .forEach((key, destination) -> {
          Exchange ex = ExchangeBuilder
            .topicExchange(destination.getExchange())
            .durable(true)
            .build();
          amqpAdmin.declareExchange(ex);
      });
}

5.2. Publisher Endpoint

Clients will use the publisher endpoint available at the /topic/{name} location in order to post messages that will be sent to all connected clients.

As in the previous scenario, we use a @PostMapping that returns a Mono with the status after sending the message:

@PostMapping(value = "/topic/{name}")
public Mono<ResponseEntity<?>> sendMessageToTopic(
  @PathVariable String name, @RequestBody String payload) {

    DestinationInfo d = destinationsConfig
      .getTopics()
      .get(name);

    if (d == null) {
        return Mono.just(ResponseEntity.notFound().build());
    }

    return Mono.fromCallable(() -> {
        amqpTemplate.convertAndSend(
          d.getExchange(), d.getRoutingKey(), payload);
        return ResponseEntity.accepted().build();
    });
}

5.3. Subscriber Endpoint

Our subscriber endpoint will be located at /topic/{name}, producing a Flux of messages for connected clients.

Those messages include both the received messages and dummy messages generated every 5 seconds:

@GetMapping(
  value = "/topic/{name}",
  produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<?> receiveMessagesFromTopic(@PathVariable String name) {
    DestinationInfo d = destinationsConfig.getTopics()
        .get(name);
    if (d == null) {
        return Flux.just(ResponseEntity.notFound()
            .build());
    }
    Queue topicQueue = createTopicQueue(d);
    String qname = topicQueue.getName();
    MessageListenerContainer mlc = messageListenerContainerFactory
      .createMessageListenerContainer(qname);
    Flux<String> f = Flux.<String> create(emitter -> {
        mlc.setupMessageListener((MessageListener) m -> {
            String payload = new String(m.getBody());
            emitter.next(payload);
        });
        emitter.onRequest(v -> {
            mlc.start();
        });
        emitter.onDispose(() -> {
            amqpAdmin.deleteQueue(qname);
            mlc.stop();
        });            
      });
    
    return Flux.interval(Duration.ofSeconds(5))
        .map(v -> "No news is good news")
        .mergeWith(f);
}

This code is basically the same as we’ve seen in the previous case, with only the following differences: first, we create a new Queue for every new subscriber.

We do that with a call to the createTopicQueue() method, which uses information from the DestinationInfo instance to create an exclusive, non-durable queue that we then bind to the Exchange using the configured routing key:

private Queue createTopicQueue(DestinationInfo destination) {

    Exchange ex = ExchangeBuilder
      .topicExchange(destination.getExchange())
      .durable(true)
      .build();
    amqpAdmin.declareExchange(ex);
    Queue q = QueueBuilder
      .nonDurable()
      .build();     
    amqpAdmin.declareQueue(q);
    Binding b = BindingBuilder.bind(q)
      .to(ex)
      .with(destination.getRoutingKey())
      .noargs();        
    amqpAdmin.declareBinding(b);
    return q;
}

Note that, even though we declare the Exchange again, RabbitMQ won't create a new one, since we already declared it at startup time.

The second difference is in the lambda that we pass to the onDispose() method, which this time will also delete the Queue when the subscriber disconnects.

5.4. Testing

In order to test the Pub-Sub scenario, we must first define a topic destination in our application.yml like this:

destinations:
## ... queue destinations omitted      
  topics:
    weather:
      exchange: alerts
      routing-key: WEATHER

Here, we’ve defined a topic endpoint that will be available at the /topic/weather location. This endpoint will be used to post messages to the “alerts” exchange on RabbitMQ with a “WEATHER” routing key.

After starting our server, we can verify that the exchange has been created using the rabbitmqctl command:

$ docker exec rabbitmq rabbitmqctl list_exchanges
Listing exchanges for vhost / ...
amq.topic       topic
amq.fanout      fanout
amq.match       headers
amq.headers     headers
        direct
amq.rabbitmq.trace      topic
amq.direct      direct
alerts  topic

Now, if we issue the list_bindings command, we can see that there are no queues related to the “alerts” exchange:

$ docker exec rabbitmq rabbitmqctl list_bindings
Listing bindings for vhost /...
        exchange        NYSE    queue   NYSE    []
nyse    exchange        NYSE    queue   NYSE    []

Let's start a couple of subscribers that will subscribe to our destination by opening two command shells and issuing the following command in each one:

$ curl -v http://localhost:8080/topic/weather
* timeout on name lookup is not supported
*   Trying 127.0.0.1...
* Connected to localhost (127.0.0.1) port 8080 (#0)
> GET /topic/weather HTTP/1.1
> Host: localhost:8080
> User-Agent: curl/7.49.1
> Accept: */*
>
< HTTP/1.1 200 OK
< transfer-encoding: chunked
< Content-Type: text/event-stream;charset=UTF-8
<
data:No news is good news...

# ... same message repeating indefinitely

Finally, we use curl once again to send some alerts to our subscribers:

$ curl -v -d "Hurricane approaching!" http://localhost:8080/topic/weather
* timeout on name lookup is not supported
*   Trying 127.0.0.1...
* Connected to localhost (127.0.0.1) port 8080 (#0)
> POST /topic/weather HTTP/1.1
> Host: localhost:8080
> User-Agent: curl/7.49.1
> Accept: */*
> Content-Length: 22
> Content-Type: application/x-www-form-urlencoded
>
* upload completely sent off: 22 out of 22 bytes
< HTTP/1.1 202 Accepted
< content-length: 0
<
* Connection #0 to host localhost left intact

Once we send the message, we can almost instantly see the message "Hurricane approaching!" in each subscriber's shell.

If we now check the available bindings, we can see that there is one queue for each subscriber:

$ docker exec rabbitmq rabbitmqctl list_bindings
Listing bindings for vhost /...
        exchange        IBOV    queue   IBOV    []
        exchange        NYSE    queue   NYSE    []
        exchange        spring.gen-i0m0pbyKQMqpz2_KFZCd0g       
  queue   spring.gen-i0m0pbyKQMqpz2_KFZCd0g       []
        exchange        spring.gen-wCHALTsIS1q11PQbARJ7eQ       
  queue   spring.gen-wCHALTsIS1q11PQbARJ7eQ       []
alerts  exchange        spring.gen-i0m0pbyKQMqpz2_KFZCd0g     
  queue   WEATHER []
alerts  exchange        spring.gen-wCHALTsIS1q11PQbARJ7eQ     
  queue   WEATHER []
ibov    exchange        IBOV    queue   IBOV    []
nyse    exchange        NYSE    queue   NYSE    []
quotes  exchange        NYSE    queue   NYSE    []

Once we hit Ctrl-C in the subscriber's shell, our gateway will eventually detect that the client has disconnected and will remove those bindings.

6. Conclusion

In this article, we’ve demonstrated how to create a simple reactive application that interacts with a RabbitMQ server using the spring-amqp module.

With just a few lines of code, we were able to create a functional HTTP-to-AMQP gateway that supports both Point-to-Point and Publish-Subscribe integration patterns, and which we can easily extend with additional features, such as security, by adding standard Spring features.

The code shown in this article is available over on GitHub.
