Reliable Messaging with JGroups – 用JGroups进行可靠的信息传递

最后修改: 2018年 2月 13日

中文/混合/英文(键盘快捷键:t)

1. Overview

1.概述

JGroups is a Java API for reliable messages exchange. It features a simple interface that provides:

JGroups是一个用于可靠消息交换的Java API。它的特点是有一个简单的接口,提供。

  • a flexible protocol stack, including TCP and UDP
  • fragmentation and reassembly of large messages
  • reliable unicast and multicast
  • failure detection
  • flow control

As well as many other features.

以及许多其他功能。

In this tutorial, we’ll create a simple application for exchanging String messages between applications and supplying shared state to new applications as they join the network.

在本教程中,我们将创建一个简单的应用程序,用于在应用程序之间交换String消息,并在新应用程序加入网络时向其提供共享状态。

2. Setup

2.设置

2.1. Maven Dependency

2.1.Maven的依赖性

We need to add a single dependency to our pom.xml:

我们需要在我们的pom.xml中添加一个依赖关系。

<dependency>
    <groupId>org.jgroups</groupId>
    <artifactId>jgroups</artifactId>
    <version>4.0.10.Final</version>
</dependency>

The latest version of the library can be checked on Maven Central.

库的最新版本可以在Maven Central.上查看。

2.2. Networking

2.2.联网

JGroups will try to use IPV6 by default. Depending on our system configuration, this may result in applications not being able to communicate.

JGroups将尝试默认使用IPV6。根据我们的系统配置,这可能会导致应用程序无法通信。

To avoid this, we’ll set the java.net.preferIPv4Stack to true property when running our applications here:

为了避免这种情况,在这里运行我们的应用程序时,我们将把java.net.preferIPv4Stack设置为true属性。

java -Djava.net.preferIPv4Stack=true com.baeldung.jgroups.JGroupsMessenger

3. JChannels

3.J频道

Our connection to a JGroups network is a JChannel. The channel joins a cluster and sends and receives messages, as well as information about the state of the network.

我们与JGroups网络的连接是一个JChannel。通道加入一个集群,并发送和接收消息,以及关于网络状态的信息。

3.1. Creating a Channel

3.1.创建一个频道

We create a JChannel with a path to a configuration file. If we omit the file name, it will look for udp.xml in the current working directory.

我们用一个配置文件的路径创建一个JChannel。如果我们省略文件名,它将在当前工作目录中寻找udp.xml

We’ll create a channel with an explicitly named configuration file:

我们将用一个明确命名的配置文件创建一个通道。

JChannel channel = new JChannel("src/main/resources/udp.xml");

JGroups configuration can be very complicated, but the default UDP and TCP configurations are sufficient for most applications. We’ve included the file for UDP in our code and will use it for this tutorial.

JGroups的配置可能非常复杂,但默认的UDP和TCP配置对大多数应用来说是足够的。我们在代码中包含了UDP的文件,并将在本教程中使用它。

For more information on configuring the transport see the JGroups manual here.

有关配置传输的更多信息,请参阅JGroups手册这里

3.2. Connecting a Channel

3.2.连接一个通道

After we’ve created our channel, we need to join a cluster. A cluster is a group of nodes that exchange messages.

在我们创建了我们的频道之后,我们需要加入一个集群。集群是一组交换信息的节点。

Joining a cluster requires a cluster name:

加入一个集群需要一个集群名称。

channel.connect("Baeldung");

The first node that attempts to join a cluster will create it if it doesn’t exist. We’ll see this process in action below.

试图加入集群的第一个节点将创建它,如果它不存在。我们将在下面看到这个过程的运作。

3.3. Naming a Channel

3.3.命名一个通道

Nodes are identified by a name so that peers can send directed messages and receive notifications about who is entering and leaving the cluster. JGroups will assign a name automatically, or we can set our own:

节点由一个名字来识别,这样对等人就可以发送定向信息,并收到关于谁进入和离开集群的通知。JGroups会自动分配一个名字,或者我们可以设置自己的名字。

channel.name("user1");

We’ll use these names below, to track when nodes enter and leave the cluster.

我们将在下面使用这些名称,以跟踪节点何时进入和离开集群。

3.4. Closing a Channel

3.4.关闭一个通道

Channel cleanup is essential if we want peers to receive timely notification that we have exited.

如果我们希望对等人及时收到我们已经退出的通知,通道清理是必不可少的。

We close a JChannel with its close method:

