HTTP
Netty 默认对一些常见协议有良好支持,比如 HTTP 协议,我们可以很容易通过 Netty 创建一个 HTTP 协议访问的文件服务器。
HTTP 服务器的配置与一般的 Netty 服务端配置类似,需要添加以下 Handler:
// 1. HTTP 请求解码器:将字节流解码为 HttpRequest 对象
ch.pipeline().addLast(new HttpRequestDecoder());
// 2. HTTP 对象聚合器:将分片的 HTTP 消息聚合为 FullHttpRequest
// 参数 65536 表示最大允许的内容长度为 64KB
ch.pipeline().addLast(new HttpObjectAggregator(65536));
// 3. HTTP 响应编码器:将 HttpResponse 对象编码为字节流
ch.pipeline().addLast(new HttpResponseEncoder());
// 4. 分块写入处理器:支持大文件的分块传输,避免内存溢出
ch.pipeline().addLast(new ChunkedWriteHandler());
// 5. 自定义业务处理器:处理文件服务器的核心逻辑
ch.pipeline().addLast(new HttpFileServerHandler());
文件服务器的处理逻辑都在HttpFileServerHandler中实现:
public class HttpFileServerHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
/** 文件服务器根目录路径 */
public static final String HTTP_FILE_SERVER_ROOT_DIR = "D:\\download";
/**
* 非法文件名匹配模式
* <p>
* 匹配包含非字母、数字、下划线、连字符、点号的文件名
* 用于过滤不安全或特殊字符的文件名
*/
private static final Pattern INALLOWED_FILE_NAME = Pattern.compile("[^A-Za-z0-9_\\-\\.]");
// ...
}
经过HttpRequestDecoder处理后的 HTTP 请求是FullHttpRequest对象,对应的,返回的是FullHttpResponse对象。
具体的处理逻辑是,如果请求的路径是一个目录就将该目录下的子目录和文件整理成 Html 链接返回。如果是一个文件,就直接将文件的二进制流返回,客户端会触发文件下载。
完整内容过多,这里不做展示,可以查看。
私有协议
除了用 Netty 实现常见协议,还可以用 Netty 实现私有协议,下面是一个示例。
整个协议使用 TCP/IP 进行长连接,使用心跳进行保活,并在建立连接后进行握手认证。
NettyMessage
首先定义一个类型NettyMessage,所有的通信都使用这个类型传输:
public class NettyMessage {
private Header header;
private Object body;
}
header可以用于传输一些固定的内容,比如协议名称、版本、消息类型等。body用于传输业务数据,可以是基本数据类型或者对象。
Header的具体实现:
public class Header {
public static class Type{
public static final byte LOGIN_AUTH_REQ = 0;
public static final byte LOGIN_AUTH_RESP = 1;
public static final byte HEARTBEAT_REQ = 2;
public static final byte HEARTBEAT_RESP = 3;
}
private final int crcCode = 0xabef0101;
/**
* 消息长度
*/
private int length;
/**
* 会话ID
*/
private long sessionID;
/**
* 消息类型
*/
private byte type;
/**
* 消息优先级
*/
private byte priority;
/**
* 附件
*/
private Map<String, Object> attachment = new HashMap<>();
}
Header中的attachment可以用于扩展,如果某些消息需要添加特定的头信息,可以通过attachment添加。
编码解码
编码
public class NettyMessageEncoder extends MessageToMessageEncoder<NettyMessage> {
private final MarshallingEncoder marshallingEncoder = new MarshallingEncoder();
protected void encode(ChannelHandlerContext channelHandlerContext, NettyMessage msg, List<Object> list) throws Exception {
if (msg == null || msg.getHeader() == null) {
throw new Exception("The encode message is null");
}
ByteBuf sendBuf = Unpooled.buffer();
sendBuf.writeInt((msg.getHeader().getCrcCode()));
final int lengthIndex = sendBuf.writerIndex();
sendBuf.writeInt((msg.getHeader().getLength()));
sendBuf.writeLong((msg.getHeader().getSessionID()));
sendBuf.writeByte((msg.getHeader().getType()));
sendBuf.writeByte((msg.getHeader().getPriority()));
sendBuf.writeInt((msg.getHeader().getAttachment().size()));
for (String key : msg.getHeader().getAttachment().keySet()) {
byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
sendBuf.writeInt(keyBytes.length);
sendBuf.writeBytes(keyBytes);
Object value = msg.getHeader().getAttachment().get(key);
marshallingEncoder.encode(value, sendBuf);
}
if (msg.getBody() != null) {
marshallingEncoder.encode(msg.getBody(), sendBuf);
} else {
sendBuf.writeInt(0);
}
sendBuf.setInt(lengthIndex, sendBuf.readableBytes());
// 将编码后的 ByteBuf 添加到输出列表
list.add(sendBuf);
}
}
需要注意的是,将NettyMessage编码为二进制流时,第二个四字节的长度字段应该是真实的编码后的二进制消息的总长度,因此这里最后通过sendBuf.setInt用真实二进制长度重写该值。
对于头信息中的attachment和消息体,都使用 JBoss 的 marshalling 进行序列化,具体实现可以查看。
解码
NettyMessage的解码器借助LengthFieldBasedFrameDecoder实现:
public class NettyMessageDecoder extends LengthFieldBasedFrameDecoder {
private final MarshallingDecoder marshallingDecoder = new MarshallingDecoder();
public NettyMessageDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength) {
super(maxFrameLength, lengthFieldOffset, lengthFieldLength, -8, 0);
}
// ...
}
这里的-8是最终帧长度计算时需要-8,否则总的帧长度会比真实长度多8。
LengthFieldBasedFrameDecoder已经实现了通过帧长度获取完整帧,因此我们不需要考虑黏包半包问题,直接从 ByteBuf 中读取头信息和消息体即可,具体的读取过程不做赘述,可以查看。
握手认证
握手认证的消息类型定义:
public static class Type{
public static final byte LOGIN_AUTH_REQ = 0;
public static final byte LOGIN_AUTH_RESP = 1;
}
客户端的LoginAuthRespHandler用于连接时主动发起握手认证请求:
public class LoginAuthRespHandler extends ChannelInboundHandlerAdapter {
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush(buildLoginAuthReq());
}
private static NettyMessage buildLoginAuthReq() {
NettyMessage message = new NettyMessage();
Header header = new Header();
header.setType(Header.Type.LOGIN_AUTH_REQ);
message.setHeader(header);
return message;
}
// ...
}
服务端的LoginAuthReqHandler用于接收握手认证请求,验证后返回认证结果:
public class LoginAuthReqHandler extends SimpleChannelInboundHandler<NettyMessage> {
private static String[] whiteList = {"127.0.0.1", "192.168.1.100"};
private static Set<String> loginSet = ConcurrentHashMap.newKeySet();
protected void channelRead0(ChannelHandlerContext ctx, NettyMessage nettyMessage) throws Exception {
if (nettyMessage == null) {
return;
}
if (nettyMessage.getHeader() == null) {
return;
}
if (nettyMessage.getHeader().getType() != Header.Type.LOGIN_AUTH_REQ) {
ctx.fireChannelRead(nettyMessage);
return;
}
// 获取客户端 ip (带端口号)
String ipAndPort = ctx.channel().remoteAddress().toString();
log.debug("客户端ip和端口:{}", ipAndPort);
// 获取只有 host 的 ip
InetSocketAddress socketAddress = (InetSocketAddress) ctx.channel().remoteAddress();
String ip = socketAddress.getAddress().getHostAddress();
log.debug("客户端ip:{}", ip);
if (!inWhiteList(ip)) {
log.error("客户端{},不在白名单中", ip);
ctx.writeAndFlush(buildLoginAuthResp((byte) -1));
return;
}
if (loginSet.contains(ipAndPort)) {
log.error("客户端{},已经登录,不能重复登录", ipAndPort);
ctx.writeAndFlush(buildLoginAuthResp((byte) -1));
return;
}
ctx.writeAndFlush(buildLoginAuthResp((byte) 0));
loginSet.add(ipAndPort);
log.info("客户端{},握手成功", ipAndPort);
}
private static boolean inWhiteList(String ip) {
for (String s : whiteList) {
if (s.equals(ip)) {
return true;
}
}
return false;
}
private static NettyMessage buildLoginAuthResp(byte result) {
NettyMessage message = new NettyMessage();
Header header = new Header();
header.setType(Header.Type.LOGIN_AUTH_RESP);
message.setHeader(header);
message.setBody(result);
return message;
}
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
// 与客户端断开连接后,移除客户端的已登录信息
loginSet.remove(ctx.channel().remoteAddress().toString());
ctx.close();
ctx.fireExceptionCaught(cause);
}
}
需要注意的是,这里需要重写exceptionCaught,以在连接意外断开后移除客户端的连接记录,服务端会用这个连接记录检查客户端是否重复登录。如果不这么做,就会导致客户端断开连接后再次尝试重连时被服务端拒绝(服务端依然有客户端的连接记录)。
客户端的LoginAuthRespHandler会处理服务端返回的认证结果:
public class LoginAuthRespHandler extends ChannelInboundHandlerAdapter {
// ...
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
NettyMessage message = (NettyMessage) msg;
if (message.getHeader() == null) {
return;
}
if (message.getHeader().getType() == Header.Type.LOGIN_AUTH_RESP) {
byte result = (byte) message.getBody();
if (result != (byte) 0) {
log.info("握手验证失败");
ctx.close();
} else {
log.info("握手验证成功");
ctx.fireChannelRead(msg);
}
} else {
ctx.fireChannelRead(msg);
}
}
// ...
}
没有复杂的逻辑处理,仅判断认证结果后打印,并在认证失败时关闭连接。
心跳
协议设置为一定时间没有接收到数据时断开连接,以避免资源浪费,这可以通过在客户端添加 Netty 的处理器ReadTimeoutHandler实现:
ch.pipeline().addLast(new NettyMessageEncoder());
ch.pipeline().addLast(new NettyMessageDecoder(1024 * 1024, 4, 4));
ch.pipeline().addLast(new ReadTimeoutHandler(50, TimeUnit.SECONDS));
// ...
这里设置了如果 50 秒没有接收到数据就断开连接。
因此需要实现心跳保活,以防止存在通信空闲时导致连接中断的问题。
心跳的消息类型定义:
public static class Type{
// ...
public static final byte HEARTBEAT_REQ = 2;
public static final byte HEARTBEAT_RESP = 3;
}
心跳请求由客户端发起,客户端在握手认证成功后创建一个固定时间间隔的定时任务,以发送心跳请求:
public class HeartBeatRespHandler extends SimpleChannelInboundHandler<NettyMessage> {
private ScheduledFuture<?> heatBeatScheduleFuture;
protected void channelRead0(ChannelHandlerContext ctx, NettyMessage nettyMessage) throws Exception {
if (nettyMessage.getHeader() == null) {
return;
}
// 如果是认证响应消息,说明已经通过认证,客户端主动发起心跳请求
if (nettyMessage.getHeader().getType() == Header.Type.LOGIN_AUTH_RESP) {
// 使用线程池,每隔5秒发送一次心跳请求
log.info("开启心跳请求发送定时任务");
heatBeatScheduleFuture = ctx.executor().scheduleAtFixedRate(() -> {
ctx.writeAndFlush(buildHeartBeatReq());
}, 0, 5000, TimeUnit.MILLISECONDS);
} else if (nettyMessage.getHeader().getType() == Header.Type.HEARTBEAT_RESP) {
// 如果是心跳响应消息,打印日志
log.info("收到心跳响应消息{}", nettyMessage);
} else {
// 其他消息,继续传递给下一个handler处理
ctx.fireChannelRead(nettyMessage);
}
}
private static NettyMessage buildHeartBeatReq() {
NettyMessage message = new NettyMessage();
Header header = new Header();
header.setType(Header.Type.HEARTBEAT_REQ);
message.setHeader(header);
return message;
}
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
if (heatBeatScheduleFuture != null){
heatBeatScheduleFuture.cancel(true);
heatBeatScheduleFuture = null;
}
ctx.fireExceptionCaught(cause);
}
}
服务端收到心跳请求后,返回心跳响应:
public class HeartBeatReqHandler extends SimpleChannelInboundHandler<NettyMessage> {
protected void channelRead0(ChannelHandlerContext ctx, NettyMessage nettyMessage) throws Exception {
if (nettyMessage.getHeader() == null) {
return;
}
// 如果是心跳请求消息,返回心跳响应消息
if (nettyMessage.getHeader().getType() == Header.Type.HEARTBEAT_REQ) {
log.trace("收到心跳请求消息{}", nettyMessage);
NettyMessage respMsg = buildHeatBeatResp();
ctx.writeAndFlush(respMsg);
log.trace("发送心跳响应消息{}", respMsg);
} else {
// 其他消息,继续转发
ctx.fireChannelRead(nettyMessage);
}
}
private static NettyMessage buildHeatBeatResp() {
NettyMessage respMsg = new NettyMessage();
Header header = new Header();
header.setType(Header.Type.HEARTBEAT_RESP);
respMsg.setHeader(header);
return respMsg;
}
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.fireExceptionCaught(cause);
}
}
服务端
public class NettyServer {
private void start() throws InterruptedException {
ServerBootstrap serverBootstrap = new ServerBootstrap();
NioEventLoopGroup bossGroup = new NioEventLoopGroup();
NioEventLoopGroup workGroup = new NioEventLoopGroup();
try {
serverBootstrap.group(bossGroup, workGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
protected void initChannel(NioSocketChannel channel) throws Exception {
channel.pipeline().addLast(new NettyMessageEncoder());
channel.pipeline().addLast(new NettyMessageDecoder(1024 * 1024, 4, 4));
channel.pipeline().addLast(new LoginAuthReqHandler());
channel.pipeline().addLast(new HeartBeatReqHandler());
}
});
ChannelFuture channelFuture = serverBootstrap.bind(ServerConfig.HOST, ServerConfig.PORT).sync();
log.info("启动 NettyServer 成功");
channelFuture.channel().closeFuture().sync();
log.info("NettyServer 停止");
}
finally {
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws InterruptedException {
new NettyServer().start();
}
}
客户端
public class NettyClient {
private ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
public void connect(String host, int port) throws InterruptedException {
NioEventLoopGroup group = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap()
.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new NettyMessageEncoder());
ch.pipeline().addLast(new NettyMessageDecoder(1024 * 1024, 4, 4));
ch.pipeline().addLast(new ReadTimeoutHandler(50, TimeUnit.SECONDS));
ch.pipeline().addLast(new LoginAuthRespHandler());
ch.pipeline().addLast(new HeartBeatRespHandler());
}
});
try {
// 不指定本地地址,让操作系统自动分配端口,避免端口占用问题
ChannelFuture channelFuture = bootstrap.connect(host, port).sync();
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
log.error("启动 NettyClient 失败", e);
throw e;
} finally {
group.shutdownGracefully();
// 客户端断开连接后 5 秒后重连
executor.execute(() -> {
try {
TimeUnit.SECONDS.sleep(5);
log.info("尝试重新连接");
connect(host, port);
} catch (Exception e) {
log.error("重连失败", e);
}
});
}
}
public static void main(String[] args) throws InterruptedException {
new NettyClient().connect(ServerConfig.HOST, ServerConfig.PORT);
}
}
客户端在断开连接后,会尝试在 5 秒后重新连接。
The End.
本文的完整示例代码可以从获取。
参考资料

文章评论