A Guide to NIO2 Asynchronous Socket Channel – NIO2异步插座通道指南

最后修改: 2016年 12月 3日

中文/混合/英文(键盘快捷键:t)

1. Overview

1.概述

In this article, we will demonstrate how to build a simple server and its client using the Java 7 NIO.2 channel APIs.

在这篇文章中,我们将演示如何使用Java 7 NIO.2通道API构建一个简单的服务器及其客户端。

We’ll look at the AsynchronousServerSocketChannel and AsynchronousSocketChannel classes which are the key classes used in implementing the server and client respectively.

我们将看看AsynchronousServerSocketChannelAsynchronousSocketChannel类,它们是分别用于实现服务器和客户端的关键类。

If you are new to NIO.2 channel APIs, we have an introductory article on this site. You can read it by following this link.

如果你是NIO.2通道API的新手,我们在这个网站上有一篇介绍性的文章。你可以按照这个链接来阅读它。

All classes that are needed to use NIO.2 channel APIs are bundled up in java.nio.channels package:

所有需要使用NIO.2通道API的类都捆绑在java.nio.channels包中。

import java.nio.channels.*;

2. The Server With Future

2.具有未来的服务器

An instance of AsynchronousServerSocketChannel is created by calling the static open API on its class:

AsynchronousServerSocketChannel的一个实例是通过调用其类上的静态open API创建的。

AsynchronousServerSocketChannel server
  = AsynchronousServerSocketChannel.open();

A newly created asynchronous server socket channel is open but not yet bound, so we must bind it to a local address and optionally choose a port:

一个新创建的异步服务器套接字通道是开放的,但还没有被绑定,所以我们必须把它绑定到一个本地地址,并可选择一个端口。

server.bind(new InetSocketAddress("127.0.0.1", 4555));

We could just as well have passed in null so that it uses a local address and binds to an arbitrary port:

我们也可以传入null,这样它就会使用一个本地地址并绑定到一个任意的端口。

server.bind(null);

Once bound, the accept API is used to initiate the accepting of connections to the channel’s socket:

一旦绑定,accept API被用来启动对通道的套接字的连接的接受。

Future<AsynchronousSocketChannel> acceptFuture = server.accept();

As it is with asynchronous channel operations, the above call returns right away and execution continues.

与异步通道操作一样,上述调用会立即返回,继续执行。

Next, we can use the get API to query for a response from the Future object:

接下来,我们可以使用get API来查询Future对象的响应。

AsynchronousSocketChannel worker = future.get();

This call will block if necessary to wait for a connection request from a client. Optionally, we can specify a timeout if we don’t want to wait forever:

如果有必要,这个调用将阻塞,以等待来自客户端的连接请求。如果我们不想永远等待的话,可以指定一个超时时间。

AsynchronousSocketChannel worker = acceptFuture.get(10, TimeUnit.SECONDS);

After the above call returns and the operation were successful, we can create a loop within which we listen for incoming messages and echo them back to the client.

在上述调用返回且操作成功后,我们可以创建一个循环,在其中监听传入的消息并将其回传给客户端。

Let’s create a method called runServer within which we will do the waiting and process any incoming messages:

让我们创建一个名为runServer的方法,在这个方法中我们将进行等待并处理任何传入的消息。

public void runServer() {
    clientChannel = acceptResult.get();
    if ((clientChannel != null) && (clientChannel.isOpen())) {
        while (true) {
            ByteBuffer buffer = ByteBuffer.allocate(32);
            Future<Integer> readResult  = clientChannel.read(buffer);
            
            // perform other computations
            
            readResult.get();
            
            buffer.flip();
            Future<Integer> writeResult = clientChannel.write(buffer);
 
            // perform other computations
 
            writeResult.get();
            buffer.clear();
        } 
        clientChannel.close();
        serverChannel.close();
    }
}

Inside the loop, all we do is create a buffer to read from and write to depending on the operation.

在循环中,我们所做的就是创建一个缓冲区,根据操作的需要进行读取和写入。

Then, every time we do a read or a write, we can continue executing any other code and when we are ready to process the result, we call the get() API on the Future object.

然后,每次我们进行读或写时,我们可以继续执行任何其他代码,当我们准备处理结果时,我们在Future对象上调用get() API。

