红茶的个人站点

  • 首页
  • 专栏
  • 开发工具
  • 其它
  • 隐私政策
Awalon
Talk is cheap,show me the code.
  1. 首页
  2. 开发工具
  3. 正文

Netty 学习笔记 3:编解码器

2026年5月12日 10点热度 0人点赞 0条评论

黏包

在 Netty 中向通道写入并刷新一段数据,并不意味着客户端会立即将数据打包发送给服务端,因为底层的 TCP/IP 协议为了优化数据传输,往往会在数据量较小时,将多个数据包合并成一个数据包进行发送,或者在数量较大时,拆分成多个数据包进行发送。因此服务端收到数据包并使用 ByteBuf 读取数据时,会读到不完整或连在一起的多次发送的数据。这就是所谓的黏包/半包问题。

解决黏包/半包通常有这么几种方式:

  • 用换行符分隔

  • 用指定特殊字符分隔

  • 用指定长度的字节数组传输数据

  • 使用一个4字节整型指定数据体长度

Netty 已经预定义了相应的编码器/解码器,因此我们不需要自行实现相应的 Handler。

为了模拟黏包的发生,这里使用一个简单的时间服务器,正常情况下客户端发送一条"QUERY TIME ORDER"指令,服务端收到后返回时间。

为了模拟黏包,在客户端循环多次发送:

@Slf4j
public class ClientHandler extends ChannelInboundHandlerAdapter {
    private int receiveCounter = 0;
    private int sendCounter = 0;
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // 连接建立后发送时间请求
        for (int i = 0; i < 100; i++) {
            String request = "QUERY TIME ORDER";
            byte[] bytes = request.getBytes(StandardCharsets.UTF_8);
            ByteBuf buffer = Unpooled.buffer(bytes.length);
            buffer.writeBytes(bytes);
            sendCounter++;
            log.debug("Send times:{}", sendCounter);
            ctx.writeAndFlush(buffer);
        }
    }
​
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        receiveCounter++;
        String body = (String) msg;
        log.info("Now time is:{}, receive times:{}", body, receiveCounter);
    }
}

服务端接收到命令QUERY TIME ORDER后,返回时间:

@Slf4j
public class ChildChannelHandler extends ChannelInboundHandlerAdapter {
    private int counter = 0;
​
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // 因为添加了解码器,这里直接可以读取到 String
        String body = (String) msg;
        log.info("The time server receive order:{}", body);
        String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ?
                new java.util.Date(System.currentTimeMillis()).toString() : "BAD ORDER";
        counter++;
        log.info("The time server send:{}, receive times:{}", currentTime, counter);
        ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());
        ctx.writeAndFlush(resp);
    }
​
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
        log.error("exceptionCaught", cause);
    }
}

运行服务端和客户端会发现客户端输出:

2026-05-06 17:11:18.741 [nioEventLoopGroup-2-1] INFO  c.i.n.t.client.handler.ClientHandler - Now time is:BAD ORDER, receive times:1
2026-05-06 17:11:18.741 [nioEventLoopGroup-2-1] INFO  c.i.n.t.client.handler.ClientHandler - Now time is:BAD ORDER, receive times:2

服务端输出:

2026-05-06 17:11:18.727 [nioEventLoopGroup-3-1] INFO  c.i.n.t.s.h.ChildChannelHandler - The time server receive order:QUERY TIME ORDERQUERY TIME ORDERQUERY TIME ORDERQUERY TIME ORDERQUERY TIME ORDERQUERY TIME ORDERQUERY TIME ORDERQUERY TIME ORDERQUERY TIME ...
2026-05-06 17:11:18.729 [nioEventLoopGroup-3-1] INFO  c.i.n.t.s.h.ChildChannelHandler - The time server send:BAD ORDER, receive times:1
2026-05-06 17:11:18.737 [nioEventLoopGroup-3-1] INFO  c.i.n.t.s.h.ChildChannelHandler - The time server receive order:QUERY TIME ORDERQUERY TIME ORDERQUERY TIME ORDERQUERY TIME ORDERQUERY TIME ORDERQUERY TIME ORDERQUERY TIME ORDERQUERY TIME ORDERQUERY TIME ...
2026-05-06 17:11:18.737 [nioEventLoopGroup-3-1] INFO  c.i.n.t.s.h.ChildChannelHandler - The time server send:BAD ORDER, receive times:2

原因是尽管客户端每次写入 ByteBuf 后立即执行writeAndFlush,但是在底层的 TCP/IP 协议中,依然会缓存多个数据块后合并传输,而在服务端,会一次性接收到一个大的 TCP 数据块并读取,所以就出现上面这种服务端一次读取,但内容是客户端多次发送的结果。

