Introduction to Netty – Netty简介

最后修改: 2017年 6月 8日

中文/混合/英文(键盘快捷键:t)

1. Introduction

1.介绍

In this article, we’re going to take a look at Netty — an asynchronous event-driven network application framework.

在这篇文章中,我们要看一下Netty–一个异步事件驱动的网络应用框架。

The main purpose of Netty is building high-performance protocol servers based on NIO (or possibly NIO.2) with separation and loose coupling of the network and business logic components. It might implement a widely known protocol, such as HTTP, or your own specific protocol.

Netty的主要目的是建立基于NIO(或可能是NIO.2)的高性能协议服务器,并将网络和业务逻辑组件分离和松散耦合。它可能实现一个广为人知的协议,如HTTP,或你自己的特定协议。

2. Core Concepts

2.核心概念

Netty is a non-blocking framework. This leads to high throughput compared to blocking IO. Understanding non-blocking IO is crucial to understanding Netty’s core components and their relationships.

Netty是一个非阻塞式框架。与阻塞式IO相比,这导致了高吞吐量。了解非阻塞式IO对于理解Netty的核心组件及其关系至关重要。

2.1. Channel

2.1.频道

Channel is the base of Java NIO. It represents an open connection which is capable of IO operations such as reading and writing.

Channel是Java NIO的基础。它代表了一个开放的连接,能够进行IO操作,如读和写。

2.2. Future

2.2.未来

Every IO operation on a Channel in Netty is non-blocking.

Netty中通道上的每个IO操作都是非阻塞的。

This means that every operation is returned immediately after the call. There is a Future interface in the standard Java library, but it’s not convenient for Netty purposes — we can only ask the Future about the completion of the operation or to block the current thread until the operation is done.

这意味着每个操作都会在调用后立即返回。在标准的Java库中有一个Future 接口,但是对于Netty来说并不方便–我们只能向Future 询问操作的完成情况,或者阻断当前线程直到操作完成。

That’s why Netty has its own ChannelFuture interface. We can pass a callback to ChannelFuture which will be called upon operation completion.

这就是为什么Netty有自己的ChannelFuture接口我们可以向ChannelFuture传递一个回调,它将在操作完成后被调用。

2.3. Events and Handlers

2.3.事件和处理程序

Netty uses an event-driven application paradigm, so the pipeline of the data processing is a chain of events going through handlers. Events and handlers can be related to the inbound and outbound data flow. Inbound events can be the following:

Netty使用事件驱动的应用范式,所以数据处理的管道是一个通过处理程序的事件链。事件和处理程序可以与入站和出站的数据流相关。入站事件可以是以下内容。

  • Channel activation and deactivation
  • Read operation events
  • Exception events
  • User events

Outbound events are simpler and, generally, are related to opening/closing a connection and writing/flushing data.

出站事件比较简单,一般来说,与打开/关闭连接和写入/刷新数据有关。

Netty applications consist of a couple of networking and application logic events and their handlers. The base interfaces for the channel event handlers are ChannelHandler and its successors ChannelOutboundHandler and ChannelInboundHandler.

Netty应用程序由一些网络和应用逻辑事件及其处理程序组成。通道事件处理程序的基础接口是ChannelHandler及其继承者ChannelOutboundHandlerChannelInboundHandler

Netty provides a huge hierarchy of implementations of ChannelHandler. It is worth noting the adapters which are just empty implementations, e.g. ChannelInboundHandlerAdapter and ChannelOutboundHandlerAdapter. We could extend these adapters when we need to process only a subset of all events.

Netty为ChannelHandler的实现提供了一个巨大的层次结构。值得注意的是那些只是空实现的适配器,例如ChannelInboundHandlerAdapterChannelOutboundHandlerAdapter。当我们只需要处理所有事件的一个子集时,我们可以扩展这些适配器。

Also, there are many implementations of specific protocols such as HTTP, e.g. HttpRequestDecoder, HttpResponseEncoder, HttpObjectAggregator. It would be good to get acquainted with them in Netty’s Javadoc.

此外,还有许多特定协议的实现,例如HTTP,HttpRequestDecoder,HttpResponseEncoder,HttpObjectAggregator。在Netty的Javadoc中熟悉一下它们会很好。