To start the server, we call its constructor and then the runServer method inside main:

为了启动服务器,我们调用它的构造函数,然后调用runServer方法,在main里面。

public static void main(String[] args) {
    AsyncEchoServer server = new AsyncEchoServer();
    server.runServer();
}

3. The Server With CompletionHandler

3.带有CompletionHandler的服务器

In this section, we will see how to implement the same server using the CompletionHandler approach rather than a Future approach.

在本节中,我们将看到如何使用CompletionHandler方法而不是Future方法来实现相同的服务器。

Inside the constructor, we create an AsynchronousServerSocketChannel and bind it to a local address the same way we did before:

在构造函数中,我们创建一个AsynchronousServerSocketChannel,并按照之前的方式将其绑定到本地地址。

serverChannel = AsynchronousServerSocketChannel.open();
InetSocketAddress hostAddress = new InetSocketAddress("localhost", 4999);
serverChannel.bind(hostAddress);

Next, still inside the constructor, we create a while loop within which we accept any incoming connection from a client. This while loop is used strictly to prevent the server from exiting before establishing a connection with a client.

接下来,还是在构造函数中,我们创建了一个while循环,在这个循环中我们接受来自客户端的任何连接。这个while循环是严格用于防止服务器在与客户建立连接之前退出

To prevent the loop from running endlessly, we call System.in.read() at its end to block execution until an incoming connection is read from the standard input stream:

为了防止循环无休止地运行,我们在其末端调用System.in.read(),以阻止执行,直到从标准输入流中读取一个传入连接。

while (true) {
    serverChannel.accept(
      null, new CompletionHandler<AsynchronousSocketChannel,Object>() {

        @Override
        public void completed(
          AsynchronousSocketChannel result, Object attachment) {
            if (serverChannel.isOpen()){
                serverChannel.accept(null, this);
            }

            clientChannel = result;
            if ((clientChannel != null) && (clientChannel.isOpen())) {
                ReadWriteHandler handler = new ReadWriteHandler();
                ByteBuffer buffer = ByteBuffer.allocate(32);

                Map<String, Object> readInfo = new HashMap<>();
                readInfo.put("action", "read");
                readInfo.put("buffer", buffer);

                clientChannel.read(buffer, readInfo, handler);
             }
         }
         @Override
         public void failed(Throwable exc, Object attachment) {
             // process error
         }
    });
    System.in.read();
}

When a connection is established, the completed callback method in the CompletionHandler of the accept operation is called.

当连接建立后,接受操作的CompletionHandler中的completed回调方法被调用。

Its return type is an instance of AsynchronousSocketChannel. If the server socket channel is still open, we call the accept API again to get ready for another incoming connection while reusing the same handler.

它的返回类型是一个AsynchronousSocketChannel的实例。如果服务器套接字通道仍然开放,我们再次调用accept API,为另一个传入的连接做好准备,同时重复使用同一个处理程序。

Next, we assign the returned socket channel to a global instance. We then check that it is not null and that it is open before performing operations on it.

接下来,我们将返回的套接字通道分配给一个全局实例。然后,在对它进行操作之前,我们要检查它是否为空,以及它是否是开放的。

The point at which we can start read and write operations is inside the completed callback API of the accept operation’s handler. This step replaces the previous approach where we polled the channel with the get API.

我们可以开始读写操作的点是在accept操作的处理程序的completed回调API内。这一步取代了之前我们用get API轮询通道的方法。

Notice that the server will no longer exit after a connection has been established unless we explicitly close it.

请注意,服务器在建立连接后将不再退出,除非我们明确关闭它。

Notice also that we created a separate inner class for handling read and write operations; ReadWriteHandler. We will see how the attachment object comes in handy at this point.

还请注意,我们创建了一个单独的内部类来处理读写操作;ReadWriteHandler。我们将看到附件对象在这一点上如何派上用场。

First, let’s look at the ReadWriteHandler class:

首先,让我们看一下ReadWriteHandler类。