可以用上面说过的几种方式解决黏包,并且这些方式都有 Netty 提供的现成实现,不需要我们手动编写。

DelimiterBasedFrameDecoder

使用DelimiterBasedFrameDecoder可以实现按照指定分隔符对传入的数据进行分割:

// 使用分隔符解码器解决黏包问题
ByteBuf delimiter = Unpooled.wrappedBuffer("$_".getBytes());
ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, delimiter));
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new ChildChannelHandler());

这里的StringDecoder同样是一个解码器,用于将二进制数据转换成字符串,使用它可以方便的在 Handler 中直接处理字符串,不再需要手动转换。

发送的时候(无论是服务端还是客户端),都需要每次发送数据时追加分隔符:

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    // 因为添加了解码器,这里直接可以读取到 String
    String body = (String) msg;
    log.info("The time server receive order:{}", body);
    String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ?
        new java.util.Date(System.currentTimeMillis()).toString() : "BAD ORDER";
    counter++;
    log.info("The time server send:{}, receive times:{}", currentTime, counter);
    // 每条消息后追加换行符
    currentTime += "$_";
    ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());
    ctx.writeAndFlush(resp);
}

现在再执行测试就会发现可以正常发送和接收数据了。

LineBasedFrameDecoder

LineBasedFrameDecoder可以看作是特殊的DelimiterBasedFrameDecoder,它以换行符进行分隔。

添加LineBasedFrameDecoder:

ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new ChildChannelHandler());

每次发送数据时候追加换行符:

currentTime += "\r\n";
ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());
ctx.writeAndFlush(resp);

换行符可以是\r\n或\n。

LengthFieldBasedFrameDecoder

LengthFieldBasedFrameDecoder是最常用的解决黏包问题的解码器,需要与对应的编码器LengthFieldPrepender共同使用。在数据发送时,会先用若干字节表示发送内容的字节长度,其后是若干字节的发送内容。数据接收时,会先读取指定字节作为接收数据的字节长度,再读取对应长度的字节。

// 使用长度解码器解决黏包问题
ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(65536, 0, 4, 0, 4));
ch.pipeline().addLast(new LengthFieldPrepender(4));
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new ChildChannelHandler());

因为发送时候的编码是LengthFieldPrepender实现的,因此发送时不需要手动实现。

使用定长帧发送和接收数据解决黏包并不常见,这里不做说明。

序列化

在实际开发中,客户端和服务端之间仅传输字符串是不方便的,如果要传输 Java 对象,就需要用到序列化和反序列化。

最常见的 Java 序列化和反序列化技术是 JDK 的序列化和 JSON。但这两者都一些问题,对于 JDK 序列化,它的性能一般,且序列化后的字节码体积偏大。并且作为一个私有协议,不能跨语言协作。JSON 的优点是可以跨语言,可读性好,但显然序列化后的体积更大,不利于传输。

这里介绍几种可以替代上面两种的序列化技术。

在介绍具体的序列化技术前,先简单说明用于测试的 Netty 示例程序。

为了能够将序列化后的对象反序列化,我们需要获取到对象的完整类名,因此所有序列化的对象都包含在一个基础对象中进行传输:

@Data
public class ObjectMessage implements Serializable {
    /**
     * 将对象序列化后的二进制数组
     */
    private byte[] object;
    /**
     * 对象的完整类名,用于反序列化
     */
    private String className;
}

相应的,需要先定义编码器/解码器将二进制转换为 ObjectMessage 或相反:

@Slf4j
@AllArgsConstructor
public class ObjectMessageToByteEncoder extends MessageToByteEncoder<ObjectMessage> {
    private final ISerializer serializer;
    @Override
    protected void encode(ChannelHandlerContext ctx, ObjectMessage msg, ByteBuf out) throws Exception {
        log.debug("JbossMarshallingEncoder 开始编码,className: {}", msg.getClassName());
        byte[] bytes = serializer.serialize(msg);
        out.writeBytes(bytes);
        log.debug("JbossMarshallingEncoder 编码成功,写入 {} 字节", bytes.length);
    }
}
@Slf4j
@AllArgsConstructor
public class ByteToObjectMessageDecoder extends MessageToMessageDecoder<ByteBuf> {
    private final ISerializer serializer;
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
        int length = msg.readableBytes();
        log.debug("JbossMarshallingDecoder 接收到 {} 字节的数据", length);
        byte[] array = new byte[length];
        msg.readBytes(array);
        ObjectMessage objectMessage = serializer.deserialize(array, ObjectMessage.class);
        log.debug("JbossMarshallingDecoder 解码成功,className: {}", objectMessage.getClassName());
        out.add(objectMessage);
    }
}

