Introduction to RSocket – RSocket简介

最后修改: 2018年 12月 16日

中文/混合/英文(键盘快捷键:t)

1. Introduction

1.绪论

In this tutorial, we’ll take a first look at RSocket and how it enables client-server communication.

在本教程中,我们将初步了解RSocket以及它如何实现客户机-服务器通信。

2. What Is RSocket?

2.什么是RSocket

RSocket is a binary, point-to-point communication protocol intended for use in distributed applications. In that sense, it provides an alternative to other protocols like HTTP.

RSocket是一个二进制、点对点的通信协议,旨在用于分布式应用中。从这个意义上说,它为HTTP等其他协议提供了一种选择。

A full comparison between RSocket and other protocols is beyond the scope of this article. Instead, we’ll focus on a key feature of RSocket: its interaction models.

对RSocket和其他协议的全面比较超出了本文的范围。相反,我们将专注于RSocket的一个关键特征:其交互模型。

RSocket provides four interaction models. With that in mind, we’ll explore each one with an example.

RSocket 提供了四种交互模型。考虑到这一点,我们将通过一个示例来探讨每一种模型。

3. Maven Dependencies

3.Maven的依赖性

RSocket needs only two direct dependencies for our examples:

在我们的例子中,RSocket只需要两个直接依赖关系。

<dependency>
    <groupId>io.rsocket</groupId>
    <artifactId>rsocket-core</artifactId>
    <version>0.11.13</version>
</dependency>
<dependency>
    <groupId>io.rsocket</groupId>
    <artifactId>rsocket-transport-netty</artifactId>
    <version>0.11.13</version>
</dependency>

The rsocket-core and rsocket-transport-netty dependencies are available on Maven Central.

rsocket-corersocket-transport-netty依赖项可在Maven中心获得。

An important note is that the RSocket library makes frequent use of reactive streams. The Flux and Mono classes are used throughout this article so a basic understanding of them will be helpful.

一个重要的注意点是,RSocket库经常使用反应式流FluxMono类在本文中被使用,所以对它们的基本了解将是有帮助的。

4. Server Setup

4.服务器设置

First, let’s create the Server class:

首先,让我们创建Server类。

public class Server {
    private final Disposable server;

    public Server() {
        this.server = RSocketFactory.receive()
          .acceptor((setupPayload, reactiveSocket) -> Mono.just(new RSocketImpl()))
          .transport(TcpServerTransport.create("localhost", TCP_PORT))
          .start()
          .subscribe();
    }

    public void dispose() {
        this.server.dispose();
    }

    private class RSocketImpl extends AbstractRSocket {}
}

Here we use the RSocketFactory to set up and listen to a TCP socket. We pass in our custom RSocketImpl to handle requests from clients. We’ll add methods to the RSocketImpl as we go.

这里我们使用RSocketFactory来设置和监听一个TCP套接字。我们传入我们的自定义RSocketImpl来处理来自客户端的请求。我们将在进行过程中向RSocketImpl添加方法。

Next, to start the server we just need to instantiate it:

接下来,为了启动服务器,我们只需要把它实例化。

Server server = new Server();

A single server instance can handle multiple connections. As a result, just one server instance will support all of our examples.

一个服务器实例可以处理多个连接。因此,只要一个服务器实例就能支持我们所有的例子。

When we’re finished, the dispose method will stop the server and release the TCP port.

当我们完成后,dispose方法将停止服务器并释放TCP端口。

4. Interaction Models

4.交互模型

4.1. Request/Response

4.1 请求/回应

RSocket provides a request/response model – each request receives a single response.

RSocket提供了一个请求/响应模型–每个请求都会收到一个响应。

For this model, we’ll create a simple service that returns a message back to the client.

对于这个模型,我们将创建一个简单的服务,返回一个消息给客户端。

Let’s start by adding a method to our extension of AbstractRSocket, RSocketImpl:

让我们先在AbstractRSocket的扩展中添加一个方法,RSocketImpl

@Override
public Mono<Payload> requestResponse(Payload payload) {
    try {
        return Mono.just(payload); // reflect the payload back to the sender
    } catch (Exception x) {
        return Mono.error(x);
    }
}

The requestResponse method returns a single result for each request, as we can see by the Mono<Payload> response type.

requestResponse方法为每个请求返回一个结果,我们可以从Mono<Payload>响应类型看出。

