Introduction to the Java NIO Selector – Java NIO选择器的介绍

最后修改: 2016年 10月 14日

中文/混合/英文(键盘快捷键:t)

1. Overview

1.概述

In this article, we’ll explore the introductory parts of Java NIO’s Selector component.

在这篇文章中,我们将探讨Java NIO的Selector组件的入门部分。

A selector provides a mechanism for monitoring one or more NIO channels and recognizing when one or more become available for data transfer.

选择器提供了一种机制,用于监测一个或多个NIO通道,并识别一个或多个通道何时可用于数据传输。

This way, a single thread can be used for managing multiple channels, and thus multiple network connections.

这样一来,一个线程可以用来管理多个通道,从而管理多个网络连接。

2. Why Use a Selector?

2.为什么要使用选择器?

With a selector, we can use one thread instead of several to manage multiple channels. Context-switching between threads is expensive for the operating system, and additionally, each thread takes up memory.

通过选择器,我们可以使用一个线程而不是几个线程来管理多个通道。线程间的上下文切换对操作系统来说是很昂贵的,此外,每个线程都会占用内存。

Therefore, the fewer threads we use, the better. However, it’s important to remember that modern operating systems and CPU’s keep getting better at multitasking, so the overheads of multi-threading keep diminishing over time.

因此,我们使用的线程越少越好。然而,重要的是要记住,现代操作系统和CPU的多任务处理能力不断提高,所以多线程的开销随着时间的推移不断减少。

Here, we’ll be dealing with how we can handle multiple channels with a single thread using a selector.

在这里,我们要处理的是如何用一个选择器用一个线程处理多个通道。

Note also that selectors don’t just help you read data; they can also listen for incoming network connections and write data across slow channels.

还要注意的是,选择器不只是帮助你读取数据;它们还可以监听传入的网络连接,并在慢速通道上写入数据。

3. Setup

3.设置

To use the selector, we do not need any special set up. All the classes we need are in the core java.nio package and we just have to import what we need.

为了使用选择器,我们不需要任何特殊的设置。我们需要的所有类都在核心java.nio包中,我们只需要导入我们需要的东西。

After that, we can register multiple channels with a selector object. When I/O activity happens on any of the channels, the selector notifies us. This is how we can read from a large number of data sources on a single thread.

之后,我们可以用一个选择器对象注册多个通道。当任何通道发生I/O活动时,选择器会通知我们。这就是我们如何在一个单线程上从大量的数据源中读取数据。

Any channel we register with a selector must be a sub-class of SelectableChannel. These are a special type of channels that can be put in non-blocking mode.

我们用选择器注册的任何通道必须是SelectableChannel的子类。这些是一种特殊类型的通道,可以被置于非阻塞模式。

4. Creating a Selector

4.创建一个选择器

A selector may be created by invoking the static open method of the Selector class, which will use the system’s default selector provider to create a new selector:

一个选择器可以通过调用Selector类的静态open方法来创建,它将使用系统的默认选择器提供者来创建一个新的选择器。

Selector selector = Selector.open();

5. Registering Selectable Channels

5.注册可选择的频道

In order for a selector to monitor any channels, we must register these channels with the selector. We do this by invoking the register method of the selectable channel.

为了让一个选择器监视任何通道,我们必须在选择器上注册这些通道。我们通过调用可选择通道的register方法来做到这一点。

But before a channel is registered with a selector, it must be in non-blocking mode:

但是在一个通道被注册到选择器之前,它必须处于非阻塞模式。

channel.configureBlocking(false);
SelectionKey key = channel.register(selector, SelectionKey.OP_READ);

This means that we cannot use FileChannels with a selector since they cannot be switched into non-blocking mode the way we do with socket channels.

这意味着我们不能用选择器来使用文件通道s,因为它们不能像我们用套接字通道那样被切换到非阻塞模式。

The first parameter is the Selector object we created earlier, the second parameter defines an interest set, meaning what events we are interested in listening for in the monitored channel, via the selector.

第一个参数是我们之前创建的Selector对象,第二个参数定义了一个兴趣集,,意思是我们有兴趣通过选择器监听被监控通道中的哪些事件。

There are four different events we can listen for, each is represented by a constant in the SelectionKey class:

我们可以监听四个不同的事件,每个事件都由SelectionKey类中的一个常数表示。

  • Connect when a client attempts to connect to the server. Represented by SelectionKey.OP_CONNECT
  • Accept when the server accepts a connection from a client. Represented by SelectionKey.OP_ACCEPT
  • Read when the server is ready to read from the channel. Represented by SelectionKey.OP_READ
  • Write when the server is ready to write to the channel. Represented by SelectionKey.OP_WRITE