我们用关闭方法关闭一个JChannel

channel.close()

4. Cluster View Changes

4.集群视图的变化

With a JChannel created we’re now ready to see the state of peers in the cluster and exchange messages with them.

创建了JChannel后,我们现在就可以看到集群中对等体的状态,并与它们交换信息。

JGroups maintains cluster state inside the View class. Each channel has a single View of the network. When the view changes, it’s delivered via the viewAccepted() callback.

JGroups在View类中维护集群状态。每个通道都有一个网络的View。当视图发生变化时,会通过viewAccepted()回调进行传递。

For this tutorial, we’ll extend the ReceiverAdaptor API class that implements all of the interface methods required for an application.

在本教程中,我们将扩展ReceiverAdaptor API类,它实现了应用程序所需的所有接口方法。

It’s the recommended way to implement callbacks.

这是实现回调的推荐方式。

Let’s add viewAccepted to our application:

让我们把viewAccepted添加到我们的应用程序。

public void viewAccepted(View newView) {

    private View lastView;

    if (lastView == null) {
        System.out.println("Received initial view:");
        newView.forEach(System.out::println);
    } else {
        System.out.println("Received new view.");

        List<Address> newMembers = View.newMembers(lastView, newView);
        System.out.println("New members: ");
        newMembers.forEach(System.out::println);

        List<Address> exMembers = View.leftMembers(lastView, newView);
        System.out.println("Exited members:");
        exMembers.forEach(System.out::println);
    }
    lastView = newView;
}

Each View contains a List of Address objects, representing each member of the cluster. JGroups offers convenience methods for comparing one view to another, which we use to detect new or exited members of the cluster.

每个View都包含一个ListAddress对象,代表集群的每个成员。JGroups提供了方便的方法来比较一个视图和另一个视图,我们用它来检测集群的新成员或退出的成员。

5. Sending Messages

5.发送信息

Message handling in JGroups is straightforward. A Message contains a byte array and Address objects corresponding to the sender and the receiver.

JGroups中的消息处理是直截了当的。一个Message包含一个byte数组和Address对象,对应于发送方和接收方。

For this tutorial we’re using Strings read from the command line, but it’s easy to see how an application could exchange other data types.

在本教程中,我们使用从命令行读取的字符串,但很容易看到应用程序如何交换其他数据类型。

5.1. Broadcast Messages

5.1.广播信息

A Message is created with a destination and a byte array; JChannel sets the sender for us. If the target is null, the entire cluster will receive the message.

一个Message被创建,带有一个目标和一个字节数组;JChannel为我们设置了发送者。如果目标是null整个集群将收到该消息。

We’ll accept text from the command line and send it to the cluster:

我们将接受来自命令行的文本并将其发送到集群。

System.out.print("Enter a message: ");
String line = in.readLine().toLowerCase();
Message message = new Message(null, line.getBytes());
channel.send(message);

If we run multiple instances of our program and send this message (after we implement the receive() method below), all of them would receive it, including the sender.

如果我们运行我们程序的多个实例并发送这个消息(在我们实现了下面的receive()方法之后),所有的实例都会收到它,包括发送者。

5.2. Blocking Our Messages

5.2.阻止我们的信息

If we don’t want to see our messages, we can set a property for that:

如果我们不希望看到我们的信息,我们可以为此设置一个属性。

channel.setDiscardOwnMessages(true);

When we run the previous test, the message sender does not receive its broadcast message.

当我们运行前面的测试时,信息发送者并没有收到它的广播信息。

5.3. Direct Messages

5.3.直接信息

Sending a direct message requires a valid Address. If we’re referring to nodes by name, we need a way to look up an Address. Fortunately, we have the View for that.

发送直接信息需要一个有效的Address。如果我们通过名字来引用节点,我们需要一种方法来查询Address。幸运的是,我们有View来做这个。

The current View is always available from the JChannel:

当前的View总是可以从JChannel获得。

private Optional<address> getAddress(String name) { 
    View view = channel.view(); 
    return view.getMembers().stream()
      .filter(address -> name.equals(address.toString()))
      .findAny(); 
}

Address names are available via the class toString() method, so we merely search the List of cluster members for the name we want.

地址名称可通过类toString()方法获得,所以我们只是在集群成员的List中搜索我们想要的名称。

So we can accept a name on from the console, find the associated destination, and send a direct message:

因此,我们可以从控制台接受一个名字,找到相关的目的地,并直接发送一个消息。