Payload is the class that contains message content and metadata. It’s used by all of the interaction models. The content of the payload is binary, but there are convenience methods that support String-based content.

Payload是包含消息内容和元数据的类。它被所有的交互模型所使用。有效载荷的内容是二进制的,但是有一些方便的方法支持基于String的内容。

Next, we can create our client class:

接下来,我们可以创建我们的客户端类。

public class ReqResClient {

    private final RSocket socket;

    public ReqResClient() {
        this.socket = RSocketFactory.connect()
          .transport(TcpClientTransport.create("localhost", TCP_PORT))
          .start()
          .block();
    }

    public String callBlocking(String string) {
        return socket
          .requestResponse(DefaultPayload.create(string))
          .map(Payload::getDataUtf8)
          .block();
    }

    public void dispose() {
        this.socket.dispose();
    }
}

The client uses the RSocketFactory.connect() method to initiate a socket connection with the server. We use the requestResponse method on the socket to send a payload to the server.

客户端使用RSocketFactory.connect()方法来启动与服务器的套接字连接。我们使用套接字上的requestResponse方法向服务器发送一个有效载荷

Our payload contains the String passed into the client. When the Mono<Payload> response arrives we can use the getDataUtf8() method to access the String content of the response.

我们的有效载荷包含传入客户端的StringMono<Payload>响应到达时,我们可以使用getDataUtf8() 方法来访问响应中的String内容。

Finally, we can run the integration test to see request/response in action. We’ll send a String to the server and verify that the same String is returned:

最后,我们可以运行集成测试,看看request/response的运行情况。我们将向服务器发送一个String,并验证是否返回相同的String

@Test
public void whenSendingAString_thenRevceiveTheSameString() {
    ReqResClient client = new ReqResClient();
    String string = "Hello RSocket";

    assertEquals(string, client.callBlocking(string));

    client.dispose();
}

4.2. Fire-and-Forget

4.2 火灾和遗忘

With the fire-and-forget model, the client will receive no response from the server.

在fire-and-forget模式下,客户端将不会收到来自服务器的响应

In this example, the client will send simulated measurements to the server in 50ms intervals. The server will publish the measurements.

在这个例子中,客户端将以50ms的间隔向服务器发送模拟测量值。服务器将发布这些测量结果。

Let’s add a fire-and-forget handler to our server in the RSocketImpl class:

让我们在RSocketImpl类中为我们的服务器添加一个fire-and-forget处理器。

@Override
public Mono<Void> fireAndForget(Payload payload) {
    try {
        dataPublisher.publish(payload); // forward the payload
        return Mono.empty();
    } catch (Exception x) {
        return Mono.error(x);
    }
}

This handler looks very similar to the request/response handler. However, fireAndForget returns Mono<Void> instead of Mono<Payload>.

这个处理程序看起来与请求/响应处理程序非常相似。然而,fireAndForget返回Mono<Void>而不是Mono<Payload>

The dataPublisher is an instance of org.reactivestreams.Publisher. Thus, it makes the payload available to subscribers. We’ll make use of that in the request/stream example.

dataPublisherorg.reactivestreams.Publisher的一个实例。因此,它使有效载荷对订阅者可用。我们将在请求/流的例子中利用这一点。

Next, we’ll create the fire-and-forget client:

下一步,我们将创建 “火烧连营 “客户端。

public class FireNForgetClient {
    private final RSocket socket;
    private final List<Float> data;

    public FireNForgetClient() {
        this.socket = RSocketFactory.connect()
          .transport(TcpClientTransport.create("localhost", TCP_PORT))
          .start()
          .block();
    }

    /** Send binary velocity (float) every 50ms */
    public void sendData() {
        data = Collections.unmodifiableList(generateData());
        Flux.interval(Duration.ofMillis(50))
          .take(data.size())
          .map(this::createFloatPayload)
          .flatMap(socket::fireAndForget)
          .blockLast();
    }

    // ... 
}

The socket setup is exactly the same as before.

插座的设置与以前完全一样。

The sendData() method uses a Flux stream to send multiple messages. For each message, we invoke socket::fireAndForget.

sendData()方法使用一个Flux流来发送多个消息。对于每个消息,我们调用socket::fireAndForget

We need to subscribe to the Mono<Void> response for each message. If we forget to subscribe then socket::fireAndForget will not execute.