The returned object SelectionKey represents the selectable channel’s registration with the selector. We’ll look at it further in the following section.

返回的对象SelectionKey代表可选择通道与选择器的注册。我们将在下面的章节中进一步研究它。

6. The SelectionKey Object

6、SelectionKey对象

As we saw in the previous section, when we register a channel with a selector, we get a SelectionKey object. This object holds data representing the registration of the channel.

正如我们在上一节看到的,当我们用选择器注册一个通道时,我们会得到一个SelectionKey对象。这个对象持有代表通道注册的数据。

It contains some important properties which we must understand well to be able to use the selector on the channel. We’ll look at these properties in the following subsections.

它包含一些重要的属性,我们必须很好地理解这些属性才能在通道上使用选择器。我们将在下面的小节中研究这些属性。

6.1. The Interest Set

6.1.利益集

An interest set defines the set of events that we want the selector to watch out for on this channel. It is an integer value; we can get this information in the following way.

一个兴趣集定义了我们希望选择器在这个频道上注意的事件集。它是一个整数值;我们可以通过以下方式获得这一信息。

First, we have the interest set returned by the SelectionKey‘s interestOps method. Then we have the event constant in SelectionKey we looked at earlier.

首先,我们有由SelectionKeyinterestOps方法返回的兴趣集。然后,我们有我们之前看的SelectionKey中的事件常量。

When we AND these two values, we get a boolean value that tells us whether the event is being watched for or not:

当我们与这两个值相加时,我们得到一个布尔值,告诉我们该事件是否被关注。

int interestSet = selectionKey.interestOps();

boolean isInterestedInAccept  = interestSet & SelectionKey.OP_ACCEPT;
boolean isInterestedInConnect = interestSet & SelectionKey.OP_CONNECT;
boolean isInterestedInRead    = interestSet & SelectionKey.OP_READ;
boolean isInterestedInWrite   = interestSet & SelectionKey.OP_WRITE;

6.2. The Ready Set

6.2.准备就绪

The ready set defines the set of events that the channel is ready for. It is an integer value as well; we can get this information in the following way.

准备集定义了通道准备好的事件集。它也是一个整数值;我们可以通过以下方式获得这一信息。

We’ve got the ready set returned by SelectionKey‘s readyOps method. When we AND this value with the events constants as we did in the case of interest set, we get a boolean representing whether the channel is ready for a particular value or not.

我们已经得到了由SelectionKeyreadyOps方法返回的就绪集。当我们和这个值与事件常量相加时,就像我们在兴趣集的情况下所做的那样,我们得到一个布尔值,代表通道是否为一个特定的值做好准备。

Another alternative and shorter way to do this is to use SelectionKey’s convenience methods for this same purpose:

另一个替代的、更短的方法是使用SelectionKey的方便方法来达到同样的目的。

selectionKey.isAcceptable();
selectionKey.isConnectable();
selectionKey.isReadable();
selectionKey.isWriteable();

6.3. The Channel

6.3.频道

Accessing the channel being watched from the SelectionKey object is very simple. We just call the channel method:

SelectionKey对象访问正在观看的频道是非常简单的。我们只是调用channel方法。

Channel channel = key.channel();

6.4. The Selector

6.4.选择器

Just like getting a channel, it’s very easy to obtain the Selector object from the SelectionKey object:

就像获得一个通道一样,从SelectionKey对象中获得Selector对象非常容易。

Selector selector = key.selector();

6.5. Attaching Objects

6.5.连接对象

We can attach an object to a SelectionKey. Sometimes we may want to give a channel a custom ID or attach any kind of Java object we may want to keep track of.

我们可以将一个对象附加到一个SelectionKey。有时我们可能想给一个通道一个自定义的ID,或者附加我们可能想跟踪的任何种类的Java对象。

Attaching objects is a handy way of doing it. Here is how you attach and get objects from a SelectionKey:

附加对象是一种方便的方法。下面是你如何从SelectionKey附加和获取对象。

key.attach(Object);

Object object = key.attachment();

Alternatively, we can choose to attach an object during channel registration. We add it as a third parameter to channel’s register method, like so:

另外,我们可以选择在通道注册时附加一个对象。我们把它作为第三个参数添加到通道的register方法中,像这样。

SelectionKey key = channel.register(
  selector, SelectionKey.OP_ACCEPT, object);

7. Channel Key Selection

7.频道键选择

So far, we have looked at how to create a selector, register channels to it and inspect the properties of the SelectionKey object which represents a channel’s registration to a selector.

