黏包
在 Netty 中向通道写入并刷新一段数据,并不意味着客户端会立即将数据打包发送给服务端,因为底层的 TCP/IP 协议为了优化数据传输,往往会在数据量较小时,将多个数据包合并成一个数据包进行发送,或者在数量较大时,拆分成多个数据包进行发送。因此服务端收到数据包并使用 ByteBuf 读取数据时,会读到不完整或连在一起的多次发送的数据。这就是所谓的黏包/半包问题。
解决黏包/半包通常有这么几种方式:
-
用换行符分隔
-
用指定特殊字符分隔
-
用指定长度的字节数组传输数据
-
使用一个4字节整型指定数据体长度
Netty 已经预定义了相应的编码器/解码器,因此我们不需要自行实现相应的 Handler。
为了模拟黏包的发生,这里使用一个简单的时间服务器,正常情况下客户端发送一条"QUERY TIME ORDER"指令,服务端收到后返回时间。
为了模拟黏包,在客户端循环多次发送:
public class ClientHandler extends ChannelInboundHandlerAdapter {
private int receiveCounter = 0;
private int sendCounter = 0;
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// 连接建立后发送时间请求
for (int i = 0; i < 100; i++) {
String request = "QUERY TIME ORDER";
byte[] bytes = request.getBytes(StandardCharsets.UTF_8);
ByteBuf buffer = Unpooled.buffer(bytes.length);
buffer.writeBytes(bytes);
sendCounter++;
log.debug("Send times:{}", sendCounter);
ctx.writeAndFlush(buffer);
}
}
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
receiveCounter++;
String body = (String) msg;
log.info("Now time is:{}, receive times:{}", body, receiveCounter);
}
}
服务端接收到命令QUERY TIME ORDER后,返回时间:
public class ChildChannelHandler extends ChannelInboundHandlerAdapter {
private int counter = 0;
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 因为添加了解码器,这里直接可以读取到 String
String body = (String) msg;
log.info("The time server receive order:{}", body);
String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ?
new java.util.Date(System.currentTimeMillis()).toString() : "BAD ORDER";
counter++;
log.info("The time server send:{}, receive times:{}", currentTime, counter);
ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());
ctx.writeAndFlush(resp);
}
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
log.error("exceptionCaught", cause);
}
}
运行服务端和客户端会发现客户端输出:
2026-05-06 17:11:18.741 [nioEventLoopGroup-2-1] INFO c.i.n.t.client.handler.ClientHandler - Now time is:BAD ORDER, receive times:1 2026-05-06 17:11:18.741 [nioEventLoopGroup-2-1] INFO c.i.n.t.client.handler.ClientHandler - Now time is:BAD ORDER, receive times:2
服务端输出:
2026-05-06 17:11:18.727 [nioEventLoopGroup-3-1] INFO c.i.n.t.s.h.ChildChannelHandler - The time server receive order:QUERY TIME ORDERQUERY TIME ORDERQUERY TIME ORDERQUERY TIME ORDERQUERY TIME ORDERQUERY TIME ORDERQUERY TIME ORDERQUERY TIME ORDERQUERY TIME ... 2026-05-06 17:11:18.729 [nioEventLoopGroup-3-1] INFO c.i.n.t.s.h.ChildChannelHandler - The time server send:BAD ORDER, receive times:1 2026-05-06 17:11:18.737 [nioEventLoopGroup-3-1] INFO c.i.n.t.s.h.ChildChannelHandler - The time server receive order:QUERY TIME ORDERQUERY TIME ORDERQUERY TIME ORDERQUERY TIME ORDERQUERY TIME ORDERQUERY TIME ORDERQUERY TIME ORDERQUERY TIME ORDERQUERY TIME ... 2026-05-06 17:11:18.737 [nioEventLoopGroup-3-1] INFO c.i.n.t.s.h.ChildChannelHandler - The time server send:BAD ORDER, receive times:2
原因是尽管客户端每次写入 ByteBuf 后立即执行writeAndFlush,但是在底层的 TCP/IP 协议中,依然会缓存多个数据块后合并传输,而在服务端,会一次性接收到一个大的 TCP 数据块并读取,所以就出现上面这种服务端一次读取,但内容是客户端多次发送的结果。
可以用上面说过的几种方式解决黏包,并且这些方式都有 Netty 提供的现成实现,不需要我们手动编写。
DelimiterBasedFrameDecoder
使用DelimiterBasedFrameDecoder可以实现按照指定分隔符对传入的数据进行分割:
// 使用分隔符解码器解决黏包问题
ByteBuf delimiter = Unpooled.wrappedBuffer("$_".getBytes());
ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, delimiter));
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new ChildChannelHandler());
这里的
StringDecoder同样是一个解码器,用于将二进制数据转换成字符串,使用它可以方便的在 Handler 中直接处理字符串,不再需要手动转换。
发送的时候(无论是服务端还是客户端),都需要每次发送数据时追加分隔符:
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 因为添加了解码器,这里直接可以读取到 String
String body = (String) msg;
log.info("The time server receive order:{}", body);
String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ?
new java.util.Date(System.currentTimeMillis()).toString() : "BAD ORDER";
counter++;
log.info("The time server send:{}, receive times:{}", currentTime, counter);
// 每条消息后追加换行符
currentTime += "$_";
ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());
ctx.writeAndFlush(resp);
}
现在再执行测试就会发现可以正常发送和接收数据了。
LineBasedFrameDecoder
LineBasedFrameDecoder可以看作是特殊的DelimiterBasedFrameDecoder,它以换行符进行分隔。
添加LineBasedFrameDecoder:
ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new ChildChannelHandler());
每次发送数据时候追加换行符:
currentTime += "\r\n";
ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());
ctx.writeAndFlush(resp);
换行符可以是
\r\n或\n。
LengthFieldBasedFrameDecoder
LengthFieldBasedFrameDecoder是最常用的解决黏包问题的解码器,需要与对应的编码器LengthFieldPrepender共同使用。在数据发送时,会先用若干字节表示发送内容的字节长度,其后是若干字节的发送内容。数据接收时,会先读取指定字节作为接收数据的字节长度,再读取对应长度的字节。
// 使用长度解码器解决黏包问题
ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(65536, 0, 4, 0, 4));
ch.pipeline().addLast(new LengthFieldPrepender(4));
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new ChildChannelHandler());
因为发送时候的编码是LengthFieldPrepender实现的,因此发送时不需要手动实现。
使用定长帧发送和接收数据解决黏包并不常见,这里不做说明。
序列化
在实际开发中,客户端和服务端之间仅传输字符串是不方便的,如果要传输 Java 对象,就需要用到序列化和反序列化。
最常见的 Java 序列化和反序列化技术是 JDK 的序列化和 JSON。但这两者都一些问题,对于 JDK 序列化,它的性能一般,且序列化后的字节码体积偏大。并且作为一个私有协议,不能跨语言协作。JSON 的优点是可以跨语言,可读性好,但显然序列化后的体积更大,不利于传输。
这里介绍几种可以替代上面两种的序列化技术。
在介绍具体的序列化技术前,先简单说明用于测试的 Netty 示例程序。
为了能够将序列化后的对象反序列化,我们需要获取到对象的完整类名,因此所有序列化的对象都包含在一个基础对象中进行传输:
public class ObjectMessage implements Serializable {
/**
* 将对象序列化后的二进制数组
*/
private byte[] object;
/**
* 对象的完整类名,用于反序列化
*/
private String className;
}
相应的,需要先定义编码器/解码器将二进制转换为 ObjectMessage 或相反:
public class ObjectMessageToByteEncoder extends MessageToByteEncoder<ObjectMessage> {
private final ISerializer serializer;
protected void encode(ChannelHandlerContext ctx, ObjectMessage msg, ByteBuf out) throws Exception {
log.debug("JbossMarshallingEncoder 开始编码,className: {}", msg.getClassName());
byte[] bytes = serializer.serialize(msg);
out.writeBytes(bytes);
log.debug("JbossMarshallingEncoder 编码成功,写入 {} 字节", bytes.length);
}
}
public class ByteToObjectMessageDecoder extends MessageToMessageDecoder<ByteBuf> {
private final ISerializer serializer;
protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
int length = msg.readableBytes();
log.debug("JbossMarshallingDecoder 接收到 {} 字节的数据", length);
byte[] array = new byte[length];
msg.readBytes(array);
ObjectMessage objectMessage = serializer.deserialize(array, ObjectMessage.class);
log.debug("JbossMarshallingDecoder 解码成功,className: {}", objectMessage.getClassName());
out.add(objectMessage);
}
}
这里的ISerializer是具体序列化实现的自定义接口:
public interface ISerializer {
byte[] serialize(Object obj) throws Exception;
<T> T deserialize(byte[] bytes, Class<T> clazz) throws Exception;
}
得到承载具体业务对象的 ObjectMessage 后,还需要进行解析以获取具体业务对象:
public class ObjectMessageToObjectDecoder extends MessageToMessageDecoder<ObjectMessage> {
private final ISerializer serializer;
protected void decode(ChannelHandlerContext ctx, ObjectMessage msg, List<Object> out) throws Exception {
String className = msg.getClassName();
log.debug("ObjectDecoder 开始解码,className: {}", className);
Class<?> clazz = Class.forName(className);
// 使用 JbossMarshallingUtil 而不是 ProtobufUtil
Object result = serializer.deserialize(msg.getObject(), clazz);
log.debug("ObjectDecoder 解码成功,对象: {}", result);
out.add(result);
}
}
public class ObjectToObjectMessageEncoder extends MessageToMessageEncoder<Object> {
private final ISerializer serializer;
protected void encode(ChannelHandlerContext ctx, Object msg, List<Object> out) throws Exception {
log.debug("ObjectEncoder 开始编码,对象类型: {}, 对象: {}", msg.getClass().getName(), msg);
ObjectMessage message = new ObjectMessage();
message.setObject(serializer.serialize(msg));
message.setClassName(msg.getClass().getName());
log.debug("ObjectEncoder 编码成功,className: {}", message.getClassName());
out.add(message);
}
}
当然,上面说的这些编码/解码过程没有涉及粘包/半包处理,因此还需要加上相应的编码器/解码器。在这个示例中我使用的是LengthFieldBasedFrameDecoder。
所有的这些完整过程可以表示为:
添加长度头、分隔符
完整示例代码可以从获取。
Protobuf
Protobuf 是 Google 开源的一种对象序列化工具,支持多种语言,且性能优异,序列化后的字节码体积也较小。
添加依赖:
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>4.34.1</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>io.protostuff</groupId>
<artifactId>protostuff-core</artifactId>
<version>1.8.0</version>
</dependency>
<dependency>
<groupId>io.protostuff</groupId>
<artifactId>protostuff-runtime</artifactId>
<version>1.8.0</version>
</dependency>
添加用于序列化的工具类:
public class ProtobufUtil {
/**
* 序列化:对象 → byte[]
*/
public static <T> byte[] encode(T obj) {
if (obj == null) {
return new byte[0];
}
Schema<T> schema = (Schema<T>) RuntimeSchema.getSchema(obj.getClass());
LinkedBuffer buffer = LinkedBuffer.allocate();
try {
return ProtostuffIOUtil.toByteArray(obj, schema, buffer);
} finally {
buffer.clear();
}
}
/**
* 反序列化:byte[] → 对象
*/
public static <T> T decode(byte[] data, Class<T> clazz) {
if (data == null || data.length == 0) {
return null;
}
Schema<T> schema = RuntimeSchema.getSchema(clazz);
T message = schema.newMessage();
ProtostuffIOUtil.mergeFrom(data, message, schema);
return message;
}
}
Jboss Marshalling
Jboss Marshalling 更像是一个取代 JDK 序列化的工具,仅支持 Java 的序列化,优点是性能和序列化后的字节体积。使用 Jboss 的方式和 JDK 序列化是类似的,要求目标类型实现Serializable接口。
添加依赖:
<!-- JBoss Marshalling 核心 -->
<dependency>
<groupId>org.jboss.marshalling</groupId>
<artifactId>jboss-marshalling</artifactId>
<version>2.0.12.Final</version>
</dependency>
<!-- 两种协议选一种:river(默认,压缩好)或 serial(兼容Java序列化) -->
<dependency>
<groupId>org.jboss.marshalling</groupId>
<artifactId>jboss-marshalling-river</artifactId>
<version>2.0.12.Final</version>
</dependency>
工具类:
public class JbossMarshallingUtil {
// 单例工厂(river 协议)
private static final MarshallerFactory FACTORY = Marshalling.getProvidedMarshallerFactory("river");
// 配置:使用默认版本(不设置 version),缓存类/实例数
private static final MarshallingConfiguration CONFIG;
static {
CONFIG = new MarshallingConfiguration();
// 不设置 version,使用默认值
CONFIG.setClassCount(100);
CONFIG.setInstanceCount(1000);
}
/**
* 序列化:对象 → byte[]
*/
public static <T> byte[] encode(T obj) throws IOException {
if (obj == null) return new byte[0];
try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
Marshaller marshaller = FACTORY.createMarshaller(CONFIG)) {
marshaller.start(Marshalling.createByteOutput(baos));
marshaller.writeObject(obj);
marshaller.finish();
return baos.toByteArray();
}
}
/**
* 反序列化:byte[] → 对象
*/
public static <T> T decode(byte[] data, Class<T> clazz) throws IOException, ClassNotFoundException {
if (data == null || data.length == 0) return null;
try (ByteArrayInputStream bais = new ByteArrayInputStream(data);
Unmarshaller unmarshaller = FACTORY.createUnmarshaller(CONFIG)) {
unmarshaller.start(Marshalling.createByteInput(bais));
T obj = unmarshaller.readObject(clazz);
unmarshaller.finish();
return obj;
}
}
}
MessagePack
MessagePack 可以取代 Json,他序列化后的内容更紧凑,可以节约字节数。
MessagePack 需要和 JSON 解析器协同工作,通常是使用 Jackson,因此要添加依赖:
<dependency>
<groupId>org.msgpack</groupId>
<artifactId>msgpack-core</artifactId>
<version>0.9.11</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.msgpack</groupId>
<artifactId>jackson-dataformat-msgpack</artifactId>
<version>0.9.11</version>
</dependency>
<!-- Java 8 日期时间支持 -->
<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-jsr310</artifactId>
<version>2.18.2</version>
</dependency>
<!-- 统一管理 Jackson 版本 -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>2.18.2</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.18.2</version>
</dependency>
工具类:
public class MsgPackUtil {
private static final ObjectMapper OBJECT_MAPPER;
static {
OBJECT_MAPPER = new ObjectMapper(new MessagePackFactory());
// 注册 Java 8 日期时间模块
OBJECT_MAPPER.registerModule(new JavaTimeModule());
// 配置 BigDecimal 序列化为普通字符串格式,避免科学计数法
OBJECT_MAPPER.configure(SerializationFeature.WRITE_BIGDECIMAL_AS_PLAIN, true);
}
public static byte[] encode(Object obj) throws Exception {
return OBJECT_MAPPER.writeValueAsBytes(obj);
}
public static <T> T decode(byte[] bytes, Class<T> clazz) throws Exception {
return OBJECT_MAPPER.readValue(bytes, clazz);
}
}
The End.
本文的完整示例代码可以从获取。

文章评论