2.4. Encoders and Decoders

2.4.编码器和解码器

As we work with the network protocol, we need to perform data serialization and deserialization. For this purpose, Netty introduces special extensions of the ChannelInboundHandler for decoders which are capable of decoding incoming data. The base class of most decoders is ByteToMessageDecoder.

当我们使用网络协议工作时,我们需要进行数据序列化和反序列化。为此,Netty为ChannelInboundHandler引入了特殊的扩展,用于解码器,能够对传入的数据进行解码。大多数解码器的基类是ByteToMessageDecoder。

For encoding outgoing data, Netty has extensions of the ChannelOutboundHandler called encoders. MessageToByteEncoder is the base for most encoder implementations. We can convert the message from byte sequence to Java object and vice versa with encoders and decoders.

对于外发数据的编码,Netty有ChannelOutboundHandler的扩展,称为encoders。MessageToByteEncoder是大多数编码器实现的基础我们可以通过编码器和解码器将消息从字节序列转换成Java对象,反之亦然。

3. Example Server Application

3.服务器应用实例

Let’s create a project representing a simple protocol server which receives a request, performs a calculation and sends a response.

让我们创建一个项目,代表一个简单的协议服务器,它接收一个请求,执行一个计算并发送一个响应。

3.1. Dependencies

3.1.依赖性

First of all, we need to provide the Netty dependency in our pom.xml:

首先,我们需要在pom.xml中提供Netty的依赖关系。

<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.10.Final</version>
</dependency>

We can find the latest version over on Maven Central.

我们可以在Maven中心上找到最新版本。

3.2. Data Model

3.2.数据模型

The request data class would have the following structure:

请求数据类将有以下结构。

public class RequestData {
    private int intValue;
    private String stringValue;
    
    // standard getters and setters
}

Let’s assume that the server receives the request and returns the intValue multiplied by 2. The response would have the single int value:

让我们假设服务器收到请求并返回intValue乘以2的结果。

public class ResponseData {
    private int intValue;

    // standard getters and setters
}

3.3. Request Decoder

3.3.请求解码器

Now we need to create encoders and decoders for our protocol messages.

现在我们需要为我们的协议信息创建编码器和解码器。

It should be noted that Netty works with socket receive buffer, which is represented not as a queue but just as a bunch of bytes. This means that our inbound handler can be called when the full message is not received by a server.

应该注意的是,Netty使用套接字接收缓冲区工作,它不是表示为一个队列,而只是表示为一堆字节。这意味着当服务器没有收到完整的消息时,我们的入站处理程序可以被调用。

We must make sure that we have received the full message before processing and there are many ways to do that.

我们必须确保在处理之前已经收到了完整的信息,有许多方法可以做到这一点。

First of all, we can create a temporary ByteBuf and append to it all inbound bytes until we get the required amount of bytes:

首先,我们可以创建一个临时的ByteBuf,并将所有入站字节追加到它,直到我们得到所需的字节量。

public class SimpleProcessingHandler 
  extends ChannelInboundHandlerAdapter {
    private ByteBuf tmp;

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) {
        System.out.println("Handler added");
        tmp = ctx.alloc().buffer(4);
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) {
        System.out.println("Handler removed");
        tmp.release();
        tmp = null;
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf m = (ByteBuf) msg;
        tmp.writeBytes(m);
        m.release();
        if (tmp.readableBytes() >= 4) {
            // request processing
            RequestData requestData = new RequestData();
            requestData.setIntValue(tmp.readInt());
            ResponseData responseData = new ResponseData();
            responseData.setIntValue(requestData.getIntValue() * 2);
            ChannelFuture future = ctx.writeAndFlush(responseData);
            future.addListener(ChannelFutureListener.CLOSE);
        }
    }
}

The example shown above looks a bit weird but helps us to understand how Netty works. Every method of our handler is called when its corresponding event occurs. So we initialize the buffer when the handler is added, fill it with data on receiving new bytes and start processing it when we get enough data.