我们需要订阅每个消息的Mono<Void>响应。如果我们忘记订阅,那么socket::fireAndForget将不会执行。

The flatMap operator makes sure the Void responses are passed to the subscriber, while the blockLast operator acts as the subscriber.

flatMap操作符确保Void响应被传递给订阅者,而blockLast操作符充当订阅者。

We’re going to wait until the next section to run the fire-and-forget test. At that point, we’ll create a request/stream client to receive the data that was pushed by the fire-and-forget client.

我们要等到下一节来运行fire-and-forget测试。到那时,我们将创建一个请求/流客户端来接收由fire-and-forget客户端推送的数据。

4.3. Request/Stream

4.3.请求/流

In the request/stream model, a single request may receive multiple responses. To see this in action we can build upon the fire-and-forget example. To do that, let’s request a stream to retrieve the measurements we sent in the previous section.

在请求/流模型中,一个请求可以收到多个响应。为了看到这一点,我们可以在fire-and-forget的例子的基础上进行操作。要做到这一点,让我们请求一个流来检索我们在上一节中发送的测量结果。

As before, let’s start by adding a new listener to the RSocketImpl on the server:

和以前一样,让我们先给服务器上的RSocketImpl添加一个新的监听器。

@Override
public Flux<Payload> requestStream(Payload payload) {
    return Flux.from(dataPublisher);
}

The requestStream handler returns a Flux<Payload> stream. As we recall from the previous section, the fireAndForget handler published incoming data to the dataPublisher. Now, we’ll create a Flux stream using that same dataPublisher as the event source. By doing this the measurement data will flow asynchronously from our fire-and-forget client to our request/stream client.

requestStream处理程序返回一个Flux<Payload>。正如我们在上一节所回顾的,fireAndForget处理程序将传入的数据发布到dataPublisher.现在,我们将使用同一个dataPublisher作为事件源创建一个Flux流。通过这样做,测量数据将以异步方式从我们的fire-and-forget客户端流向我们的request/stream客户端。

Let’s create the request/stream client next:

接下来我们来创建请求/流客户端。

public class ReqStreamClient {

    private final RSocket socket;

    public ReqStreamClient() {
        this.socket = RSocketFactory.connect()
          .transport(TcpClientTransport.create("localhost", TCP_PORT))
          .start()
          .block();
    }

    public Flux<Float> getDataStream() {
        return socket
          .requestStream(DefaultPayload.create(DATA_STREAM_NAME))
          .map(Payload::getData)
          .map(buf -> buf.getFloat())
          .onErrorReturn(null);
    }

    public void dispose() {
        this.socket.dispose();
    }
}

We connect to the server in the same way as our previous clients.

我们以与以前的客户相同的方式连接到服务器。

In getDataStream() we use socket.requestStream() to receive a Flux<Payload> stream from the server. From that stream, we extract the Float values from the binary data. Finally, the stream is returned to the caller, allowing the caller to subscribe to it and process the results.

getDataStream()中,我们使用socket.requestStream()来接收来自服务器的Flux<Payload> 流。从该流中,我们从二进制数据中提取Float值。最后,该流被返回给调用者,允许调用者订阅它并处理结果。

Now let’s test. We’ll verify the round trip from fire-and-forget to request/stream.

现在我们来测试一下。我们将验证从 fire-and-forget 到 request/stream 的往返过程。

We can assert that each value is received in the same order as it was sent. Then, we can assert that we receive the same number of values that were sent:

我们可以断言,每个值的接收顺序与发送顺序相同。然后,我们可以断言,我们收到的数值与发送的数值数量相同。

@Test
public void whenSendingStream_thenReceiveTheSameStream() {
    FireNForgetClient fnfClient = new FireNForgetClient(); 
    ReqStreamClient streamClient = new ReqStreamClient();

    List<Float> data = fnfClient.getData();
    List<Float> dataReceived = new ArrayList<>();

    Disposable subscription = streamClient.getDataStream()
      .index()
      .subscribe(
        tuple -> {
            assertEquals("Wrong value", data.get(tuple.getT1().intValue()), tuple.getT2());
            dataReceived.add(tuple.getT2());
        },
        err -> LOG.error(err.getMessage())
      );

    fnfClient.sendData();

    // ... dispose client & subscription

    assertEquals("Wrong data count received", data.size(), dataReceived.size());
}