到目前为止,我们已经研究了如何创建一个选择器,向它注册通道,以及检查SelectionKey对象的属性,该对象代表一个通道向选择器的注册。

This is only half of the process, now we have to perform a continuous process of selecting the ready set which we looked at earlier. We do selection using selector’s select method, like so:

这只是过程的一半,现在我们必须执行一个连续的过程,即选择我们前面所看的准备好的集合。我们使用选择器的select方法进行选择,像这样。

int channels = selector.select();

This method blocks until at least one channel is ready for an operation. The integer returned represents the number of keys whose channels are ready for an operation.

这个方法会阻塞,直到至少有一个通道准备好进行操作。返回的整数代表通道已准备好进行操作的键的数量。

Next, we usually retrieve the set of selected keys for processing:

接下来,我们通常会检索所选键的集合进行处理。

Set<SelectionKey> selectedKeys = selector.selectedKeys();

The set we have obtained is of SelectionKey objects, each key represents a registered channel which is ready for an operation.

我们得到的集合是SelectionKey对象,每个键代表一个已注册的通道,准备进行操作。

After this, we usually iterate over this set and for each key, we obtain the channel and perform any of the operations that appear in our interest set on it.

在这之后,我们通常会在这个集合上进行迭代,对于每一个键,我们都会获得通道,并对其执行我们兴趣集合中出现的任何操作。

During the lifetime of a channel, it may be selected several times as its key appears in the ready set for different events. This is why we must have a continuous loop to capture and process channel events as and when they occur.

在一个通道的生命周期中,它可能会被多次选择,因为它的键出现在不同事件的准备集中。这就是为什么我们必须有一个连续的循环来捕捉和处理通道事件,当它们发生时。

8. Complete Example

8.完整的例子

To cement the knowledge we have gained in the previous sections, we’re going to build a complete client-server example.

为了巩固我们在前面几节中获得的知识,我们将建立一个完整的客户-服务器实例。

For ease of testing out our code, we’ll build an echo server and an echo client. In this kind of setup, the client connects to the server and starts sending messages to it. The server echoes back messages sent by each client.

为了便于测试我们的代码,我们将建立一个回声服务器和一个回声客户端。在这种设置中,客户端连接到服务器并开始向它发送消息。服务器会回传每个客户端发送的消息。

When the server encounters a specific message, such as end, it interprets it as the end of the communication and closes the connection with the client.

当服务器遇到一个特定的消息,如end,它将其解释为通信的结束,并关闭与客户端的连接。

8.1. The Server

8.1.服务器

Here is our code for EchoServer.java:

这里是我们的EchoServer.java的代码。

public class EchoServer {

    private static final String POISON_PILL = "POISON_PILL";

    public static void main(String[] args) throws IOException {
        Selector selector = Selector.open();
        ServerSocketChannel serverSocket = ServerSocketChannel.open();
        serverSocket.bind(new InetSocketAddress("localhost", 5454));
        serverSocket.configureBlocking(false);
        serverSocket.register(selector, SelectionKey.OP_ACCEPT);
        ByteBuffer buffer = ByteBuffer.allocate(256);

        while (true) {
            selector.select();
            Set<SelectionKey> selectedKeys = selector.selectedKeys();
            Iterator<SelectionKey> iter = selectedKeys.iterator();
            while (iter.hasNext()) {

                SelectionKey key = iter.next();

                if (key.isAcceptable()) {
                    register(selector, serverSocket);
                }

                if (key.isReadable()) {
                    answerWithEcho(buffer, key);
                }
                iter.remove();
            }
        }
    }

    private static void answerWithEcho(ByteBuffer buffer, SelectionKey key)
      throws IOException {
 
        SocketChannel client = (SocketChannel) key.channel();
        client.read(buffer);
        if (new String(buffer.array()).trim().equals(POISON_PILL)) {
            client.close();
            System.out.println("Not accepting client messages anymore");
        }
        else {
            buffer.flip();
            client.write(buffer);
            buffer.clear();
        }
    }

    private static void register(Selector selector, ServerSocketChannel serverSocket)
      throws IOException {
 
        SocketChannel client = serverSocket.accept();
        client.configureBlocking(false);
        client.register(selector, SelectionKey.OP_READ);
    }

    public static Process start() throws IOException, InterruptedException {
        String javaHome = System.getProperty("java.home");
        String javaBin = javaHome + File.separator + "bin" + File.separator + "java";
        String classpath = System.getProperty("java.class.path");
        String className = EchoServer.class.getCanonicalName();

        ProcessBuilder builder = new ProcessBuilder(javaBin, "-cp", classpath, className);

        return builder.start();
    }
}