上面的例子看起来有点奇怪,但有助于我们理解Netty的工作原理。当相应的事件发生时,我们的处理程序的每个方法都被调用。所以我们在添加处理程序时初始化缓冲区,在接收到新的字节时用数据填充它,当我们得到足够的数据时开始处理它。

We deliberately did not use a stringValue — decoding in such a manner would be unnecessarily complex. That’s why Netty provides useful decoder classes which are implementations of ChannelInboundHandler: ByteToMessageDecoder and ReplayingDecoder.

我们特意没有使用stringValue–这样的解码方式将是不必要的复杂。这就是为什么Netty提供了有用的解码器类,它们是ChannelInboundHandler的实现。ByteToMessageDecoderReplayingDecoder。

As we noted above we can create a channel processing pipeline with Netty. So we can put our decoder as the first handler and the processing logic handler can come after it.

正如我们上面提到的,我们可以用Netty创建一个通道处理管道。因此,我们可以把我们的解码器作为第一个处理程序,处理逻辑程序可以在它之后。

The decoder for RequestData is shown next:

接下来显示的是RequestData的解码器。

public class RequestDecoder extends ReplayingDecoder<RequestData> {

    private final Charset charset = Charset.forName("UTF-8");

    @Override
    protected void decode(ChannelHandlerContext ctx, 
      ByteBuf in, List<Object> out) throws Exception {
 
        RequestData data = new RequestData();
        data.setIntValue(in.readInt());
        int strLen = in.readInt();
        data.setStringValue(
          in.readCharSequence(strLen, charset).toString());
        out.add(data);
    }
}

An idea of this decoder is pretty simple. It uses an implementation of ByteBuf which throws an exception when there is not enough data in the buffer for the reading operation.

这个解码器的想法非常简单。它使用了一个ByteBuf的实现,当缓冲区中没有足够的数据用于读取操作时,它会抛出一个异常。

When the exception is caught the buffer is rewound to the beginning and the decoder waits for a new portion of data. Decoding stops when the out list is not empty after decode execution.

当捕捉到异常时,缓冲区被重新卷到开头,解码器等待新的数据部分。当out 列表在decode 执行后不为空时,解码停止。

3.4. Response Encoder

3.4.响应编码器

Besides decoding the RequestData we need to encode the message. This operation is simpler because we have the full message data when the write operation occurs.

除了对RequestData进行解码,我们还需要对消息进行编码。这个操作比较简单,因为当写操作发生时我们有完整的消息数据。

We can write data to Channel in our main handler or we can separate the logic and create a handler extending MessageToByteEncoder which will catch the write ResponseData operation:

我们可以在我们的主处理程序中向Channel写数据,或者我们可以分离逻辑,创建一个扩展MessageToByteEncoder的处理程序,它将捕获写ResponseData操作。

public class ResponseDataEncoder 
  extends MessageToByteEncoder<ResponseData> {

    @Override
    protected void encode(ChannelHandlerContext ctx, 
      ResponseData msg, ByteBuf out) throws Exception {
        out.writeInt(msg.getIntValue());
    }
}

3.5. Request Processing

3.5.请求处理

Since we carried out the decoding and encoding in separate handlers we need to change our ProcessingHandler:

由于我们在不同的处理程序中进行了解码和编码,我们需要改变我们的ProcessingHandler

public class ProcessingHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) 
      throws Exception {
 
        RequestData requestData = (RequestData) msg;
        ResponseData responseData = new ResponseData();
        responseData.setIntValue(requestData.getIntValue() * 2);
        ChannelFuture future = ctx.writeAndFlush(responseData);
        future.addListener(ChannelFutureListener.CLOSE);
        System.out.println(requestData);
    }
}

3.6. Server Bootstrap

3.6.服务器Bootstrap

Now let’s put it all together and run our server:

现在让我们把这一切放在一起,运行我们的服务器。

public class NettyServer {

    private int port;

    // constructor

    public static void main(String[] args) throws Exception {
 
        int port = args.length > 0
          ? Integer.parseInt(args[0]);
          : 8080;
 
        new NettyServer(port).run();
    }

