1. Overview
1.概述
In this tutorial, we’ll see how we can add MQTT messaging in a Java project using the libraries provided by the Eclipse Paho project.
在本教程中,我们将看到如何使用Eclipse Paho项目提供的库在一个Java项目中添加MQTT消息传递。
2. MQTT Primer
2.MQTT入门
MQTT (MQ Telemetry Transport) is a messaging protocol that was created to address the need for a simple and lightweight method to transfer data to/from low-powered devices, such as those used in industrial applications.
MQTT(MQ遥测传输)是一个消息传递协议,它的创建是为了满足对一种简单和轻量级的方法的需求,以便在低功率设备(如工业应用中使用的设备)之间传输数据。
With the increased popularity of IoT (Internet of Things) devices, MQTT has seen an increased use, leading to its standardization by OASIS and ISO.
随着IoT(物联网)设备的日益普及,MQTT的使用也越来越多,导致其被OASIS和ISO标准化。
The protocol supports a single messaging pattern, namely the Publish-Subscribe pattern: each message sent by a client contains an associated “topic” which is used by the broker to route it to subscribed clients. Topics names can be simple strings like “oiltemp” or a path-like string “motor/1/rpm“.
该协议支持单一的消息传递模式,即 “发布-订阅 “模式:客户端发送的每个消息都包含一个相关的 “主题”,该主题被经纪人用来将其发送到订阅的客户端。主题名称可以是简单的字符串,如”oiltemp“或类似路径的字符串”motor/1/rpm“。
In order to receive messages, a client subscribes to one or more topics using its exact name or a string containing one of the supported wildcards (“#” for multi-level topics and “+” for single-level”).
为了接收信息,客户端使用其确切的名称或包含支持的通配符之一的字符串(”#”用于多级主题,”+”用于单级”)订阅一个或多个主题。
3. Project Setup
3.项目设置
In order to include the Paho library in a Maven project, we have to add the following dependency:
为了在Maven项目中包含Paho库,我们必须添加以下依赖关系。
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.0</version>
</dependency>
The latest version of the Eclipse Paho Java library module can be downloaded from Maven Central.
最新版本的Eclipse PahoJava库模块可以从Maven中心下载。
4. Client Setup
4.客户端设置
When using the Paho library, the first thing we need to do in order to send and/or receive messages from an MQTT broker is to obtain an implementation of the IMqttClient interface. This interface contains all methods required by an application in order to establish a connection to the server, send and receive messages.
当使用Paho库时,为了从MQTT代理处发送和/或接收消息,我们需要做的第一件事是获得一个IMqttClient接口的实现。该接口包含了应用程序所需的所有方法,以便与服务器建立连接,发送和接收消息。
Paho comes out of the box with two implementations of this interface, an asynchronous one (MqttAsyncClient) and a synchronous one (MqttClient). In our case, we’ll focus on the synchronous version, which has simpler semantics.
Paho开箱即有这个接口的两个实现,一个是异步的(MqttAsyncClient),一个是同步的(MqttClient)。在我们的案例中,我们将专注于同步版本,它的语义更简单。
The setup itself is a two-step process: we first create an instance of the MqttClient class and then we connect it to our server. The following subsection detail those steps.
设置本身是一个两步的过程:我们首先创建一个MqttClient类的实例,然后将其连接到我们的服务器。下面的小节将详细介绍这些步骤。
4.1. Creating a New IMqttClient Instance
4.1.创建一个新的IMqttClient实例
The following code snippet shows how to create a new IMqttClient synchronous instance:
下面的代码片段显示了如何创建一个新的IMqttClient同步实例。
String publisherId = UUID.randomUUID().toString();
IMqttClient publisher = new MqttClient("tcp://iot.eclipse.org:1883",publisherId);
In this case, we’re using the simplest constructor available, which takes the endpoint address of our MQTT broker and a client identifier, which uniquely identifies our client.
在这种情况下,我们使用最简单的构造函数,它接收我们的MQTT代理的端点地址和一个客户标识符,该标识符可以唯一地识别我们的客户。
In our case, we used a random UUID, so a new client identifier will be generated on every run.
在我们的案例中,我们使用了一个随机的UUID,所以每次运行都会产生一个新的客户标识符。
Paho also provides additional constructors that we can use in order to customize the persistence mechanism used to store unacknowledged messages and/or the ScheduledExecutorService used to run background tasks required by the protocol engine implementation.
Paho还提供了额外的构造函数,我们可以用它来定制用于存储未确认消息的持久化机制和/或用于运行协议引擎实现所需的后台任务的ScheduledExecutorService。
The server endpoint we’re using is a public MQTT broker hosted by the Paho project, which allows anyone with an internet connection to test clients without the need of any authentication.
我们使用的服务器端点是一个由Paho项目托管的公共MQTT代理,它允许任何有互联网连接的人测试客户端而不需要任何认证。
4.2. Connecting to the Server
4.2.连接到服务器
Our newly created MqttClient instance is not connected to the server. We do so by calling its connect() method, optionally passing a MqttConnectOptions instance that allows us to customize some aspects of the protocol.
我们新创建的MqttClient实例没有连接到服务器。我们通过调用它的connect()方法来做到这一点,可以选择传递一个MqttConnectOptions实例,允许我们自定义协议的某些方面。
In particular, we can use those options to pass additional information such as security credentials, session recovery mode, reconnection mode and so on.
特别是,我们可以使用这些选项来传递额外的信息,如安全凭证、会话恢复模式、重新连接模式等等。
The MqttConnectionOptions class expose those options as simple properties that we can set using normal setter methods. We only need to set the properties required for our scenario – the remaining ones will assume default values.
MqttConnectionOptions类将这些选项暴露为简单的属性,我们可以使用正常的setter方法来设置。我们只需要设置我们的方案所需的属性–其余的属性将假定为默认值。
The code used to establish a connection to the server typically looks like this:
用来建立与服务器连接的代码通常是这样的。
MqttConnectOptions options = new MqttConnectOptions();
options.setAutomaticReconnect(true);
options.setCleanSession(true);
options.setConnectionTimeout(10);
publisher.connect(options);
Here, we define our connection options so that:
在这里,我们定义我们的连接选项,以便。
- The library will automatically try to reconnect to the server in the event of a network failure
- It will discard unsent messages from a previous run
- Connection timeout is set to 10 seconds
5. Sending Messages
5.发送信息
Sending messages using an already connected MqttClient is very straightforward. We use one of the publish() method variants to send the payload, which is always a byte array, to a given topic, using one of the following quality-of-service options:
使用已经连接的MqttClient来发送消息是非常直接的。我们使用publish()方法的一个变体来发送有效载荷,它总是一个字节数组,到一个给定的主题,使用下列服务质量选项之一。
- 0 – “at most once” semantics, also known as “fire-and-forget”. Use this option when message loss is acceptable, as it does not require any kind of acknowledgment or persistence
- 1 – “at least once” semantics. Use this option when message loss is not acceptable and your subscribers can handle duplicates
- 2 – “exactly once” semantics. Use this option when message loss is not acceptable and your subscribers cannot handle duplicates
In our sample project, the EngineTemperatureSensor class plays the role of a mock sensor that produces a new temperature reading every time we invoke its call() method.
在我们的示例项目中,EngineTemperatureSensor类扮演了一个模拟传感器的角色,每次我们调用它的call()方法时都会产生一个新的温度读数。
This class implements the Callable interface so we can easily use it with one of the ExecutorService implementations available in the java.util.concurrent package:
该类实现了Callable接口,因此我们可以很容易地将其用于ExecutorService包中的一个ExecutorService实现。
public class EngineTemperatureSensor implements Callable<Void> {
// ... private members omitted
public EngineTemperatureSensor(IMqttClient client) {
this.client = client;
}
@Override
public Void call() throws Exception {
if ( !client.isConnected()) {
return null;
}
MqttMessage msg = readEngineTemp();
msg.setQos(0);
msg.setRetained(true);
client.publish(TOPIC,msg);
return null;
}
private MqttMessage readEngineTemp() {
double temp = 80 + rnd.nextDouble() * 20.0;
byte[] payload = String.format("T:%04.2f",temp)
.getBytes();
return new MqttMessage(payload);
}
}
The MqttMessage encapsulates the payload itself, the requested Quality-of-Service and also the retained flag for the message. This flag indicates to the broker that it should retain this message until consumed by a subscriber.
MqttMessage封装了有效载荷本身、请求的服务质量以及该消息的retained标志。该标志向代理表明它应该保留该消息,直到被用户消费。
We can use this feature to implement a “last known good” behavior, so when a new subscriber connects to the server, it will receive the retained message right away.
我们可以使用这个功能来实现 “最后已知良好 “的行为,所以当一个新的订阅者连接到服务器时,它将立即收到保留的信息。
6. Receiving Messages
6.接收信息
In order to receive messages from the MQTT broker, we need to use one of the subscribe() method variants, which allow us to specify:
为了从MQTT代理处接收消息,我们需要使用subscribe()方法变体之一,它允许我们指定。
- One or more topic filters for messages we want to receive
- The associated QoS
- The callback handler to process received messages
In the following example, we show how to add a message listener to an existing IMqttClient instance to receive messages from a given topic. We use a CountDownLatch as a synchronization mechanism between our callback and the main execution thread, decrementing it every time a new message arrives.
在下面的例子中,我们展示了如何在现有的IMqttClient实例中添加一个消息监听器,以接收来自指定主题的消息。我们使用一个CountDownLatch作为我们的回调和主执行线程之间的同步机制,在每次有新消息到达时递减它。
In the sample code, we’ve used a different IMqttClient instance to receive messages. We did it just to make more clear which client does what, but this is not a Paho limitation – if you want, you can use the same client for publishing and receiving messages:
在示例代码中,我们使用了一个不同的IMqttClient实例来接收消息。我们这样做只是为了更清楚哪个客户端做什么,但这并不是Paho的限制–如果你愿意,你可以使用同一个客户端来发布和接收消息。
CountDownLatch receivedSignal = new CountDownLatch(10);
subscriber.subscribe(EngineTemperatureSensor.TOPIC, (topic, msg) -> {
byte[] payload = msg.getPayload();
// ... payload handling omitted
receivedSignal.countDown();
});
receivedSignal.await(1, TimeUnit.MINUTES);
The subscribe() variant used above takes an IMqttMessageListener instance as its second argument.
上面使用的subscribe()变体将一个IMqttMessageListener实例作为其第二个参数。
In our case, we use a simple lambda function that processes the payload and decrements a counter. If not enough messages arrive in the specified time window (1 minute), the await() method will throw an exception.
在我们的案例中,我们使用一个简单的lambda函数来处理有效载荷并递减一个计数器。如果在指定的时间窗口(1分钟)内没有足够的消息到达,await()方法将抛出一个异常。
When using Paho, we don’t need to explicitly acknowledge message receipt. If the callback returns normally, Paho assumes it a successful consumption and sends an acknowledgment to the server.
当使用Paho时,我们不需要明确地确认消息的接收。如果回调正常返回,Paho会认为这是一次成功的消费,并向服务器发送确认。
If the callback throws an Exception, the client will be shut down. Please note that this will result in loss of any messages sent with QoS level of 0.
如果回调抛出一个Exception,客户端将被关闭。请注意,这将导致发送的任何QoS级别为0的消息丢失。
Messages sent with QoS level 1 or 2 will be resent by the server once the client is reconnected and subscribes to the topic again.
一旦客户端重新连接并再次订阅该主题,以QoS等级1或2发送的信息将由服务器重新发送。
7. Conclusion
7.结语
In this article, we demonstrated how we can add support for the MQTT protocol in our Java applications using the library provided by the Eclipse Paho project.
在这篇文章中,我们演示了如何使用Eclipse Paho项目提供的库在我们的Java应用程序中添加对MQTT协议的支持。
This library handles all low-level protocol details, allowing us to focus on other aspects of our solution, while leaving good space to customize important aspects of its internal features, such as message persistence.
这个库处理所有低级别的协议细节,使我们能够专注于我们解决方案的其他方面,同时为定制其内部功能的重要方面留下良好的空间,如消息持久性。
The code shown in this article is available over on GitHub.
本文中显示的代码可在GitHub上获得。