EmbeddedChannel
可以使用一个特殊的 模拟消息的入站和出站操作,在不真实创建连接的情况下测试 ChannelHandler 是否能正常工作。
假设 Netty 的 Pipline 包含这几个常用的 Handler:
-
LengthFieldBasedFrameDecoder,将长度+消息内容的帧读取 -
LengthFieldPrepender,将消息帧添加长度 -
StringDecoder,将字节解码为字符串 -
StringEncoder,将字符串编码为字节
添加EmbeddedChannel定义:
EmbeddedChannel channel = new EmbeddedChannel(
new LengthFieldBasedFrameDecoder(1024, 0, lengthFieldLength, 0, lengthFieldLength),
new LengthFieldPrepender(lengthFieldLength),
new StringDecoder(StandardCharsets.UTF_8),
new StringEncoder(StandardCharsets.UTF_8)
);
定义的方式与在 BootStrap 中使用 ChannelInilizer 完全相同,Handler 的定义顺序也完全相同。
测试入站:
ByteBuf inboundData = Unpooled.buffer();
inboundData.writeInt(5); // 长度前缀(表示后面有5字节数据)
inboundData.writeBytes("hello".getBytes(StandardCharsets.UTF_8));
// 写入入站数据,经过 Pipeline 处理
channel.writeInbound(inboundData);
// 读取处理后的结果(应该是解码后的字符串)
String hello = (String) channel.readInbound();
Assertions.assertEquals("hello", hello);
这里通过EmbeddedChannel的writeInbound方法模拟消息入站,这里传入的是ByteBuf类型,这也是 Netty 入站的原始类型。需要先写入一个四字节的长度,再写入消息内容。
可以看到,通过 Channel 的 Pipline 上的入站 Handler 处理后,通过readInbound方法读取入站消息处理后的结果,这里是预期的原始消息hello(不包含长度)。
测试出站:
// ========== 测试出站编码 ==========
// 写入字符串到出站方向
channel.writeOutbound("world");
// 读取处理后的结果(应该是带长度前缀的 ByteBuf)
ByteBuf outboundBuffer1 = (ByteBuf) channel.readOutbound();
Assertions.assertEquals(4, outboundBuffer1.readableBytes());
Assertions.assertEquals(5, outboundBuffer1.readInt());
ByteBuf outboundBuffer2 = (ByteBuf) channel.readOutbound();
String string = outboundBuffer2.toString(StandardCharsets.UTF_8);
Assertions.assertEquals("world", string);
// 释放资源
channel.finish();
outboundBuffer1.release();
outboundBuffer2.release();
出站时通过writeOutbound方法写入的消息是字符串,经过出站处理器处理后的最终消息是长度+消息内容。
需要注意的是,Netty 官方实现的
LengthFieldPrepender,是将长度和原始消息分别写入 Pipline,而不是通常我们以为的一次性写入包含长度和消息的帧。最终的二进制流来看两者结果是一样的,但在这个测试用例中略有不同,因为是两次写入两个帧,因此读取时也要分两次分别读取长度和原始内容。
异常处理
一个自定义解码器,用于将输入的字节流切分成指定大小的固定长度的帧:
public class FixedLengthHandler extends ByteToMessageDecoder {
private final int length;
private final int maxLength;
public FixedLengthHandler(int length, int maxLength) {
if (length <= 0 || maxLength <= 0) {
throw new IllegalArgumentException("length must be a positive integer");
}
this.maxLength = maxLength;
this.length = length;
}
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
int readableBytes = in.readableBytes();
if (readableBytes > maxLength) {
throw new Exception("Frame too large");
}
while (readableBytes > 0) {
if (readableBytes >= length) {
ByteBuf byteBuf = in.readBytes(length);
out.add(byteBuf);
readableBytes -= length;
} else {
ByteBuf byteBuf = in.readBytes(readableBytes);
out.add(byteBuf);
readableBytes = 0;
}
}
}
}
这个解码器可以指定一个输入字节流的最大值,如果过大就抛出异常,可以使用EmbeddedChannel模拟这种情况的发生:
EmbeddedChannel channel = new EmbeddedChannel(
new FixedLengthHandler(3,10),
new StringDecoder(StandardCharsets.UTF_8)
);
try{
channel.writeInbound(Unpooled.copiedBuffer("hello world".getBytes(StandardCharsets.UTF_8)));
channel.finish();
Assertions.fail();
}
catch (Exception e){
Assertions.assertInstanceOf(DecoderException.class, e);
}
String s = (String)channel.readInbound();
这里有一个细节,我们抛出的是Exception类型的异常,但 Netty 将其包装成了DecoderException类型的异常抛出。
处理异常
如果异常不被处理,异常会沿着 Pipline 的入站/出站处理器链向下传递,直到有一个处理器捕获并处理,如果没有任何处理器处理,就直接抛出。
因此,在入站/出站处理器链的末端添加一个全局异常处理器是一个不错的实践,比如:
public class InExceptionHandler extends ChannelInboundHandlerAdapter {
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
log.error(cause.getMessage(), cause);
ctx.close();
}
}
这是一个处理入站异常的处理器,通过重写exceptionCaught方法捕获并记录日常,并关闭连接。
单元测试:
EmbeddedChannel channel = new EmbeddedChannel(
new FixedLengthHandler(3,10),
new InExceptionHandler()
);
channel.writeInbound(Unpooled.copiedBuffer("hello world".getBytes(StandardCharsets.UTF_8)));
channel.finish();
String s = (String)channel.readInbound();
System.out.println(s);
这里单元测试代码不会出错,因为InExceptionHandler捕获并记录了异常,不会直接抛出。
出站的异常处理与入站略有不同:
public class OutExceptionRecorder extends ChannelOutboundHandlerAdapter {
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
promise.addListener(future -> {
if (!future.isSuccess()) {
log.error(future.cause().getMessage(), future.cause());
ctx.close();
}
});
super.write(ctx, msg, promise);
}
}
出站的异常需要通过ChannelPromise体现,因此这里需要为ChannelPromise添加监听,以在ChannelPromise产生结果时处理可能的异常。
内存泄漏检查
默认情况下 Netty 的 ByteBuf 使用的是直接内存,好处是效率高,零拷贝。缺点是在某些情况下没有正确释放会导致内存泄漏。
Netty 提供内存泄漏检查工具,可以使用以下 Jvm 参数开启:
-Dio.netty.leakDetection.level=paranoid -Dio.netty.leakDetection.maxRecords=30
也可以用代码方式开启:
System.setProperty("io.netty.leakDetection.level", "paranoid");
System.setProperty("io.netty.leakDetection.maxRecords", "30");
io.netty.leakDetection.level有多个检查级别,可以在开发环境使用paranoid执行最严格的内存泄漏检查。
可以使用这段代码模拟内存泄漏:
// 使用Java 21的虚拟线程执行器
try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
for (int i = 0; i < 100; i++) {
final int taskId = i;
executor.submit(() -> {
ByteBufAllocator alloc = ByteBufAllocator.DEFAULT;
// 每个虚拟线程创建一些ByteBuf而不释放
for (int j = 0; j < 10; j++) {
ByteBuf buffer = alloc.directBuffer(256);
buffer.writeInt(taskId * 10 + j);
// 故意不释放,制造泄漏
// buffer.release();
}
return null;
});
}
}
// 触发GC
System.gc();
Thread.sleep(1500);
可以在日志中看到类似:
2026-05-21 17:47:32.291 [virtual-96] ERROR io.netty.util.ResourceLeakDetector - LEAK: ByteBuf.release() was not called before it's garbage-collected. See https://netty.io/wiki/reference-counted-objects.html for more information.
日志还会提示内存泄漏的位置。
作为对比,下面是正确释放 ByteBuf,不会导致内存泄漏检查报警的代码:
ByteBufAllocator alloc = ByteBufAllocator.DEFAULT;
// 创建并正确释放缓冲区
for (int i = 0; i < 1000; i++) {
ByteBuf buffer = alloc.directBuffer(512);
buffer.writeInt(i);
buffer.release(); // 正确释放
}
System.gc();
Thread.sleep(1000);
The End.
本文的完整示例可以从获取。

文章评论