1. Overview
1.概述
In this tutorial, we’ll use the Java Client for NATs to connect to a NATS Server and publish and receive messages.
在本教程中,我们将使用Java Client for NATs连接到NATS服务器并发布和接收消息。
NATS offers three primary modes of message exchange. Publish/Subscribe semantics delivers messages to all subscribers of a topic. Request/Reply messaging sends requests via topics and routes responses back to the requestor.
NATS提供三种主要的消息交换模式。发布/订阅语义将消息传递给一个主题的所有订阅者。请求/回复消息通过主题发送请求,并将响应返回给请求者。
Subscribers can also join message queue groups when they subscribe to a topic. Messages sent to the associated topic are only delivered to one subscriber in the queue group.
订阅者在订阅一个主题时也可以加入消息队列组。发送到相关主题的消息只传递给队列组中的一个订阅者。
2. Setup
2.设置
2.1. Maven Dependency
2.1.Maven的依赖性
First, we need to add the NATS library to our pom.xml:
首先,我们需要将NATS库添加到我们的pom.xml:中。
<dependency>
<groupId>io.nats</groupId>
<artifactId>jnats</artifactId>
<version>1.0</version>
</dependency>
The latest version of the library can be found here, and the Github project is here.
2.2. NATS Server
2.2.NATS服务器
Second, we’ll need a NATS Server for exchanging messages. There’re instructions for all major platforms here.
第二,我们将需要一个NATS服务器来交换信息。这里有所有主要平台的说明。
We assume that there’s a server running on localhost:4222.
我们假设有一个服务器在localhost:4222上运行。
3. Connect and Exchange Messages
3.连接和交换信息
3.1. Connect to NATS
3.1.连接到NATS
The connect() method in the static NATS class creates Connections.
静态NATS类中的connect()方法创建了Connections。
If we want to use a connection with default options and listening at localhost on port 4222, we can use the default method:
如果我们想使用一个带有默认选项的连接,并在4222端口的localhost监听,我们可以使用默认方法。
Connection natsConnection = Nats.connect();
But Connections have many configurable options, a few of which we want to override.
但连接有许多可配置的选项,其中有几个是我们想要覆盖的。
We’ll create an Options object and pass it to Nats:
我们将创建一个Options对象并将其传递给Nats。
private Connection initConnection() {
Options options = new Options.Builder()
.errorCb(ex -> log.error("Connection Exception: ", ex))
.disconnectedCb(event -> log.error("Channel disconnected: {}", event.getConnection()))
.reconnectedCb(event -> log.error("Reconnected to server: {}", event.getConnection()))
.build();
return Nats.connect(uri, options);
}
NATS Connections are durable. The API will attempt to reconnect a lost connection.
NATS的连接是持久的。API将尝试重新连接一个失去的连接。
We’ve installed callbacks to notify us of when a disconnect occurs and when the connection is restored. In this example, we’re using lambdas, but for applications that need to do more than simply log the event, we can install objects that implement the required interfaces.
我们已经安装了回调,以便在断开连接发生时和恢复连接时通知我们。在这个例子中,我们使用的是lambdas,但对于需要做更多事情而不是简单地记录事件的应用程序,我们可以安装实现所需接口的对象。
We can run a quick test. Create a connection and add a sleep for 60 seconds to keep the process running:
我们可以运行一个快速测试。创建一个连接,并添加一个60秒的睡眠,以保持该进程的运行。
Connection natsConnection = initConnection();
Thread.sleep(60000);
Run this. Then stop and start your NATS server:
运行这个。然后停止并启动你的NATS服务器。
[jnats-callbacks] ERROR com.baeldung.nats.NatsClient
- Channel disconnected: io.nats.client.ConnectionImpl@79428dc1
[reconnect] WARN io.nats.client.ConnectionImpl
- couldn't connect to nats://localhost:4222 (nats: connection read error)
[jnats-callbacks] ERROR com.baeldung.nats.NatsClient
- Reconnected to server: io.nats.client.ConnectionImpl@79428dc1
We can see the callbacks log the disconnection and reconnect.
我们可以看到回调记录了断开连接和重新连接的情况。
3.2. Subscribe to Messages
3.2.订阅信息
Now that we have a connection, we can work on message processing.
现在我们有了一个连接,我们可以进行消息处理。
A NATS Message is a container for an array of bytes[]. In addition to the expected setData(byte[]) and byte[] getData() methods there’re methods for setting and getting the message destination and reply to topics.
NATS的Message是一个bytes[]数组的容器。除了预期的setData(byte[])和byte[] getData()方法之外,还有设置和获取消息目的地和回复主题的方法。
We subscribe to topics, which are Strings.
我们订阅了主题,也就是字符串。
NATS supports both synchronous and asynchronous subscriptions.
NATS同时支持同步和异步订阅。
Let’s take a look at an asynchronous subscription:
让我们来看看异步订阅的情况。
AsyncSubscription subscription = natsConnection
.subscribe( topic, msg -> log.info("Received message on {}", msg.getSubject()));
The API delivers Messages to our MessageHandler(), in its thread.
API将Messages送到我们的MessageHandler(),在其线程中。
Some applications may want to control the thread that processes messages instead:
有些应用程序可能希望控制处理消息的线程,而不是控制处理消息的线程。
SyncSubscription subscription = natsConnection.subscribeSync("foo.bar");
Message message = subscription.nextMessage(1000);
SyncSubscription has a blocking nextMessage() method that will block for the specified number of milliseconds. We’ll use synchronous subscriptions for our tests to keep the test cases simple.
SyncSubscription有一个阻塞的nextMessage()方法,将阻塞指定的毫秒数。我们将在测试中使用同步订阅,以保持测试案例的简单。
AsyncSubscription and SyncSubscription both have an unsubscribe() method that we can use to close the subscription.
AsyncSubscription和SyncSubscription都有一个unsubscribe()方法,我们可以用它来关闭订阅。
subscription.unsubscribe();
3.3. Publish Messages
3.3.发布消息
Publishing Messages can be done several ways.
发布Messages可以通过几种方式进行。
The simplest method requires only a topic String and the message bytes:
最简单的方法只需要一个主题String和信息bytes。
natsConnection.publish("foo.bar", "Hi there!".getBytes());
If a publisher wishes a response or to provide specific information about the source of a message, it’s may also send a message with a reply-to topic:
如果发布者希望得到回应或提供关于信息来源的具体信息,它也可以发送一个带有回复主题的信息。
natsConnection.publish("foo.bar", "bar.foo", "Hi there!".getBytes());
There are also overloads for a few other combinations such as passing in a Message instead of bytes.
还有一些其他组合的重载,比如传入一个Message而不是bytes。
3.4. A Simple Message Exchange
3.4.一个简单的消息交换
Given a valid Connection, we can write a test that verifies message exchange:
给定一个有效的Connection,我们可以写一个测试来验证消息交换。
SyncSubscription fooSubscription = natsConnection.subscribe("foo.bar");
SyncSubscription barSubscription = natsConnection.subscribe("bar.foo");
natsConnection.publish("foo.bar", "bar.foo", "hello there".getBytes());
Message message = fooSubscription.nextMessage();
assertNotNull("No message!", message);
assertEquals("hello there", new String(message.getData()));
natsConnection
.publish(message.getReplyTo(), message.getSubject(), "hello back".getBytes());
message = barSubscription.nextMessage();
assertNotNull("No message!", message);
assertEquals("hello back", new String(message.getData()));
We start by subscribing to two topics with synchronous subscriptions since they work much better inside a JUnit test. Then we send a message to one of them, specifying the other as a replyTo address.
我们首先用同步订阅来订阅两个主题,因为它们在JUnit测试中的效果更好。然后我们向其中一个主题发送消息,指定另一个主题为replyTo地址。
After reading the message from the first destination we “flip” the topics to send a response.
在阅读了第一个目的地的信息后,我们 “翻转 “主题,以发送一个回应。
3.5. Wildcard Subscriptions
3.5.通配符订阅
NATS server supports topic wildcards.
NATS服务器支持主题通配符。
Wildcards operate on topic tokens that are separated with the ’.’ character. The asterisk character ‘*’ matches an individual token. The greater-than symbol ‘>’ is a wildcard match for the remainder of a topic, which may be more than one token.
通配符对以’.’字符分隔的主题标记进行操作。星号字符’*’匹配一个单独的标记。大于符号’>’是一个通配符,用于匹配一个主题的其余部分,这可能是一个以上的标记。
For example:
比如说。
- foo.* matches foo.bar, foo.requests, but not foo.bar.requests
- foo.> matches foo.bar, foo.requests, foo.bar.requests, foo.bar.baeldung, etc.
Let’s try a few tests:
让我们试一试几个测试。
SyncSubscription fooSubscription = client.subscribeSync("foo.*");
client.publishMessage("foo.bar", "bar.foo", "hello there");
Message message = fooSubscription.nextMessage(200);
assertNotNull("No message!", message);
assertEquals("hello there", new String(message.getData()));
client.publishMessage("foo.bar.plop", "bar.foo", "hello there");
message = fooSubscription.nextMessage(200);
assertNull("Got message!", message);
SyncSubscription barSubscription = client.subscribeSync("foo.>");
client.publishMessage("foo.bar.plop", "bar.foo", "hello there");
message = barSubscription.nextMessage(200);
assertNotNull("No message!", message);
assertEquals("hello there", new String(message.getData()));
4. Request/Reply Messaging
4.请求/回复信息传递
Our message exchange test resembled a common idiom on pub/sub messaging systems; request/reply. NATS has explicit support for this request/reply messaging.
我们的消息交换测试类似于pub/sub消息系统上的一个常见习语:请求/回复。NATS对这种请求/回复的消息传递有明确的支持。
Publishers can install a handler for requests using the asynchronous subscription method we used above:
出版商可以使用我们上面使用的异步订阅方法为请求安装一个处理程序。
AsyncSubscription subscription = natsConnection
.subscribe("foo.bar.requests", new MessageHandler() {
@Override
public void onMessage(Message msg) {
natsConnection.publish(message.getReplyTo(), reply.getBytes());
}
});
Or they can respond to requests as they arrive.
或者他们可以在请求到来时作出回应。
The API provides a request() method:
该API提供了一个request()方法。
Message reply = natsConnection.request("foo.bar.requests", request.getBytes(), 100);
This method creates a temporary mailbox for the response, and write the reply-to address for us.
这个方法为响应创建一个临时邮箱,并为我们写下回复地址。
Request() returns the response, or null if the request times out. The last argument is the number of milliseconds to wait.
Request()返回响应,或者null如果请求超时。最后一个参数是要等待的毫秒数。
We can modify our test for request/reply:
我们可以修改我们对请求/回复的测试。
natsConnection.subscribe(salary.requests", message -> {
natsConnection.publish(message.getReplyTo(), "denied!".getBytes());
});
Message reply = natsConnection.request("salary.requests", "I need a raise.", 100);
assertNotNull("No message!", reply);
assertEquals("denied!", new String(reply.getData()));
5. Message Queues
5.消息队列
Subscribers may specify queue groups at subscription time. When a message is published to the group NATS will deliver it to a one-and-only-one subscriber.
订阅者可以在订阅时指定队列组。当一个消息被发布到组中时,NATS将把它传递给一个唯一的订阅者。
Queue groups do not persist messages. If no listeners are available, the message is discarded.
队列组不坚持消息。如果没有听众,消息会被丢弃。
5.1. Subscribing to Queues
5.1.订阅队列
Subscribers specify a queue group name as a String:
订阅者指定一个队列组名称为字符串:。
SyncSubscription subscription = natsConnection.subscribe("topic", "queue name");
There is also an asynchronous version, of course:
当然,也有一个异步版本。
SyncSubscription subscription = natsConnection
.subscribe("topic", "queue name", new MessageHandler() {
@Override
public void onMessage(Message msg) {
log.info("Received message on {}", msg.getSubject());
}
});
The subscription creates the queue on the NATS server.
订阅在NATS服务器上创建队列。
5.2. Publishing to Queues
5.2.发布到队列
Publishing message to queue groups simply requires publishing to the associated topic:
向队列组发布消息只需要向相关主题发布。
natsConnection.publish("foo", "queue message".getBytes());
The NATS server will route the message to the queue and select a message receiver.
NATS服务器将把消息路由到队列并选择一个消息接收器。
We can verify this with a test:
我们可以通过一个测试来验证这一点。
SyncSubscription queue1 = natsConnection.subscribe("foo", "queue name");
SyncSubscription queue2 = natsConnection.subscribe("foo", "queue name");
natsConnection.publish("foo", "foobar".getBytes());
List<Message> messages = new ArrayList<>();
Message message = queue1.nextMessage(200);
if (message != null) messages.add(message);
message = queue2.nextMessage(200);
if (message != null) messages.add(message);
assertEquals(1, messages.size());
We only receive one message.
我们只收到一个信息。
If we change the first two lines to a normal subscription:
如果我们把前两行改为正常的订阅。
SyncSubscription queue1 = natsConnection.subscribe("foo");
SyncSubscription queue2 = natsConnection.subscribe("foo");
The test fails because the message is delivered to both subscribers.
测试失败,因为消息被送到了两个订阅者手中。
6. Conclusion
6.结论
In this brief introduction, we connected to a NATS server and sent both pub/sub messages and load-balanced queue messages. We looked at NATS support for wildcard subscriptions. We also used request/reply messaging.
在这个简短的介绍中,我们连接到一个NATS服务器,同时发送pub/sub消息和负载平衡队列消息。我们看了NATS对通配符订阅的支持。我们还使用了请求/回复消息传递。
Code samples, as always, can be found over on GitHub.
像往常一样,可以在GitHub上找到代码样本。