This is what is happening; we create a Selector object by calling the static open method. We then create a channel also by calling its static open method, specifically a ServerSocketChannel instance.

这就是正在发生的事情;我们通过调用静态open方法创建一个Selector对象。然后我们通过调用其静态open方法创建一个通道,特别是一个ServerSocketChannel实例。

This is because ServerSocketChannel is selectable and good for a stream-oriented listening socket.

这是因为ServerSocketChannel是可选择的,对面向流的监听套接字很好

We then bind it to a port of our choice. Remember we said earlier that before registering a selectable channel to a selector, we must first set it to non-blocking mode. So next we do this and then register the channel to the selector.

然后我们把它绑定到一个我们选择的端口。记得我们之前说过,在将一个可选择的通道注册到选择器之前,我们必须先将其设置为非阻塞模式。所以接下来我们要做这个,然后把这个通道注册到选择器上。

We don’t need the SelectionKey instance of this channel at this stage, so we will not remember it.

在这个阶段,我们不需要这个通道的SelectionKey实例,所以我们将不记住它。

Java NIO uses a buffer-oriented model other than a stream-oriented model. So socket communication usually takes place by writing to and reading from a buffer.

Java NIO使用了一个面向缓冲区的模型,而不是面向流的模型。因此,套接字通信通常是通过向缓冲区写和从缓冲区读来进行的。

We, therefore, create a new ByteBuffer which the server will be writing to and reading from. We initialize it to 256 bytes, it’s just an arbitrary value, depending on how much data we plan to transfer to and fro.

因此,我们创建一个新的ByteBuffer,服务器将从它那里写入和读出。我们将其初始化为256字节,这只是一个任意的值,取决于我们计划来回传输多少数据。

Finally, we perform the selection process. We select the ready channels, retrieve their selection keys, iterate over the keys, and perform the operations for which each channel is ready.

最后,我们执行选择过程。我们选择准备好的通道,检索它们的选择键,遍历这些键,并执行每个通道准备好的操作。

We do this in an infinite loop since servers usually need to keep running whether there is an activity or not.

我们以无限循环的方式进行,因为无论是否有活动,服务器通常都需要持续运行。

The only operation a ServerSocketChannel can handle is an ACCEPT operation. When we accept the connection from a client, we obtain a SocketChannel object on which we can do read and writes. We set it to non-blocking mode and register it for a READ operation to the selector.

ServerSocketChannel可以处理的唯一操作是ACCEPT操作。当我们接受来自客户端的连接时,我们获得了一个SocketChannel对象,我们可以对其进行读写操作。我们将其设置为非阻塞模式,并将其注册为选择器的读操作。

During one of the subsequent selections, this new channel will become read-ready. We retrieve it and read it contents into the buffer. True to it’s as an echo server, we must write this content back to the client.

在随后的一次选择中,这个新通道将成为可读的。我们检索它并将其内容读入缓冲区。由于它是一个回声服务器,我们必须将这些内容写回给客户端。

When we desire to write to a buffer from which we have been reading, we must call the flip() method.

当我们希望向一个我们已经读过的缓冲区写入时,我们必须调用flip()方法

We finally set the buffer to write mode by calling the flip method and simply write to it.

我们最后通过调用flip方法将缓冲区设置为写模式,并简单地写到它。

The start() method is defined so that the echo server can be started as a separate process during unit testing.

定义了 start()方法,以便在单元测试时可以将回声服务器作为一个单独的进程来启动。

8.2. The Client

8.2.客户

Here is our code for EchoClient.java:

下面是我们的EchoClient.java的代码。

public class EchoClient {
    private static SocketChannel client;
    private static ByteBuffer buffer;
    private static EchoClient instance;

    public static EchoClient start() {
        if (instance == null)
            instance = new EchoClient();

        return instance;
    }

    public static void stop() throws IOException {
        client.close();
        buffer = null;
    }