    public void run() throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
              .channel(NioServerSocketChannel.class)
              .childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) 
                  throws Exception {
                    ch.pipeline().addLast(new RequestDecoder(), 
                      new ResponseDataEncoder(), 
                      new ProcessingHandler());
                }
            }).option(ChannelOption.SO_BACKLOG, 128)
              .childOption(ChannelOption.SO_KEEPALIVE, true);

            ChannelFuture f = b.bind(port).sync();
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }
}

The details of the classes used in the above server bootstrap example can be found in their Javadoc. The most interesting part is this line:

上述服务器引导实例中使用的类的细节可以在其Javadoc中找到。最有趣的部分是这一行。

ch.pipeline().addLast(
  new RequestDecoder(), 
  new ResponseDataEncoder(), 
  new ProcessingHandler());

Here we define inbound and outbound handlers that will process requests and output in the correct order.

在这里,我们定义入站和出站处理程序,它们将按照正确的顺序处理请求和输出。

4. Client Application

4.客户应用

The client should perform reverse encoding and decoding, so we need to have a RequestDataEncoder and ResponseDataDecoder:

客户端应该进行反向编码和解码,所以我们需要有一个RequestDataEncoderResponseDataDecoder

public class RequestDataEncoder 
  extends MessageToByteEncoder<RequestData> {

    private final Charset charset = Charset.forName("UTF-8");

    @Override
    protected void encode(ChannelHandlerContext ctx, 
      RequestData msg, ByteBuf out) throws Exception {
 
        out.writeInt(msg.getIntValue());
        out.writeInt(msg.getStringValue().length());
        out.writeCharSequence(msg.getStringValue(), charset);
    }
}
public class ResponseDataDecoder 
  extends ReplayingDecoder<ResponseData> {

    @Override
    protected void decode(ChannelHandlerContext ctx, 
      ByteBuf in, List<Object> out) throws Exception {
 
        ResponseData data = new ResponseData();
        data.setIntValue(in.readInt());
        out.add(data);
    }
}

Also, we need to define a ClientHandler which will send the request and receive the response from server:

此外,我们需要定义一个ClientHandler,它将发送请求并从服务器接收响应。

public class ClientHandler extends ChannelInboundHandlerAdapter {
 
    @Override
    public void channelActive(ChannelHandlerContext ctx) 
      throws Exception {
 
        RequestData msg = new RequestData();
        msg.setIntValue(123);
        msg.setStringValue(
          "all work and no play makes jack a dull boy");
        ChannelFuture future = ctx.writeAndFlush(msg);
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) 
      throws Exception {
        System.out.println((ResponseData)msg);
        ctx.close();
    }
}

Now let’s bootstrap the client:

现在我们来启动客户端。

public class NettyClient {
    public static void main(String[] args) throws Exception {
 
        String host = "localhost";
        int port = 8080;
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            Bootstrap b = new Bootstrap();
            b.group(workerGroup);
            b.channel(NioSocketChannel.class);
            b.option(ChannelOption.SO_KEEPALIVE, true);
            b.handler(new ChannelInitializer<SocketChannel>() {
 
                @Override
                public void initChannel(SocketChannel ch) 
                  throws Exception {
                    ch.pipeline().addLast(new RequestDataEncoder(), 
                      new ResponseDataDecoder(), new ClientHandler());
                }
            });

            ChannelFuture f = b.connect(host, port).sync();

            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
        }
    }
}

As we can see, there are many details in common with the server bootstrapping.

正如我们所看到的,有许多细节与服务器的引导有共同之处。

Now we can run the client’s main method and take a look at the console output. As expected, we got ResponseData with intValue equal to 246.

现在我们可以运行客户端的主方法,看看控制台的输出。正如预期的那样,我们得到了ResponseDataintValue等于246。

5. Conclusion

5.结论

In this article, we had a quick introduction to Netty. We showed its core components such as Channel and ChannelHandler. Also, we’ve made a simple non-blocking protocol server and a client for it.

在这篇文章中,我们对Netty进行了快速介绍。我们展示了它的核心组件,如ChannelChannelHandler。此外,我们还做了一个简单的非阻塞协议服务器和一个客户端。

As always, all code samples are available over on GitHub.

一如既往,所有的代码样本都可以在GitHub上获得