这里的ISerializer是具体序列化实现的自定义接口:

public interface ISerializer {
    byte[] serialize(Object obj) throws Exception;
    <T> T deserialize(byte[] bytes, Class<T> clazz) throws Exception;
}

得到承载具体业务对象的 ObjectMessage 后,还需要进行解析以获取具体业务对象:

@Slf4j
@AllArgsConstructor
public class ObjectMessageToObjectDecoder extends MessageToMessageDecoder<ObjectMessage> {
    private final ISerializer serializer;
    @Override
    protected void decode(ChannelHandlerContext ctx, ObjectMessage msg, List<Object> out) throws Exception {
        String className = msg.getClassName();
        log.debug("ObjectDecoder 开始解码,className: {}", className);
        Class<?> clazz = Class.forName(className);
        // 使用 JbossMarshallingUtil 而不是 ProtobufUtil
        Object result = serializer.deserialize(msg.getObject(), clazz);
        log.debug("ObjectDecoder 解码成功,对象: {}", result);
        out.add(result);
    }
}
@Slf4j
@AllArgsConstructor
public class ObjectToObjectMessageEncoder extends MessageToMessageEncoder<Object> {
    private final ISerializer serializer;
    @Override
    protected void encode(ChannelHandlerContext ctx, Object msg, List<Object> out) throws Exception {
        log.debug("ObjectEncoder 开始编码,对象类型: {}, 对象: {}", msg.getClass().getName(), msg);
        ObjectMessage message = new ObjectMessage();
        message.setObject(serializer.serialize(msg));
        message.setClassName(msg.getClass().getName());
        log.debug("ObjectEncoder 编码成功,className: {}", message.getClassName());
        out.add(message);
    }
}

当然,上面说的这些编码/解码过程没有涉及粘包/半包处理,因此还需要加上相应的编码器/解码器。在这个示例中我使用的是LengthFieldBasedFrameDecoder。

所有的这些完整过程可以表示为:

业务Object 对象
ObjectToObjectMessageEncoder
ObjectMessage 对象
ObjectMessageToByteEncoder
原始二进制数据包
LengthFieldPrepender
添加长度头、分隔符
带长度帧的完整二进制数据包
网络发送

完整示例代码可以从这里获取。

Protobuf

Protobuf 是 Google 开源的一种对象序列化工具,支持多种语言,且性能优异,序列化后的字节码体积也较小。

添加依赖:

<dependency>
    <groupId>com.google.protobuf</groupId>
    <artifactId>protobuf-java</artifactId>
    <version>4.34.1</version>
    <scope>compile</scope>
</dependency>
<dependency>
    <groupId>io.protostuff</groupId>
    <artifactId>protostuff-core</artifactId>
    <version>1.8.0</version>
</dependency>
<dependency>
    <groupId>io.protostuff</groupId>
    <artifactId>protostuff-runtime</artifactId>
    <version>1.8.0</version>
</dependency>

添加用于序列化的工具类:

public class ProtobufUtil {
​
    /**
     * 序列化:对象 → byte[]
     */
    public static <T> byte[] encode(T obj) {
        if (obj == null) {
            return new byte[0];
        }
        Schema<T> schema = (Schema<T>) RuntimeSchema.getSchema(obj.getClass());
        LinkedBuffer buffer = LinkedBuffer.allocate();
        try {
            return ProtostuffIOUtil.toByteArray(obj, schema, buffer);
        } finally {
            buffer.clear();
        }
    }
​
    /**
     * 反序列化:byte[] → 对象
     */
    public static <T> T decode(byte[] data, Class<T> clazz) {
        if (data == null || data.length == 0) {
            return null;
        }
        Schema<T> schema = RuntimeSchema.getSchema(clazz);
        T message = schema.newMessage();
        ProtostuffIOUtil.mergeFrom(data, message, schema);
        return message;
    }
}

Jboss Marshalling

Jboss Marshalling 更像是一个取代 JDK 序列化的工具,仅支持 Java 的序列化,优点是性能和序列化后的字节体积。使用 Jboss 的方式和 JDK 序列化是类似的,要求目标类型实现Serializable接口。

添加依赖:

<!-- JBoss Marshalling 核心 -->
<dependency>
    <groupId>org.jboss.marshalling</groupId>
    <artifactId>jboss-marshalling</artifactId>
    <version>2.0.12.Final</version>
</dependency>
​
<!-- 两种协议选一种:river(默认,压缩好)或 serial(兼容Java序列化) -->
<dependency>
    <groupId>org.jboss.marshalling</groupId>
    <artifactId>jboss-marshalling-river</artifactId>
    <version>2.0.12.Final</version>
</dependency>

工具类:

public class JbossMarshallingUtil {
    // 单例工厂(river 协议)
    private static final MarshallerFactory FACTORY = Marshalling.getProvidedMarshallerFactory("river");
    // 配置:使用默认版本(不设置 version),缓存类/实例数
    private static final MarshallingConfiguration CONFIG;
​
    static {
        CONFIG = new MarshallingConfiguration();
        // 不设置 version,使用默认值
        CONFIG.setClassCount(100);
        CONFIG.setInstanceCount(1000);
    }
​
    /**
     * 序列化:对象 → byte[]
     */
    public static <T> byte[] encode(T obj) throws IOException {
        if (obj == null) return new byte[0];
        try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
             Marshaller marshaller = FACTORY.createMarshaller(CONFIG)) {
            marshaller.start(Marshalling.createByteOutput(baos));
            marshaller.writeObject(obj);
            marshaller.finish();
            return baos.toByteArray();
        }
    }
​
    /**
     * 反序列化:byte[] → 对象
     */
    public static <T> T decode(byte[] data, Class<T> clazz) throws IOException, ClassNotFoundException {
        if (data == null || data.length == 0) return null;
        try (ByteArrayInputStream bais = new ByteArrayInputStream(data);
             Unmarshaller unmarshaller = FACTORY.createUnmarshaller(CONFIG)) {
            unmarshaller.start(Marshalling.createByteInput(bais));
            T obj = unmarshaller.readObject(clazz);
            unmarshaller.finish();
            return obj;
        }
    }
}

MessagePack

MessagePack 可以取代 Json,他序列化后的内容更紧凑,可以节约字节数。

MessagePack 需要和 JSON 解析器协同工作,通常是使用 Jackson,因此要添加依赖:

<dependency>
    <groupId>org.msgpack</groupId>
    <artifactId>msgpack-core</artifactId>
    <version>0.9.11</version>
    <scope>compile</scope>
</dependency>
<dependency>
    <groupId>org.msgpack</groupId>
    <artifactId>jackson-dataformat-msgpack</artifactId>
    <version>0.9.11</version>
</dependency>
<!-- Java 8 日期时间支持 -->
<dependency>
    <groupId>com.fasterxml.jackson.datatype</groupId>
    <artifactId>jackson-datatype-jsr310</artifactId>
    <version>2.18.2</version>
</dependency>
<!-- 统一管理 Jackson 版本 -->
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-core</artifactId>
    <version>2.18.2</version>
</dependency>
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>2.18.2</version>
</dependency>

工具类:

public class MsgPackUtil {
    private static final ObjectMapper OBJECT_MAPPER;
    
    static {
        OBJECT_MAPPER = new ObjectMapper(new MessagePackFactory());
        // 注册 Java 8 日期时间模块
        OBJECT_MAPPER.registerModule(new JavaTimeModule());
        // 配置 BigDecimal 序列化为普通字符串格式,避免科学计数法
        OBJECT_MAPPER.configure(SerializationFeature.WRITE_BIGDECIMAL_AS_PLAIN, true);
    }
​
    public static byte[] encode(Object obj) throws Exception {
        return OBJECT_MAPPER.writeValueAsBytes(obj);
    }
​
    public static  <T> T decode(byte[] bytes, Class<T> clazz) throws Exception {
        return OBJECT_MAPPER.readValue(bytes, clazz);
    }
}

The End.

本文的完整示例代码可以从这里获取。

参考资料

  • 《Netty权威指南》

本作品采用 知识共享署名 4.0 国际许可协议 进行许可
标签: netty
最后更新:2026年5月12日

魔芋红茶

加一点PHP,加一点Go,加一点Python......

点赞
< 上一篇

文章评论

razz evil exclaim smile redface biggrin eek confused idea lol mad twisted rolleyes wink cool arrow neutral cry mrgreen drooling persevering
取消回复

COPYRIGHT © 2021 icexmoon.cn. ALL RIGHTS RESERVED.
本网站由提供CDN加速/云存储服务

Theme Kratos Made By Seaton Jiang

宁ICP备2021001508号

宁公网安备64040202000141号