    private EchoClient() {
        try {
            client = SocketChannel.open(new InetSocketAddress("localhost", 5454));
            buffer = ByteBuffer.allocate(256);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public String sendMessage(String msg) {
        buffer = ByteBuffer.wrap(msg.getBytes());
        String response = null;
        try {
            client.write(buffer);
            buffer.clear();
            client.read(buffer);
            response = new String(buffer.array()).trim();
            System.out.println("response=" + response);
            buffer.clear();
        } catch (IOException e) {
            e.printStackTrace();
        }
        return response;

    }
}

The client is simpler than the server.

客户端比服务器更简单。

We use a singleton pattern to instantiate it inside the start static method. We call the private constructor from this method.

我们使用单子模式在start静态方法内将其实例化。我们从这个方法中调用私有构造函数。

In the private constructor, we open a connection on the same port on which the server channel was bound and still on the same host.

在私有构造函数中,我们在服务器通道被绑定的同一端口上打开一个连接,并且仍然在同一主机上。

We then create a buffer to which we can write and from which we can read.

然后,我们创建一个缓冲区,我们可以向其中写,也可以从其中读。

Finally, we have a sendMessage method which reads wraps any string we pass to it into a byte buffer which is transmitted over the channel to the server.

最后,我们有一个sendMessage方法,该方法将我们传递给它的任何字符串读取包装成一个字节缓冲区,通过通道传输到服务器。

We then read from the client channel to get the message sent by the server. We return this as the echo of our message.

然后我们从客户通道中读取服务器发送的消息。我们将其作为我们的消息的回声返回。

8.3. Testing

8.3.测试

Inside a class called EchoTest.java, we are going to create a test case that starts the server, sends messages to the server, and only passes when the same messages are received back from the server. As a final step, the test case stops the server before completion.

在一个名为EchoTest.java的类中,我们将创建一个测试用例,启动服务器,向服务器发送消息,只有当从服务器收到相同的消息时才通过。作为最后一步,该测试用例在完成之前停止服务器。

We can now run the test:

我们现在可以运行测试了。

public class EchoTest {

    Process server;
    EchoClient client;

    @Before
    public void setup() throws IOException, InterruptedException {
        server = EchoServer.start();
        client = EchoClient.start();
    }

    @Test
    public void givenServerClient_whenServerEchosMessage_thenCorrect() {
        String resp1 = client.sendMessage("hello");
        String resp2 = client.sendMessage("world");
        assertEquals("hello", resp1);
        assertEquals("world", resp2);
    }

    @After
    public void teardown() throws IOException {
        server.destroy();
        EchoClient.stop();
    }
}

9. Selector.wakeup()

9.Selector.wakeup()

As we saw earlier, calling selector.select() blocks the current thread until one of the watched channels becomes operation-ready. We can override this by calling selector.wakeup() from another thread.

正如我们之前看到的,调用selector.select()会阻塞当前线程,直到被监视的通道之一成为操作就绪。我们可以通过从另一个线程调用selector.wakeup()来重写这一点。

The result is that the blocking thread returns immediately rather than continuing to wait, whether a channel has become ready or not.

其结果是,阻塞线程立即返回,而不是继续等待,无论一个通道是否已经准备好

We can demonstrate this using a CountDownLatch and tracking the code execution steps:

我们可以使用CountDownLatch来证明这一点,并跟踪代码执行步骤。

@Test
public void whenWakeUpCalledOnSelector_thenBlockedThreadReturns() {
    Pipe pipe = Pipe.open();
    Selector selector = Selector.open();
    SelectableChannel channel = pipe.source();
    channel.configureBlocking(false);
    channel.register(selector, OP_READ);

    List<String> invocationStepsTracker = Collections.synchronizedList(new ArrayList<>());

    CountDownLatch latch = new CountDownLatch(1);

    new Thread(() -> {
        invocationStepsTracker.add(">> Count down");
        latch.countDown();
        try {
            invocationStepsTracker.add(">> Start select");
            selector.select();
            invocationStepsTracker.add(">> End select");
        } catch (IOException e) {
            e.printStackTrace();
        }
    }).start();

    invocationStepsTracker.add(">> Start await");
    latch.await();
    invocationStepsTracker.add(">> End await");

    invocationStepsTracker.add(">> Wakeup thread");
    selector.wakeup();
    //clean up
    channel.close();

    assertThat(invocationStepsTracker)
      .containsExactly(
        ">> Start await",
        ">> Count down",
        ">> Start select",
        ">> End await",
        ">> Wakeup thread",
        ">> End select"
    );
}

In this example, we use Java NIO’s Pipe class to open a channel for testing purposes. We track code execution steps in a thread-safe list. By analyzing these steps, we can see how selector.wakeup() releases the thread blocked by selector.select().

在这个例子中,我们使用Java NIO的Pipe类来打开一个通道用于测试。我们在一个线程安全的列表中跟踪代码执行步骤。通过分析这些步骤,我们可以看到selector.wakeup()如何释放被selector.select()阻塞的线程。

10. Conclusion

10.结论

In this article, we have covered the basic usage of the Java NIO Selector component.

在这篇文章中,我们已经介绍了Java NIO Selector组件的基本用法。

The complete source code and all code snippets for this article are available in my GitHub project.

本文的完整源代码和所有代码片段可在我的GitHub项目中找到。