1. Introduction
In this quick tutorial, we’ll show how to use RabbitMQ’s APIs related to two core concepts: Connections and Channels.
2. RabbitMQ Quick Recap
RabbitMQ is a popular implementation of AMQP (Advanced Message Queuing Protocol), widely used by companies of all sizes to handle their messaging needs.
From an application point of view, we’re usually concerned with AMQP’s main entities: Virtual Hosts, Exchanges, and Queues. As we’ve already covered those concepts in earlier articles, here, we’ll focus on the details of two less-discussed concepts: Connections and Channels.
3. Connections
The first step a client must take to interact with a RabbitMQ broker is to establish a Connection. AMQP is an application-level protocol, so this connection runs on top of a transport-level one, which can be a regular TCP connection or an encrypted one using TLS. The main role of a Connection is to provide a secure conduit through which a client can interact with a broker.
This means that during connection establishment, a client must supply valid credentials to the server. A server may support different credential types, including regular username/password, SASL, X.509 certificates, or any other supported mechanism.
Besides security, the connection establishment phase is also responsible for negotiating some aspects of the AMQP protocol. At this point, if the client and server cannot agree on the protocol version or a tuning parameter value, the connection won’t be established, and the transport-level connection will be closed.
3.1. Creating Connections in Java Applications
When using Java, the standard way to communicate with a RabbitMQ broker is to use the amqp-client Java library. We can add this library to our project by adding the corresponding Maven dependency:
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.16.0</version>
</dependency>
The latest version of this artifact is available on Maven Central.
This library uses the Factory pattern to create new connections. First, we create a new ConnectionFactory instance and set all parameters needed to create connections. At a minimum, this requires specifying the address of the RabbitMQ host:
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("amqp.example.com");
Once we’re done setting those parameters, we use the newConnection() factory method to create a new Connection instance:
Connection conn = factory.newConnection();
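Besides the host, a connection usually needs a port, credentials, and a virtual host. The amqp-client ConnectionFactory can also take these as a single AMQP URI through its setUri() method. The sketch below does not talk to a broker; it only uses java.net.URI to show which part of such a URI maps to which setting. The class name, credentials, and virtual host are placeholders, and the code assumes the URI always carries user info and a virtual host path.

```java
import java.net.URI;

class AmqpUriExample {

    // Splits an AMQP URI into {host, port, username, password, virtualHost}.
    // Assumes the URI contains "user:password@" and a non-empty path.
    static String[] parse(String amqpUri) {
        URI uri = URI.create(amqpUri);
        // Without an explicit port, amqp defaults to 5672 and amqps (TLS) to 5671
        int port = uri.getPort() != -1 ? uri.getPort()
                : "amqps".equals(uri.getScheme()) ? 5671 : 5672;
        String[] credentials = uri.getUserInfo().split(":", 2);
        String virtualHost = uri.getPath().substring(1); // drop the leading slash
        return new String[] { uri.getHost(), String.valueOf(port),
                credentials[0], credentials[1], virtualHost };
    }

    public static void main(String[] args) {
        // Placeholder values, for illustration only
        String[] parts = parse("amqp://user:secret@amqp.example.com/orders");
        System.out.println(String.join(" ", parts));
    }
}
```

With the real library, the same values would go to the factory setters (setHost(), setPort(), setUsername(), setPassword(), setVirtualHost()) or be passed as a whole to setUri().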
4. Channels
Simply put, an AMQP channel is a mechanism that allows multiplexing multiple logical flows on top of a single connection. This allows better resource usage on both the client and the server side, since setting up a connection is a relatively expensive operation.
A client creates one or more channels so it can send commands to the broker. This includes commands related to sending and/or receiving messages.
Channels also provide some additional guarantees regarding the protocol logic:
- Commands for a given channel are always executed in the same order they’re sent.
- When a client opens multiple channels over a single connection, implementations can distribute the available bandwidth between them.
- Both parties can issue flow control commands, which inform the peer that it should stop sending messages.
A key aspect of a channel is that its lifecycle is bound to the connection used to create it. This means that if we close a connection, all associated channels will also be closed.
4.1. Creating Channels in Java Applications
Java applications using the amqp-client library create a new Channel from an existing Connection using the createChannel() method of the latter:
Channel channel = conn.createChannel();
Once we have a Channel, we can send commands to the server. For instance, to create a queue, we use the queueDeclare() method:
channel.queueDeclare("example.queue", true, false, true, null);
This code “declares” a queue, which is AMQP’s way of saying “create if not already existing”. The arguments after the queue name define its characteristics:
- durable: this declaration is persistent, meaning it will survive a server’s restart
- exclusive: this queue is restricted to the connection associated with the channel declaring it
- autoDelete: the server will delete the queue once it is no longer in use
- args: optional map with arguments used to tune the queue behavior; for instance, we can use those arguments to define the TTL for messages and dead-letter behavior
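As a minimal sketch of the args parameter, the map below sets a message TTL and a dead-letter exchange. The keys x-message-ttl (in milliseconds) and x-dead-letter-exchange are RabbitMQ queue arguments. The class name, the 60-second TTL, and the exchange name example.dlx are assumptions made for this illustration.

```java
import java.util.HashMap;
import java.util.Map;

class QueueArgumentsExample {

    // Builds the optional "args" map passed as the last queueDeclare() argument.
    static Map<String, Object> buildArgs(int ttlMillis, String deadLetterExchange) {
        Map<String, Object> args = new HashMap<>();
        args.put("x-message-ttl", ttlMillis);                   // expire messages after this many ms
        args.put("x-dead-letter-exchange", deadLetterExchange); // where expired/rejected messages go
        return args;
    }

    public static void main(String[] argv) {
        // Hypothetical values: 60-second TTL, dead letters routed to "example.dlx"
        Map<String, Object> args = buildArgs(60_000, "example.dlx");
        System.out.println(args);
        // With a Channel at hand, the map replaces the null argument:
        // channel.queueDeclare("example.queue", true, false, true, args);
    }
}
```

Note that redeclaring an existing queue with different arguments is rejected by the broker, so these values should be settled before the queue is first created.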
Now, to post a message to this queue using the default exchange, we use the basicPublish() method:
channel.basicPublish("", queue, null, payload);
This code sends a message to the default exchange using the queue name as its routing key.
5. Channel Allocation Strategies
Let’s consider a scenario where we use messaging systems: CQRS (Command Query Responsibility Segregation) applications. In a nutshell, CQRS-based applications have two independent paths: commands and queries. Commands can change data but never return values. Queries, on the other hand, return values but never modify data.
Since the command path never returns any data, the service can execute commands asynchronously. In a typical implementation, we have an HTTP POST endpoint that internally builds a message and sends it to a queue for later processing.
Now, for a service that must handle dozens or even hundreds of concurrent requests, opening connections and channels every time is not a realistic option. Instead, a better approach is to use a channel pool.
Of course, this leads to the next problem: should we create a single connection and create channels from it or use multiple connections?
5.1. Single Connection/Multiple Channels
In this strategy, we’ll use a single connection and just create a channel pool with a capacity equal to the maximum number of concurrent requests the service can manage. For a traditional thread-per-request model, this should be set to the same size as the request handler thread pool.
The downside of this strategy is that, under heavier loads, the fact that we must send commands one at a time through the associated channel implies that we must use a synchronization mechanism. This, in turn, adds extra latency in the command path, which we want to minimize.
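A fixed-size pool of this kind can be sketched with a blocking queue. SimplePool is a hypothetical helper, not part of amqp-client. It is generic so that it runs without a broker; in a real service T would be Channel, and the supplier would call createChannel() on the shared connection and handle its IOException.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical fixed-size pool that hands out pre-created items to threads.
class SimplePool<T> {
    private final BlockingQueue<T> idle;

    SimplePool(int capacity, Supplier<T> factory) {
        idle = new ArrayBlockingQueue<>(capacity);
        for (int i = 0; i < capacity; i++) {
            idle.add(factory.get()); // create all items up front
        }
    }

    // Waits up to the timeout for a free item; returns null if none becomes available.
    T borrow(long timeout, TimeUnit unit) {
        try {
            return idle.poll(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return null;
        }
    }

    // Hands the item back so another thread can borrow it.
    void release(T item) {
        idle.offer(item);
    }

    int available() {
        return idle.size();
    }

    public static void main(String[] args) {
        // Strings stand in for channels in this broker-free demo
        SimplePool<String> pool = new SimplePool<>(2, () -> "channel");
        String c = pool.borrow(100, TimeUnit.MILLISECONDS);
        System.out.println("idle while borrowed: " + pool.available());
        pool.release(c);
        System.out.println("idle after release: " + pool.available());
    }
}
```

The blocking borrow() call is where the extra latency shows up: when every item is in use, a request thread has to wait for another thread to release one.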
5.2. Connection-per-Thread Strategy
Another option is to go to the other extreme and use a Connection pool, so there’s never contention for a channel. For each Connection, we’ll create a single Channel that a handler thread will use to issue commands to the server.
However, the fact that we remove synchronization from the client side comes with a cost. The broker must allocate additional resources for each connection, such as socket descriptors and state information. Moreover, the server must split the available throughput between clients.
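One possible way to give every handler thread its own resource is a ThreadLocal. This is only a sketch and not the benchmark code used later. PerThreadResource is a hypothetical helper, and in a real service the supplier would open a Connection and create its single Channel, which this broker-free version replaces with plain objects.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical helper: lazily creates one resource per calling thread.
class PerThreadResource<T> {
    private final AtomicInteger created = new AtomicInteger();
    private final ThreadLocal<T> holder;

    PerThreadResource(Supplier<T> factory) {
        holder = ThreadLocal.withInitial(() -> {
            created.incrementAndGet(); // count how many resources exist overall
            return factory.get();
        });
    }

    // Returns the calling thread's own instance, creating it on first use.
    T get() {
        return holder.get();
    }

    int createdCount() {
        return created.get();
    }

    public static void main(String[] args) throws InterruptedException {
        PerThreadResource<Object> perThread = new PerThreadResource<>(Object::new);
        Runnable handler = perThread::get; // stands in for a request handler
        Thread t1 = new Thread(handler);
        Thread t2 = new Thread(handler);
        t1.start();
        t2.start();
        t1.join();
        t2.join();
        System.out.println("resources created: " + perThread.createdCount());
    }
}
```

No locking is needed on the client because threads never share an instance, but every instance corresponds to one more connection the broker has to track.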
6. Benchmarking Strategies
To evaluate those candidate strategies, let’s run a simple benchmark for each one. The benchmark consists of running multiple workers in parallel that send one thousand messages of 4 Kbytes each. Upon construction, the worker receives a Connection from which it will create a Channel to send commands. It also receives the number of iterations, payload size, and a CountDownLatch used to inform the test runner that it has finished sending messages:
public class Worker implements Callable<Worker.WorkerResult> {

    // ... field and constructor omitted

    @Override
    public WorkerResult call() throws Exception {
        try {
            long start = System.currentTimeMillis();
            for (int i = 0; i < iterations; i++) {
                channel.basicPublish("", queue, null, payload);
            }
            long elapsed = System.currentTimeMillis() - start;
            channel.queueDelete(queue);
            return new WorkerResult(elapsed);
        } finally {
            counter.countDown();
        }
    }

    public static class WorkerResult {
        public final long elapsed;

        WorkerResult(long elapsed) {
            this.elapsed = elapsed;
        }
    }
}
Besides indicating that it has finished its job by decrementing the latch, the worker also returns a WorkerResult instance with the elapsed time to send all messages. Although here we just have a long value, we can extend it to return more details.
The controller creates the connection factory and workers according to the strategy being evaluated. For the single connection, it creates the Connection instance and passes it to every worker:
@Override
public Long call() {
    try {
        Connection connection = factory.newConnection();
        CountDownLatch counter = new CountDownLatch(workerCount);
        List<Worker> workers = new ArrayList<>();
        for (int i = 0; i < workerCount; i++) {
            workers.add(new Worker("queue_" + i, connection, iterations, counter, payloadSize));
        }
        ExecutorService executor = new ThreadPoolExecutor(workerCount, workerCount, 0,
          TimeUnit.SECONDS, new ArrayBlockingQueue<>(workerCount, true));
        long start = System.currentTimeMillis();
        executor.invokeAll(workers);
        // All tasks are done once invokeAll() returns, so release the pool threads
        executor.shutdown();
        if (counter.await(5, TimeUnit.MINUTES)) {
            long elapsed = System.currentTimeMillis() - start;
            return throughput(workerCount, iterations, elapsed);
        } else {
            throw new RuntimeException("Timeout waiting for workers to complete");
        }
    } catch (Exception ex) {
        throw new RuntimeException(ex);
    }
}
For the multiple connections strategy, we create a new Connection for each worker:
for (int i = 0; i < workerCount; i++) {
    Connection conn = factory.newConnection();
    workers.add(new Worker("queue_" + i, conn, iterations, counter, payloadSize));
}
The throughput function calculates the benchmark measure, which is the total number of messages sent by all workers divided by the total elapsed time:
private static long throughput(int workerCount, int iterations, long elapsed) {
    // 1000L forces long arithmetic so the product cannot overflow an int
    return (1000L * iterations * workerCount) / elapsed;
}
Notice that we need to multiply the numerator by 1000 because the elapsed time is measured in milliseconds and we want the throughput in messages per second.
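To make the arithmetic concrete, here is a small worked example. The figures are made up for illustration and are not benchmark results. The sketch multiplies with a long literal, an assumption on our side that keeps the product in range for larger runs.

```java
class ThroughputExample {

    // Messages per second: total messages divided by the elapsed time in seconds.
    // The 1000L literal converts milliseconds to seconds and forces long arithmetic.
    static long throughput(int workerCount, int iterations, long elapsedMillis) {
        return (1000L * iterations * workerCount) / elapsedMillis;
    }

    public static void main(String[] args) {
        // Made-up figures: 10 workers x 1,000 messages in 2,500 ms
        // 10,000 messages / 2.5 s = 4,000 messages per second
        System.out.println(throughput(10, 1000, 2500));
    }
}
```

With 150 workers sending 100,000 messages each, the numerator reaches 15,000,000,000, which no longer fits in an int, so the long multiplication matters for longer runs.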
7. Running the Benchmark
These are the results of our benchmark for both strategies. For each worker count, we’ve run the benchmark 10 times and used the average value as the throughput measure for that particular worker/strategy. The environment itself is modest by today’s standards:
- CPU: dual-core i7 Dell notebook @ 3.0 GHz
- Total RAM: 16 GB
- RabbitMQ: 3.10.7 running on Docker (docker-machine with 4 GB of RAM)
For this specific environment, we see a slight advantage for the single connection strategy. This advantage seems to increase for the 150 workers scenario.
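The averaging over repeated runs described at the start of this section can be sketched as follows. BenchmarkAverager is a hypothetical helper, and the LongSupplier stands in for one full benchmark run that returns a throughput value. The numbers in main() are placeholders, not measurements.

```java
import java.util.function.LongSupplier;

class BenchmarkAverager {

    // Executes the given benchmark the requested number of times
    // and returns the arithmetic mean of the reported throughput values.
    static double average(int runs, LongSupplier singleRun) {
        long sum = 0;
        for (int i = 0; i < runs; i++) {
            sum += singleRun.getAsLong();
        }
        return (double) sum / runs;
    }

    public static void main(String[] args) {
        // Placeholder throughput values standing in for three benchmark runs
        long[] fake = { 100, 200, 300 };
        int[] next = { 0 };
        System.out.println(average(3, () -> fake[next[0]++]));
    }
}
```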
8. Selecting a Strategy
Given the benchmark results, we cannot point to a clear winner. For worker counts between 5 and 100, the results are more or less the same. After that, the overhead associated with multiple connections seems to be higher than handling multiple channels on a single connection.
Also, we must consider that the test workers do only one thing: send fixed messages to a queue. Real-world applications, like the CQRS one we’ve mentioned, usually do some extra work before and/or after sending a message. So, to select the best strategy, the recommended way is to run your own benchmark using a configuration that is as close as possible to the production environment.
9. Conclusion
In this article, we’ve explored the concepts of Channels and Connections in RabbitMQ and how we can use them in different ways. As usual, the full code is available over on GitHub.