红茶的个人站点

  • 首页
  • 专栏
  • 开发工具
  • 其它
  • 隐私政策
Awalon
Talk is cheap,show me the code.
  1. 首页
  2. 未分类
  3. 正文

Netty 学习笔记 5:测试

2026年5月22日 8点热度 0人点赞 0条评论

EmbeddedChannel

可以使用一个特殊的 EmbeddedChannel 模拟消息的入站和出站操作,在不真实创建连接的情况下测试 ChannelHandler 是否能正常工作。

假设 Netty 的 Pipline 包含这几个常用的 Handler:

  • LengthFieldBasedFrameDecoder,将长度+消息内容的帧读取

  • LengthFieldPrepender,将消息帧添加长度

  • StringDecoder,将字节解码为字符串

  • StringEncoder,将字符串编码为字节

添加EmbeddedChannel定义:

EmbeddedChannel channel = new EmbeddedChannel(
    new LengthFieldBasedFrameDecoder(1024, 0, lengthFieldLength, 0, lengthFieldLength),
    new LengthFieldPrepender(lengthFieldLength),
    new StringDecoder(StandardCharsets.UTF_8),
    new StringEncoder(StandardCharsets.UTF_8)
);

定义的方式与在 BootStrap 中使用 ChannelInilizer 完全相同,Handler 的定义顺序也完全相同。

测试入站:

ByteBuf inboundData = Unpooled.buffer();
inboundData.writeInt(5); // 长度前缀(表示后面有5字节数据)
inboundData.writeBytes("hello".getBytes(StandardCharsets.UTF_8));
​
// 写入入站数据,经过 Pipeline 处理
channel.writeInbound(inboundData);
​
// 读取处理后的结果(应该是解码后的字符串)
String hello = (String) channel.readInbound();
Assertions.assertEquals("hello", hello);

这里通过EmbeddedChannel的writeInbound方法模拟消息入站,这里传入的是ByteBuf类型,这也是 Netty 入站的原始类型。需要先写入一个四字节的长度,再写入消息内容。

可以看到,通过 Channel 的 Pipline 上的入站 Handler 处理后,通过readInbound方法读取入站消息处理后的结果,这里是预期的原始消息hello(不包含长度)。

测试出站:

// ========== 测试出站编码 ==========
// 写入字符串到出站方向
channel.writeOutbound("world");
​
// 读取处理后的结果(应该是带长度前缀的 ByteBuf)
ByteBuf outboundBuffer1 = (ByteBuf) channel.readOutbound();
Assertions.assertEquals(4, outboundBuffer1.readableBytes());
Assertions.assertEquals(5, outboundBuffer1.readInt());
ByteBuf outboundBuffer2 = (ByteBuf) channel.readOutbound();
String string = outboundBuffer2.toString(StandardCharsets.UTF_8);
Assertions.assertEquals("world", string);
​
// 释放资源
channel.finish();
outboundBuffer1.release();
outboundBuffer2.release();

出站时通过writeOutbound方法写入的消息是字符串,经过出站处理器处理后的最终消息是长度+消息内容。

需要注意的是,Netty 官方实现的LengthFieldPrepender,是将长度和原始消息分别写入 Pipline,而不是通常我们以为的一次性写入包含长度和消息的帧。最终的二进制流来看两者结果是一样的,但在这个测试用例中略有不同,因为是两次写入两个帧,因此读取时也要分两次分别读取长度和原始内容。

异常处理

一个自定义解码器,用于将输入的字节流切分成指定大小的固定长度的帧:

public class FixedLengthHandler extends ByteToMessageDecoder {
    private final int length;
    private final int maxLength;
​
    public FixedLengthHandler(int length, int maxLength) {
        if (length <= 0 || maxLength <= 0) {
            throw new IllegalArgumentException("length must be a positive integer");
        }
        this.maxLength = maxLength;
        this.length = length;
    }
​
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        int readableBytes = in.readableBytes();
        if (readableBytes > maxLength) {
            throw new Exception("Frame too large");
        }
        while (readableBytes > 0) {
            if (readableBytes >= length) {
                ByteBuf byteBuf = in.readBytes(length);
                out.add(byteBuf);
                readableBytes -= length;
            } else {
                ByteBuf byteBuf = in.readBytes(readableBytes);
                out.add(byteBuf);
                readableBytes = 0;
            }
        }
    }
}

这个解码器可以指定一个输入字节流的最大值,如果过大就抛出异常,可以使用EmbeddedChannel模拟这种情况的发生:

EmbeddedChannel channel = new EmbeddedChannel(
    new FixedLengthHandler(3,10),
    new StringDecoder(StandardCharsets.UTF_8)
);
try{
    channel.writeInbound(Unpooled.copiedBuffer("hello world".getBytes(StandardCharsets.UTF_8)));
    channel.finish();
    Assertions.fail();
}
catch (Exception e){
    Assertions.assertInstanceOf(DecoderException.class, e);
}
String s = (String)channel.readInbound();