class ReadWriteHandler implements 
  CompletionHandler<Integer, Map<String, Object>> {
    
    @Override
    public void completed(
      Integer result, Map<String, Object> attachment) {
        Map<String, Object> actionInfo = attachment;
        String action = (String) actionInfo.get("action");

        if ("read".equals(action)) {
            ByteBuffer buffer = (ByteBuffer) actionInfo.get("buffer");
            buffer.flip();
            actionInfo.put("action", "write");

            clientChannel.write(buffer, actionInfo, this);
            buffer.clear();

        } else if ("write".equals(action)) {
            ByteBuffer buffer = ByteBuffer.allocate(32);

            actionInfo.put("action", "read");
            actionInfo.put("buffer", buffer);

            clientChannel.read(buffer, actionInfo, this);
        }
    }
    
    @Override
    public void failed(Throwable exc, Map<String, Object> attachment) {
        // 
    }
}

The generic type of our attachment in the ReadWriteHandler class is a map. We specifically need to pass two important parameters through it – the type of operation(action) and the buffer.

我们在ReadWriteHandler类中的附件的通用类型是一个map。我们特别需要通过它传递两个重要的参数–操作(action)的类型和缓冲区。

Next, we will see how these parameters are used.

接下来,我们将看到这些参数是如何被使用的。

The first operation we perform is a read since this is an echo server which only reacts to client messages. Inside the ReadWriteHandler‘s completed callback method, we retrieve the attached data and decide what to do accordingly.

我们执行的第一个操作是,因为这是一个只对客户端消息做出反应的回声服务器。在ReadWriteHandlercompleted回调方法中,我们检索所附的数据并决定相应的操作。

If it’s a read operation which has completed, we retrieve the buffer, change the action parameter of the attachment and perform a write operation right away to echo the message to the client.

如果是一个已经完成的操作,我们检索缓冲区,改变附件的动作参数,并立即执行操作,将消息回传给客户端。

If it’s a write operation which has just completed, we call the read API again to prepare the server to receive another incoming message.

如果是刚刚完成的操作,我们再次调用API,让服务器准备接收另一个传入的消息。

4. The Client

4.客户

After setting up the server, we can now set up the client by calling the open API on the AsyncronousSocketChannel class. This call creates a new instance of the client socket channel which we then use to make a connection to the server:

在设置了服务器之后,我们现在可以通过调用open类上的AsyncronousSocketChannel API来设置客户端。这个调用创建了一个新的客户端套接字通道的实例,然后我们用它来与服务器建立连接。

AsynchronousSocketChannel client = AsynchronousSocketChannel.open();
InetSocketAddress hostAddress = new InetSocketAddress("localhost", 4999)
Future<Void> future = client.connect(hostAddress);

The connect operation returns nothing on success. However, we can still use the Future object to monitor the state of the asynchronous operation.

connect操作成功后不返回任何东西。然而,我们仍然可以使用Future对象来监控异步操作的状态。

Let’s call the get API to await connection:

让我们调用get API来等待连接。

future.get()

After this step, we can start sending messages to the server and receiving echoes for the same. The sendMessage method looks like this:

在这一步之后,我们可以开始向服务器发送消息并接收相同的回音。sendMessage方法看起来像这样。

public String sendMessage(String message) {
    byte[] byteMsg = new String(message).getBytes();
    ByteBuffer buffer = ByteBuffer.wrap(byteMsg);
    Future<Integer> writeResult = client.write(buffer);

    // do some computation

    writeResult.get();
    buffer.flip();
    Future<Integer> readResult = client.read(buffer);
    
    // do some computation

    readResult.get();
    String echo = new String(buffer.array()).trim();
    buffer.clear();
    return echo;
}

5. The Test

5 测试

To confirm that our server and client applications are performing according to expectation, we can use a test:

为了确认我们的服务器和客户端应用程序是按照预期执行的,我们可以使用测试。

@Test
public void givenServerClient_whenServerEchosMessage_thenCorrect() {
    String resp1 = client.sendMessage("hello");
    String resp2 = client.sendMessage("world");

    assertEquals("hello", resp1);
    assertEquals("world", resp2);
}

6. Conclusion

6.结论

In this article, we have explored the Java NIO.2 asynchronous socket channel APIs. We have been able to step through the process of building a server and client with these new APIs.

在这篇文章中,我们已经探索了Java NIO.2异步套接字通道API。我们已经能够逐步完成用这些新的API构建服务器和客户端的过程。

You can access the full source code for this article by in the Github project.

你可以在Github项目中获取本文的完整源代码。