Messaging with Spring AMQP – 用Spring AMQP进行信息传递

最后修改: 2017年 1月 16日

中文/混合/英文(键盘快捷键:t)

1. Overview

1.概述

In this tutorial, we’ll explore message-based communication over AMQP using the Spring AMQP framework. First, we’ll cover some of the key concepts of messaging. Then, we’ll move on to a practical example.

在本教程中,我们将使用Spring AMQP框架探索基于AMQP的消息通信。首先,我们将介绍消息传递的一些关键概念。然后,我们将转到一个实际的例子。

2. Message-Based Communication

2.基于信息的沟通

Messaging is a technique for communicating between applications. It relies on asynchronous message-passing instead of synchronous request response-based architecture. Producers and consumers of messages are decoupled by an intermediate messaging layer known as a message broker. A message broker provides features like persistent storage of messages, message filtering, and message transformation.

消息传递是一种在应用程序之间进行通信的技术。它依赖于异步消息传递而不是基于请求响应的同步架构。消息的生产者和消费者被称为消息代理的中间消息层所解耦。消息代理提供的功能包括消息的持久性存储、消息过滤和消息转换。

In a case of messaging between applications written in Java, the JMS (Java Message Service) API is commonly used. For interoperability between different vendors and platforms, we won’t be able to use JMS clients and brokers. This is where AMQP comes in handy.

在用Java编写的应用程序之间进行消息传递的情况下,通常使用JMS(Java消息服务)API。为了实现不同供应商和平台之间的互操作性,我们将无法使用JMS客户端和代理。这就是AMQP的用武之地

3. AMQP – Advanced Message Queuing Protocol

3.AMQP – 高级消息队列协议

AMQP is an open standard wire specification for asynchronous message communication. It provides a description of how a message should be constructed.

AMQP是一个用于异步消息通信的开放标准电线规范。它提供了一个关于消息应该如何构建的描述。

3.1. How Amqp Is Different from Jms

3.1.Amqp与Jms有什么不同

Since AMQP is a platform-neutral binary protocol standard, libraries can be written in different programming languages, and run on different environments.

由于AMQP是一个平台中立的二进制协议标准,库可以用不同的编程语言编写,并在不同的环境中运行。

There is no vendor based protocol lock-in, as is the case when migrating from one JMS broker to another. For more details refer to JMS vs AMQP and Understanding AMQP. Some of the widely used AMQP brokers are RabbitMQ, OpenAMQ, and StormMQ.

不存在基于供应商的协议锁定,就像从一个JMS代理迁移到另一个JMS代理时那样。有关详细信息,请参阅JMS 与 AMQP了解 AMQP。一些广泛使用的AMQP代理是RabbitMQOpenAMQ和StormMQ。

3.2. AMQP Entities

3.2.AMQP实体

Briefly, AMQP is made up of Exchanges, Queues, and Bindings:

简而言之,AMQP是由交换、队列和绑定组成的。

  • Exchanges are like post offices or mailboxes and clients publish a message to an AMQP exchange. There are four built-in exchange types
    • Direct Exchange – Routes messages to a queue by matching a complete routing key
    • Fanout Exchange – Routes messages to all the queues bound to it
    • Topic Exchange – Routes messages to multiple queues by matching a routing key to a pattern
    • Headers Exchange – Routes messages based on message headers
  • Queues are bound to an exchange using a routing key
  • Messages are sent to an exchange with a routing key. The exchange then distributes copies of messages to queues

For more details, have a look at AMQP Concepts and Routing Topologies.

更多细节,请看AMQP概念路由拓扑结构

3.3. Spring AMQP

3.3.Spring AMQP

Spring AMQP comprises two modules: spring-amqp and spring-rabbit. Together, these modules provide abstractions for:

Spring AMQP包括两个模块。spring-amqpspring-rabbit。这些模块一起为以下内容提供了抽象。

  • AMQP entities – we create entities with the Message, Queue, Binding, and Exchange classes
  • Connection Management – we connect to our RabbitMQ broker by using a CachingConnectionFactory
  • Message Publishing – we use a RabbitTemplate to send messages
  • Message Consumption – we use a @RabbitListener to read messages from a queue

4. Setup a Rabbitmq Broker

4.设置一个Rabbitmq Broker

We need a RabbitMQ broker available for us to connect to. The simplest way to do this is by using Docker to fetch and run a RabbitMQ image for us:

我们需要一个可供我们连接的 RabbitMQ 代理。做到这一点的最简单方法是使用 Docker 为我们获取并运行 RabbitMQ 镜像。

docker run -d -p 5672:5672 -p 15672:15672 --name my-rabbit rabbitmq:3-management

We expose port 5672 so that our application can connect to RabbitMQ.

我们公开了端口 5672,以便我们的应用程序可以连接到 RabbitMQ。