这里有一个细节,我们抛出的是Exception类型的异常,但 Netty 将其包装成了DecoderException类型的异常抛出。

处理异常

如果异常不被处理,异常会沿着 Pipline 的入站/出站处理器链向下传递,直到有一个处理器捕获并处理,如果没有任何处理器处理,就直接抛出。

因此,在入站/出站处理器链的末端添加一个全局异常处理器是一个不错的实践,比如:

@Slf4j
public class InExceptionHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        log.error(cause.getMessage(), cause);
        ctx.close();
    }
}

这是一个处理入站异常的处理器,通过重写exceptionCaught方法捕获并记录日常,并关闭连接。

单元测试:

EmbeddedChannel channel = new EmbeddedChannel(
    new FixedLengthHandler(3,10),
    new InExceptionHandler()
);
channel.writeInbound(Unpooled.copiedBuffer("hello world".getBytes(StandardCharsets.UTF_8)));
channel.finish();
String s = (String)channel.readInbound();
System.out.println(s);

这里单元测试代码不会出错,因为InExceptionHandler捕获并记录了异常,不会直接抛出。

出站的异常处理与入站略有不同:

@Slf4j
public class OutExceptionRecorder extends ChannelOutboundHandlerAdapter {
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        promise.addListener(future -> {
            if (!future.isSuccess()) {
                log.error(future.cause().getMessage(), future.cause());
                ctx.close();
            }
        });
        super.write(ctx, msg, promise);
    }
}

出站的异常需要通过ChannelPromise体现,因此这里需要为ChannelPromise添加监听,以在ChannelPromise产生结果时处理可能的异常。

内存泄漏检查

默认情况下 Netty 的 ByteBuf 使用的是直接内存,好处是效率高,零拷贝。缺点是在某些情况下没有正确释放会导致内存泄漏。

Netty 提供内存泄漏检查工具,可以使用以下 Jvm 参数开启:

-Dio.netty.leakDetection.level=paranoid
-Dio.netty.leakDetection.maxRecords=30

也可以用代码方式开启:

System.setProperty("io.netty.leakDetection.level", "paranoid");
System.setProperty("io.netty.leakDetection.maxRecords", "30");

io.netty.leakDetection.level有多个检查级别,可以在开发环境使用paranoid执行最严格的内存泄漏检查。

可以使用这段代码模拟内存泄漏:

        // 使用Java 21的虚拟线程执行器
        try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
            for (int i = 0; i < 100; i++) {
                final int taskId = i;
                executor.submit(() -> {
                    ByteBufAllocator alloc = ByteBufAllocator.DEFAULT;
                    // 每个虚拟线程创建一些ByteBuf而不释放
                    for (int j = 0; j < 10; j++) {
                        ByteBuf buffer = alloc.directBuffer(256);
                        buffer.writeInt(taskId * 10 + j);
                        // 故意不释放,制造泄漏
                        // buffer.release();
                    }
                    return null;
                });
            }
        }

        // 触发GC
        System.gc();
        Thread.sleep(1500);

可以在日志中看到类似:

2026-05-21 17:47:32.291 [virtual-96] ERROR io.netty.util.ResourceLeakDetector - LEAK: ByteBuf.release() was not called before it's garbage-collected. See https://netty.io/wiki/reference-counted-objects.html for more information.

日志还会提示内存泄漏的位置。

作为对比,下面是正确释放 ByteBuf,不会导致内存泄漏检查报警的代码:

ByteBufAllocator alloc = ByteBufAllocator.DEFAULT;

// 创建并正确释放缓冲区
for (int i = 0; i < 1000; i++) {
    ByteBuf buffer = alloc.directBuffer(512);
    buffer.writeInt(i);
    buffer.release(); // 正确释放
}

System.gc();
Thread.sleep(1000);

The End.

本文的完整示例可以从这里获取。

参考资料

  • 《Netty 实战》

本作品采用 知识共享署名 4.0 国际许可协议 进行许可
标签: netty
最后更新:2026年5月22日

魔芋红茶

加一点PHP,加一点Go,加一点Python......

点赞
< 上一篇

文章评论

razz evil exclaim smile redface biggrin eek confused idea lol mad twisted rolleyes wink cool arrow neutral cry mrgreen drooling persevering
取消回复

COPYRIGHT © 2021 icexmoon.cn. ALL RIGHTS RESERVED.
本网站由提供CDN加速/云存储服务

Theme Kratos Made By Seaton Jiang

宁ICP备2021001508号

宁公网安备64040202000141号