4.4. Channel

4.4.渠道

The channel model provides bidirectional communication. In this model, message streams flow asynchronously in both directions.

通道模型提供双向通信。在这个模型中,消息流在两个方向上异步流动。

Let’s create a simple game simulation to test this. In this game, each side of the channel will become a player.  As the game runs, these players will send messages to the other side at random time intervals. The opposite side will react to the messages.

让我们创建一个简单的游戏模拟来测试这个问题。在这个游戏中,通道的每一方将成为一个玩家。 随着游戏的运行,这些玩家将以随机的时间间隔向另一方发送消息。对方将对这些信息作出反应。

Firstly, we’ll create the handler on the server. Like before, we add to the RSocketImpl:

首先,我们要在服务器上创建处理程序。像以前一样,我们添加到RSocketImpl中。

@Override
public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
    Flux.from(payloads)
      .subscribe(gameController::processPayload);
    return Flux.from(gameController);
}

The requestChannel handler has Payload streams for both input and output. The Publisher<Payload> input parameter is a stream of payloads received from the client. As they arrive, these payloads are passed to the gameController::processPayload function.

requestChannel处理器的输入和输出都有PayloadPublisher<Payload>输入参数是一个从客户端收到的有效载荷流。当它们到达时,这些有效载荷被传递给gameController::processPayload函数。

In response, we return a different Flux stream back to the client. This stream is created from our gameController, which is also a Publisher.

作为回应,我们将一个不同的Flux流返回给客户端。这个流是由我们的gameController创建的,它也是一个Publisher

Here is a summary of the GameController class:

这里是对GameController类的一个总结。

public class GameController implements Publisher<Payload> {
    
    @Override
    public void subscribe(Subscriber<? super Payload> subscriber) {
        // send Payload messages to the subscriber at random intervals
    }

    public void processPayload(Payload payload) {
        // react to messages from the other player
    }
}

When the GameController receives a subscriber it begins sending messages to that subscriber.

GameController收到一个用户时,它开始向该用户发送消息。

Next, let’s create the client:

接下来,让我们创建客户端。

public class ChannelClient {

    private final RSocket socket;
    private final GameController gameController;

    public ChannelClient() {
        this.socket = RSocketFactory.connect()
          .transport(TcpClientTransport.create("localhost", TCP_PORT))
          .start()
          .block();

        this.gameController = new GameController("Client Player");
    }

    public void playGame() {
        socket.requestChannel(Flux.from(gameController))
          .doOnNext(gameController::processPayload)
          .blockLast();
    }

    public void dispose() {
        this.socket.dispose();
    }
}

As we have seen in our previous examples, the client connects to the server in the same way as the other clients.

正如我们在以前的例子中所看到的,客户机以与其他客户机相同的方式连接到服务器。

The client creates its own instance of the GameController.

客户端创建自己的GameController实例。

We use socket.requestChannel() to send our Payload stream to the server.  The server responds with a Payload stream of its own.

我们使用socket.requestChannel()来发送我们的Payload流到服务器。 服务器用它自己的Payload流来回应。

As payloads received from the server we pass them to our gameController::processPayload handler.

作为从服务器收到的有效载荷,我们将它们传递给我们的gameController::processPayload处理器。

In our game simulation, the client and server are mirror images of each other. That is, each side is sending a stream of Payload and receiving a stream of Payload from the other end.

在我们的游戏模拟中,客户端和服务器是彼此的镜像。也就是说,每一方都在发送一个Payload流,并从另一端接收一个Payload

The streams run independently, without synchronization.

这些数据流独立运行,没有同步性。

Finally, let’s run the simulation in a test:

最后,让我们在一个测试中运行模拟。

@Test
public void whenRunningChannelGame_thenLogTheResults() {
    ChannelClient client = new ChannelClient();
    client.playGame();
    client.dispose();
}

5. Conclusion

5.总结

In this introductory article, we’ve explored the interaction models provided by RSocket. The full source code of the examples can be found in our Github repository.

在这篇介绍性文章中,我们已经探讨了RSocket所提供的交互模型。例子的完整源代码可以在我们的Github资源库中找到。

Be sure to check out the RSocket website for a deeper discussion. In particular, the FAQ and Motivations documents provide a good background.

请务必查看RSocket网站以了解更深入的讨论。特别是FAQMotivations文件提供了良好的背景。