And, we expose port 15672 so that we can see what our RabbitMQ broker is doing via either the management UI: http://localhost:15672 or the HTTP API: http://localhost:15672/api/index.html.

而且,我们公开了 15672 端口,这样我们就可以通过管理用户界面看到我们的 RabbitMQ 代理正在做什么。http://localhost:15672 或 HTTP API。http://localhost:15672/api/index.html

5. Creating Our Spring Amqp Application

5.创建我们的Spring Amqp应用程序

So, now let’s create our application to send and receive a simple “Hello, world!” message by using Spring AMQP.

所以,现在让我们创建我们的应用程序,通过使用Spring AMQP发送和接收一个简单的 “Hello, world!”消息。

5.1. Maven Dependencies

5.1.Maven的依赖性

In order to add the spring-amqp and spring-rabbit modules to our project, we add the spring-boot-starter-amqp dependency to our pom.xml:

为了将spring-amqpspring-rabbit模块添加到我们的项目中,我们将spring-boot-starter-amqp依赖性添加到pom.xml

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
        <version>2.2.2.RELEASE</version>
    </dependency>
</dependencies>

We can find the latest version in at Maven Central.

我们可以在Maven中心找到最新的版本。

5.2. Connecting to Our Rabbitmq Broker

5.2.连接到我们的 Rabbitmq Broker

We’ll use Spring Boot’s auto-configuration to create our ConnectionFactory, RabbitTemplate, and RabbitAdmin beans. As a result, we get a connection to our RabbitMQ broker on port 5672 using the default username and password of “guest”. So, we just annotate our application with @SpringBootApplication:

我们将使用 Spring Boot 的自动配置来创建我们的 ConnectionFactoryRabbitTemplateRabbitAdmin Bean。结果,我们得到了一个连接到我们的 RabbitMQ 代理的端口 5672,使用的是默认的用户名和密码 “guest”。因此,我们只需用@SpringBootApplication来注释我们的应用程序。

@SpringBootApplication
public class HelloWorldMessageApp {
   // ...
}

5.3. Create Our Queue

5.3.创建我们的队列

In order to Create Our Queue, we simply define a bean of type Queue. RabbitAdmin will find this and bind it to the default exchange with a routing key of “myQueue”:

为了创建我们的队列,我们只需定义一个Queue类型的bean。RabbitAdmin将找到这个bean,并将其绑定到默认的交换中,路由键为 “myQueue”。

@Bean
public Queue myQueue() {
    return new Queue("myQueue", false);
}

We set the queue to be non-durable so that the queue and any messages on it will be removed when RabbitMQ is stopped. Note, however, that restarting our application will have no effect on the queue.

我们将队列设置为非持久性,以便当 RabbitMQ 停止时,队列和其上的任何消息将被删除。但是,请注意,重新启动我们的应用程序将不会对队列产生影响。

5.4. Send Our Message

5.4 发送我们的信息

Let’s use the RabbitTemplate to send our “Hello, world!” message:

让我们使用RabbitTemplate来发送我们的 “你好,世界!”消息。

rabbitTemplate.convertAndSend("myQueue", "Hello, world!");

5.5. Consume Our Message

5.5 消耗我们的信息

We’ll implement a message consumer by annotating a method with @RabbitListener:

我们将通过用@RabbitListener注释一个方法来实现一个消息消费者。

@RabbitListener(queues = "myQueue")
public void listen(String in) {
    System.out.println("Message read from myQueue : " + in);
}

6. Running Our Application

6.运行我们的应用程序

First, we start the RabbitMQ broker:

首先,我们启动 RabbitMQ 代理。

docker run -d -p 5672:5672 -p 15672:15672 --name my-rabbit rabbitmq:3-management

Then, we run the spring boot application by running HelloWorldMessage.java, executing the main() method:

然后,我们通过运行HelloWorldMessage.java,执行main()方法运行spring boot应用程序。

mvn spring-boot:run -Dstart-class=com.baeldung.springamqp.simple.HelloWorldMessageApp

While the application is running we will see that:

当应用程序正在运行时,我们将看到。

  • The application sends a message to the default exchange with “myQueue” as the routing key
  • Then, the queue “myQueue” receives the message
  • Finally, the listen method consumes the message from “myQueue” and prints it on the console

We can also use the RabbitMQ management page at http://localhost:15672 to see that our message has been sent and consumed.

我们还可以使用 RabbitMQ 管理页面 http://localhost:15672 来查看我们的消息是否已被发送和消费。

7. Conclusion

7.结论

In this tutorial, we covered messaging-based architecture over AMQP protocol using Spring AMQP for communication between applications.

在本教程中,我们介绍了基于AMQP协议的消息传递架构,使用Spring AMQP进行应用程序之间的通信。

The complete source code and all code snippets for this tutorial are available on the GitHub project.

本教程的完整源代码和所有代码片段都可以在GitHub项目上找到。