1. Introduction
1.导言
In this tutorial, we’ll use Netty to create a chat room app. In network programming, Netty stands out as a robust framework that simplifies the complexities of asynchronous I/O operations. We’ll explore how to build a basic chat server where multiple clients can connect and engage in real-time conversations.
在本教程中,我们将使用 Netty 创建一个聊天室应用程序。在网络编程中,Netty 是一个强大的框架,它简化了异步 I/O 操作的复杂性。我们将探讨如何构建一个基本的聊天服务器,让多个客户端可以连接并进行实时对话。
2. Scenario
2.场景
Messages sent to the server will be relayed to all connected clients. It’ll also keep a list of the last few messages sent so new clients can have context from the current conversation when they connect. To do this, we’ll only need a couple of event handlers to maintain communication between channels:
发送到服务器的信息将转发给所有已连接的客户端。它还将保留最近发送的几条消息的列表,以便新客户在连接时可以了解当前对话的上下文。为此,我们只需要几个事件处理程序来维持通道之间的通信:
In Netty, communication is done through channels, which abstract asynchronous I/O operations over any protocol. That allows us to focus on application logic instead of networking code. Our application will work via the command line. We’ll write a server and a client app.
在 Netty 中,通信是通过通道完成的,通道抽象了任何协议上的异步 I/O 操作。这让我们可以专注于应用程序逻辑,而不是网络代码。我们的应用程序将通过命令行运行。我们将编写一个服务器和一个客户端应用程序。
3. Creating a Custom Event Handler
3.创建自定义事件处理程序
For communication between channels, we’ll implement a SimpleChannelInboundHandler<String>, a generic implementation of ChannelInboundHandlerAdapter. This adapter allows us to focus on implementing only the events we care about. In this case, it’s channelRead0(), which is called when a message is received from the server. We’ll use this to simplify our use case since we’ll only exchange String messages.
对于通道之间的通信,我们将实现 SimpleChannelInboundHandler<String>,这是 ChannelInboundHandlerAdapter 的 Generic 实现。通过该适配器,我们只需专注于实现我们关心的事件。在本例中,它就是 channelRead0() ,当从服务器接收到消息时会调用该事件。我们将使用它来简化我们的用例,因为我们将只交换 String 消息。
3.1. Client Event Handler
3.1.客户端事件处理程序
Let’s start with the handler for client messages, which will print anything received by the server to the console, without modifications:
让我们从客户端消息的处理程序开始,它将把服务器收到的任何内容不加修改地打印到控制台:
public class ClientEventHandler extends SimpleChannelInboundHandler<String> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) {
System.out.println(msg);
}
}
Later, we’ll handle message sending by writing directly to the channel.
稍后,我们将通过直接写入通道来处理信息发送。
3.2. Message Object
3.2 信息对象
Before we move on to server events, let’s write a POJO to represent every message sent to the server. We’ll register the date sent along with a user name and message:
在继续讨论服务器事件之前,我们先编写一个 POJO 来表示发送到服务器的每一条消息。我们将注册与用户名和消息一起发送的 日期:
public class Message {
private final Instant time;
private final String user;
private final String message;
public Message(String user, String message) {
this.time = Instant.now();
this.user = user;
this.message = message;
}
// standard getters...
}
Then, we’ll include a few helpers, starting with how the messages will appear on the console when sent by the server:
然后,我们将提供一些帮助,首先是服务器发送消息时,消息将如何显示在控制台上:
@Override
public String toString() {
return time + " - " + user + ": " + message;
}
Then, for parsing messages received by clients, we’ll use a CSV format. We’ll see how the client sends messages in this format when we create our client app:
然后,为了解析客户端收到的消息,我们将使用 CSV 格式。我们将在创建客户端程序时了解客户端如何以这种格式发送消息:
public static Message parse(String string) {
String[] arr = string.split(";", 2);
return new Message(arr[0], arr[1]);
}
Limiting the split to 2 is important because the message part might contain a semicolon.
由于信息部分可能包含分号,因此将分隔限制在 2 个分隔符非常重要。
3.3. Server Event Handler
3.3 服务器事件处理程序
In our server event handler, we’ll first create a helper method for the other events we’ll override. Also, we’ll need a map of clients connected and a Queue to keep at most MAX_HISTORY elements:
在我们的服务器事件处理程序中,我们将首先为我们要覆盖的其他事件创建一个辅助方法。此外,我们还需要一个已连接客户端的地图和一个 Queue 来保存最多 MAX_HISTORY 元素:
public class ServerEventHandler extends SimpleChannelInboundHandler<String> {
static final Map<String, Channel> clients = new HashMap<>();
static final Queue<String> history = new LinkedList<>();
static final int MAX_HISTORY = 5;
private void handleBroadcast(Message message, ChannelHandlerContext context) {
String channelId = context.channel()
.id()
.asShortText();
clients.forEach((id, channel) -> {
if (!id.equals(channelId))
channel.writeAndFlush(message.toString());
});
// history-control code...
}
// ...
}
First, we get the channel ID as a key for our map. Then, for the broadcast, for every client connected, excluding the sender, we relay their message.
首先,我们获取频道 ID 作为映射的密钥。然后,对于广播,对于每个连接的客户端(不包括发送者),我们都会转发他们的信息。
It’s important to note that writeAndFlush() receives an Object. And, since our handlers can only handle strings, it’s essential to call toString() so the client can correctly receive it.
需要注意的是,writeAndFlush() 接收的是一个 Object 对象。而且,由于我们的处理程序只能处理字符串,因此必须调用 toString() 以便客户端能够正确接收。
In the end, we do history control. Every time we add a new message, we remove the oldest one if our list exceeds MAX_HISTORY items:
最后,我们进行历史控制。每次添加新信息时,如果我们的列表超过 MAX_HISTORY 项,我们就会删除最旧的信息:
history.add(message.toString());
if (history.size() > MAX_HISTORY)
history.poll();
Now, we can override channelRead0() and parse messages received from clients:
现在,我们可以覆盖 channelRead0() 并解析从客户端收到的消息:
@Override
public void channelRead0(ChannelHandlerContext context, String msg) {
handleBroadcast(Message.parse(msg), context);
}
Then, for every client that comes online, we add it to our clients list, relay old messages for context, and send a system message announcing the new client:
然后,对于每一个上线的客户端,我们都会将其添加到客户端列表中,转发旧消息以了解上下文,并发送系统消息宣布新客户端:
@Override
public void channelActive(final ChannelHandlerContext context) {
Channel channel = context.channel();
clients.put(channel.id().asShortText(), channel);
history.forEach(channel::writeAndFlush);
handleBroadcast(new Message("system", "client online"), context);
}
Finally, we override channelInactive(), called in the event of a client that went offline. This time, we only need to remove the client from the list and send a system message:
最后,我们覆盖 channelInactive() ,在客户端离线时调用。这一次,我们只需将客户端从列表中删除,并发送一条系统消息:
@Override
public void channelInactive(ChannelHandlerContext context) {
Channel channel = context.channel();
clients.remove(channel.id().asShortText());
handleBroadcast(new Message("system", "client offline"), context);
}
4. Server Bootstrap App
4. 服务器引导应用程序
Our handler does nothing independently, so we need an application to bootstrap and run it, which is a common template.
我们的处理程序不会独立执行任何操作,因此我们需要一个应用程序来bootstrap并运行它,这是一个常见的模板。
4.1. Registering Custom Components in the ChannelPipeline
4.1.在 ChannelPipeline 中注册自定义组件</em
To prepare the bootstrap, we select a channel implementation and implement a child handler, which serves the requests for the channel:
为了准备引导,我们选择了一个通道实现,并实现了一个子处理程序,用于处理通道请求:
bootstrap.group(serverGroup, clientGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel channel) {
channel.pipeline()
.addFirst(
new StringDecoder(),
new ServerEventHandler()
new StringEncoder());
}
});
In the child handler, we define our processing pipeline. Since we’re only concerned about String messages, we’ll use the built-in String encoder and decoder, saving us some time by not having to encode/decode the exchanged byte buffers ourselves.
在子处理程序中,我们定义了处理管道。由于我们只关注 String 消息,因此我们将使用内置的 String 编码器和解码器,这样我们就不必自己对交换的字节缓冲区进行编码/解码,从而节省了一些时间。
Lastly, since the order matters, we add the decoder, our ServerEventHandler, and the encoder. That’s because events flow through the pipeline from inbound to outbound.
最后,由于顺序很重要,我们添加了解码器、ServerEventHandler 和编码器。这是因为事件会从入站流向出站。
We’ll bind our server to a host/port to finish our app, which returns a ChannelFuture. We’ll use this to wait until our async socket is closed with sync():
我们将把服务器绑定到一个主机/端口来完成应用程序,它将返回一个 ChannelFuture 。我们将使用它来等待使用 sync() 关闭异步套接字:
ChannelFuture future = bootstrap.bind(HOST, PORT).sync();
System.out.println("server started. accepting clients.");
future.channel().closeFuture().sync();
5. Client Bootstrap App
5.客户 Bootstrap 应用程序
Finally, our client app follows a common client template for bootstrapping. Most importantly, when calling handler(), we’ll use our ClientEventHandler instead:
最后,我们的客户端应用程序遵循 bootstrapping 的通用客户端模板。最重要的是,在调用 handler() 时,我们将使用我们的 ClientEventHandler 代替:
channel.pipeline().addFirst(
new StringDecoder(),
new ClientEventHandler(),
new StringEncoder());
5.1. Handling Message Input
5.1 处理信息输入
Finally, to handle user input, after connecting to the server, we’ll loop with a Scanner until we receive a user name, and then, until the message equals “exit.” Most importantly, we must use writeAndFlush() to send our message. We send the message in the format expected by Message.parse():
最后,为了处理用户输入,在连接到服务器后,我们将循环使用 Scanner 直到收到用户名,然后,直到消息等于 “exit”。最重要的是,我们必须使用 writeAndFlush() 来发送消息。我们将按照 Message.parse() 所期望的格式发送消息:
private static void messageLoop(Scanner scanner, Channel channel) {
while (user.isEmpty()) {
System.out.print("your name: ");
user = scanner.nextLine();
}
while (scanner.hasNext()) {
System.out.print("> ");
String message = scanner.nextLine();
if (message.equals("exit"))
break;
channel.writeAndFlush(user + ";" + message);
}
}
6. Creating a Custom Event Listener
6.创建自定义事件监听器
In Netty, event listeners play a crucial role in handling asynchronous events throughout the lifecycle of channels. An event listener is essentially a callback mechanism that we can use to react to the completion of any operation that returns a ChannelFuture.
在 Netty 中,事件监听器在整个通道生命周期中处理异步事件时发挥着至关重要的作用。事件监听器本质上是一种回调机制,我们可以使用它对任何返回 ChannelFuture 的操作的完成作出反应。
We implement the ChannelFutureListener interface for custom behavior upon its completion. A ChannelFuture represents the result of an asynchronous operation, such as a connection attempt or an I/O operation.
我们实现了 ChannelFutureListener 接口,以便在其完成时自定义行为。ChannelFuture 表示异步操作的结果,例如连接尝试或 I/O 操作。
ChannelFutureListener is useful because it defines default implementations like CLOSE_ON_FAILURE or FIRE_EXCEPTION_ON_FAILURE. But, since we won’t use these, let’s implement a GenericFutureListener that we’ll use for operation confirmations.
ChannelFutureListener 非常有用,因为它定义了默认实现,如 CLOSE_ON_FAILURE 或 FIRE_EXCEPTION_ON_FAILURE 。但是,由于我们不会使用这些实现,因此让我们来实现一个 GenericFutureListener 来进行操作确认。
We’ll hold a custom event name for context, and we’ll check if our future is completed successfully. Otherwise, we’ll mark the status as “FAILED” before logging:
我们将为上下文保留一个自定义事件名称,并检查我们的未来是否已成功完成。否则,我们将在记录之前将状态标记为“FAILED”:
public class ChannelInfoListener implements GenericFutureListener<ChannelFuture> {
private final String event;
public ChannelInfoListener(String event) {
this.event = event;
}
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Channel channel = future.channel();
String status = "OK";
if (!future.isSuccess()) {
status = "FAILED";
future.cause().printStackTrace();
}
System.out.printf(
"%s - channel#%s %s: %s%n", Instant.now(), channel.id().asShortText(), status, event);
}
}
6.1. Event Receipts
6.1.活动收据
Let’s return to some parts of our code to include listeners. First, for the client, let’s include a “connected to server” confirmation:
让我们回到代码的某些部分,加入监听器。首先,对于客户端,让我们加入 “已连接服务器 “确认:
future.addListener(new ChannelInfoListener("connected to server"));
Then, let’s include a “message sent” confirmation in the message loop:
然后,让我们在消息循环中加入 “消息已发送 “确认:
ChannelFuture sent = channel.writeAndFlush(user + ";" + message);
sent.addListener(new ChannelInfoListener("message sent"));
This allows us to ensure we’re still connected to the server when sending messages. Finally, for the server handler, let’s send a “message relayed” confirmation during the broadcast:
这样,我们就能确保在发送消息时仍连接到服务器。最后,对于服务器处理程序,让我们在广播期间发送 “信息已中继 “确认:
clients.forEach((id, channel) -> {
if (!id.equals(channelId)) {
ChannelFuture relay = channel.writeAndFlush(message.toString());
relay.addListener(new ChannelInfoListener("message relayed to " + id));
}
});
7. Seeing It in Action
7.亲身体验
Netty allows us to test pipelines with EmbeddedChannel, but for client/server interactions, let’s see what it looks like when running from the terminal. Let’s start the server (we’ll omit package names for readability):
Netty 允许我们使用 EmbeddedChannel 测试管道,但对于客户端/服务器交互,让我们来看看从终端运行时是什么样子。让我们启动服务器(为便于阅读,我们将省略软件包名称):
$ mvn exec:java -Dexec.mainClass=ChatServerMain
chat server started. ready to accept clients.
Then, let’s start the first client, input a name, and send two messages:
然后,让我们启动第一个客户端,输入名称并发送两条信息:
$ mvn exec:java -Dexec.mainClass=ChatClientMain
2024-01-12 3:47:02 - channel#03c40ad4 OK: connected to server
your name: Bob
> Hello
2024-01-12 3:47:02 - channel#03c40ad4 OK: message sent
> Anyone there?!
2024-01-12 3:47:03 - channel#03c40ad4 OK: message sent
When we connect with a second client, we’ll get the message history before inputting a name:
当我们与第二个客户端连接时,我们将在输入名称之前获得消息历史记录:
$ mvn exec:java -Dexec.mainClass=ChatClientMain
2024-01-12 3:49:33 - channel#daa64476 OK: connected to server
2024-01-12 3:46:55 - system: client online: 03c40ad4
2024-01-12 3:47:03 - Bob: Hello
2024-01-12 3:48:40 - Bob: Anyone there?!
Naturally, after choosing a name and sending a message:
当然,在选择了名字并发送了信息之后:
your name: Alice
> Hi, Bob!
2024-01-12 3:51:05 - channel#daa64476 OK: message sent
The first client will receive it:
第一个客户将收到它:
2024-01-12 3:49:33 - system: client online: daa64476
2024-01-12 3:51:05 - Alice: Hi, Bob!
8. Conclusion
8.结论
In this article, we successfully built a functional chat server using Netty, demonstrating the power and simplicity of this framework in handling asynchronous communication. Through implementing event handlers, we managed to relay messages among connected clients and maintain context history.
在本文中,我们使用 Netty 成功构建了一个功能强大的聊天服务器,展示了该框架在处理异步通信方面的强大功能和简易性。通过实现事件处理程序,我们成功地在连接的客户端之间转发了消息,并维护了上下文历史记录。
As always, the source code is available over on GitHub.
与往常一样,源代码可在 GitHub 上获取。