Address destination = null;
System.out.print("Enter a destination: ");
String destinationName = in.readLine().toLowerCase();
destination = getAddress(destinationName)
  .orElseThrow(() -> new Exception("Destination not found"); 
Message message = new Message(destination, "Hi there!"); 
channel.send(message);

6. Receiving Messages

6.接收信息

We can send messages, now let’s add try to receive them now.

我们可以发送信息,现在让我们添加尝试接收信息。

Let’s override ReceiverAdaptor’s empty receive method:

让我们重写ReceiverAdaptor的空接收方法。

public void receive(Message message) {
    String line = Message received from: " 
      + message.getSrc() 
      + " to: " + message.getDest() 
      + " -> " + message.getObject();
    System.out.println(line);
}

Since we know the message contains a String, we can safely pass getObject() to System.out.

由于我们知道消息包含一个String,我们可以安全地将getObject()传递给System.out

7. State Exchange

7.国家事务所

When a node enters the network, it may need to retrieve state information about the cluster. JGroups provides a state transfer mechanism for this.

当一个节点进入网络时,它可能需要检索集群的状态信息。JGroups为此提供了一个状态转移机制。

When a node joins the cluster, it simply calls getState(). The cluster usually retrieves the state from the oldest member in the group – the coordinator.

当一个节点加入集群时,它只需调用getState()。集群通常从群中最年长的成员–协调者那里获取状态。

Let’s add a broadcast message count to our application. We’ll add a new member variable and increment it inside receive():

让我们为我们的应用程序添加一个广播消息计数。我们将添加一个新的成员变量,并在receive()内将其递增。

private Integer messageCount = 0;

public void receive(Message message) {
    String line = "Message received from: " 
      + message.getSrc() 
      + " to: " + message.getDest() 
      + " -> " + message.getObject();
    System.out.println(line);

    if (message.getDest() == null) {
        messageCount++;
        System.out.println("Message count: " + messageCount);
    }
}

We check for a null destination because if we count direct messages, each node will have a different number.

我们检查null目的地,因为如果我们计算直接信息,每个节点将有不同的数字。

Next, we override two more methods in ReceiverAdaptor:

接下来,我们在ReceiverAdaptor中再覆盖两个方法。

public void setState(InputStream input) {
    try {
        messageCount = Util.objectFromStream(new DataInputStream(input));
    } catch (Exception e) {
        System.out.println("Error deserialing state!");
    }
    System.out.println(messageCount + " is the current messagecount.");
}

public void getState(OutputStream output) throws Exception {
    Util.objectToStream(messageCount, new DataOutputStream(output));
}

Similar to messages, JGroups transfers state as an array of bytes.

与消息类似,JGroups以一个bytes数组的形式传输状态。

JGroups supplies an InputStream to the coordinator to write the state to, and an OutputStream for the new node to read. The API provides convenience classes for serializing and deserializing the data.

JGroups为协调者提供了一个InputStream来写入状态,并为新节点提供了一个OutputStream来读取。该API为数据的序列化和反序列化提供了便利的类。

Note that in production code access to state information must be thread-safe.

注意,在生产代码中,对状态信息的访问必须是线程安全的。

Finally, we add the call to getState() to our startup, after we connect to the cluster:

最后,我们在连接到集群后,将对getState()的调用添加到我们的启动中。

channel.connect(clusterName);
channel.getState(null, 0);

getState() accepts a destination from which to request the state and a timeout in milliseconds. A null destination indicates the coordinator and 0 means do not timeout.

getState()接受一个请求状态的目的地和一个以毫秒为单位的超时。一个null目标表示协调者,0表示不超时。

When we run this app with a pair of nodes and exchange broadcast messages, we see the message count increment.

当我们用一对节点运行这个应用程序并交换广播消息时,我们看到消息计数在增加。

Then if we add a third client or stop and start one of them, we’ll see the newly connected node print the correct message count.

然后,如果我们添加第三个客户端或停止并启动其中一个,我们会看到新连接的节点打印出正确的消息数。

8. Conclusion

8.结论

In this tutorial, we used JGroups to create an application for exchanging messages. We used the API to monitor which nodes connected to and left the cluster and also to transfer cluster state to a new node when it joined.

在本教程中,我们使用JGroups创建了一个用于交换消息的应用程序。我们使用API来监控哪些节点连接到集群,哪些节点离开集群,并且在新节点加入时将集群状态传输给它。

Code samples, as always, can be found over on GitHub.

像往常一样,可以在GitHub上找到代码样本