添加依赖:
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.132.Final</version>
</dependency>
服务端:
// 构建服务端的辅助类
new ServerBootstrap()
// 添加事件循环组
.group(new NioEventLoopGroup())
// 指定服务端通道的类型
.channel(NioServerSocketChannel.class)
// 添加通道处理器
.childHandler(new ChannelInitializer<NioSocketChannel>() {
protected void initChannel(NioSocketChannel ch) {
// 为通道添加处理器,仅会在连接创建后调用一次
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new SimpleChannelInboundHandler<String>() {
protected void channelRead0(ChannelHandlerContext ctx, String s) throws Exception {
Channel channel = ctx.channel();
System.out.printf("接收到来自[%s]的消息:%s%n", channel, s);
}
});
}
})
.bind(8080);
每个事件循环(EventLoop)相当于一个处理通道事件的单个线程,而事件循环组(EventLoopGroup)相当于包含若干个事件循环的线程池。
服务端与客户端的连接创建后,具体的数据处理取决于通道的处理器(Handler),多个处理器形成一个处理器链,数据依次通过入处理器(InHandler)进行处理,并在输出数据时依次通过出处理器(OutHandler)进行输出。
在这个简单示例中,仅添加了一个用于将客户端传入的 Byte[] 转换为String的StringDecoder处理器,并使用一个自定义处理器将内容打印到控制台。
客户端:
new Bootstrap()
.group(new NioEventLoopGroup())
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<NioSocketChannel>() {
protected void initChannel(NioSocketChannel ch) {
ch.pipeline().addLast(new StringEncoder());
}
})
.connect("127.0.0.1", 8080)
.sync()
.channel()
.writeAndFlush("hello world");
客户端与服务端类似,不过不需要考虑事件循环,此外处理器也仅需要一个将String转换为byte[]的StringEncoder。
EventLoop
使用next可以遍历事件循环组内的事件循环:
DefaultEventLoopGroup eventExecutors = new DefaultEventLoopGroup(2);
EventLoop next = eventExecutors.next();
System.out.println(next);
next = eventExecutors.next();
System.out.println(next);
next = eventExecutors.next();
System.out.println(next);
io.netty.channel.DefaultEventLoop@5fa07e12 io.netty.channel.DefaultEventLoop@55b53d44 io.netty.channel.DefaultEventLoop@5fa07e12
也可以使用迭代器遍历:
DefaultEventLoopGroup eventExecutors = new DefaultEventLoopGroup(2);
for (EventExecutor eventExecutor : eventExecutors) {
System.out.println(eventExecutor);
}
io.netty.channel.DefaultEventLoop@5fa07e12 io.netty.channel.DefaultEventLoop@55b53d44
当一个客户端-服务端连接创建,事件循环组中就会指派一个事件循环绑定到该连接(通道),并处理产生的事件。
服务端:
new ServerBootstrap()
.group(new NioEventLoopGroup(1), new NioEventLoopGroup(2))
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
protected void initChannel(NioSocketChannel ch) {
ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf byteBuf = msg instanceof ByteBuf ? ((ByteBuf) msg) : null;
if (byteBuf != null) {
byte[] buf = new byte[16];
ByteBuf len = byteBuf.readBytes(buf, 0, byteBuf.readableBytes());
log.debug(new String(buf));
}
}
});
}
})
.bind(8080)
.sync();
这个示例中用于处理通道事件的事件循环组中有2个事件循环(new NioEventLoopGroup(2))。
客户端:
public class Client {
public static void main(String[] args) throws InterruptedException {
clientSend("zhangsan");
clientSend("lisi");
clientSend("wangwu");
}
private static void clientSend(String content) throws InterruptedException {
Channel channel = new Bootstrap()
.group(new NioEventLoopGroup(1))
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
System.out.println("init...");
ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
}
})
.channel(NioSocketChannel.class).connect("localhost", 8080)
.sync()
.channel();
channel.writeAndFlush(ByteBufAllocator.DEFAULT.buffer().writeBytes(content.getBytes()));
Thread.sleep(2000);
channel.writeAndFlush(ByteBufAllocator.DEFAULT.buffer().writeBytes(content.getBytes()));
}
}
这里依次创建3个客户端,并连接服务端发送信息,最终服务端输出:
2026-04-27 14:08:48 [nioEventLoopGroup-3-1] DEBUG cn.icexmoon.netty.eventloop.Server - zhangsan 2026-04-27 14:08:50 [nioEventLoopGroup-3-1] DEBUG cn.icexmoon.netty.eventloop.Server - zhangsan 2026-04-27 14:08:50 [nioEventLoopGroup-3-2] DEBUG cn.icexmoon.netty.eventloop.Server - lisi 2026-04-27 14:08:52 [nioEventLoopGroup-3-2] DEBUG cn.icexmoon.netty.eventloop.Server - lisi 2026-04-27 14:08:52 [nioEventLoopGroup-3-1] DEBUG cn.icexmoon.netty.eventloop.Server - wangwu 2026-04-27 14:08:54 [nioEventLoopGroup-3-1] DEBUG cn.icexmoon.netty.eventloop.Server - wangwu
可以看到服务端两个事件循环线程轮流处理通道的 Read 事件。
执行普通任务
EventLoopGroup 可以用于执行普通任务,就像一个普通的线程池那样:
NioEventLoopGroup eventExecutors = new NioEventLoopGroup(2);
CountDownLatch latch = new CountDownLatch(1);
eventExecutors.execute(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
log.debug("Normal task...");
latch.countDown();
});
try {
latch.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
执行定时任务
CountDownLatch countDownLatch = new CountDownLatch(10);
NioEventLoopGroup eventExecutors = new NioEventLoopGroup(2);
eventExecutors.scheduleAtFixedRate(() -> {
log.debug("Schedule task...");
countDownLatch.countDown();
}, 0,1, java.util.concurrent.TimeUnit.SECONDS);
countDownLatch.await();
Channel
客户端的connect方法实际返回的是一个ChannelFuture对象:
ChannelFuture connect = new Bootstrap()
.group(new NioEventLoopGroup())
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) {
ch.pipeline().addLast(new StringEncoder());
}
})
.connect("127.0.0.1", 8080);
connect.sync()
.channel()
.writeAndFlush("hello world");
就像所有被命名为Future的类型,这实际是一个异步调用,因此connect方法并不会等待连接创建,而是立即返回,因此这里需要使用sync方法等待连接创建后再获取通道(Channel)并写入信息。
可以用以下代码证明:
log.debug(String.valueOf(connect.channel()));
connect.sync();
log.debug(String.valueOf(connect.channel()));
输出:
2026-04-27 14:39:36 [main] DEBUG cn.icexmoon.netty.hello.Client - [id: 0x3ad367ca] 2026-04-27 14:39:36 [main] DEBUG cn.icexmoon.netty.hello.Client - [id: 0x3ad367ca, L:/127.0.0.1:14857 - R:/127.0.0.1:8080]
处理连接关闭
类似的,关闭连接同样会产生一个Future对象:
ChannelFuture connect = new Bootstrap()
// ...
Channel connectedChannel = connect.sync()
.channel();
connectedChannel.writeAndFlush("hello world");
new Thread(() -> {
// 处理用户输入
Scanner scanner = new Scanner(System.in);
while (true) {
String line = scanner.nextLine();
if ("q".equals(line)) {
connectedChannel.close();
break;
}
else{
connectedChannel.writeAndFlush(line);
}
}
}).start();
connectedChannel.closeFuture().sync();
log.info("连接已关闭" );
在这个示例中,用一个额外线程处理用户输入,并在用户输入特定字符(q)后主动关闭连接。要在连接关闭后执行特定代码,可以像示例中那样,调用CloseFuture对象的sync方法以阻塞主线程,以等待连接关闭。
或者也可以:
connectedChannel.closeFuture().addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
log.info("连接已关闭");
}
});
Future & Promise
JDK 中多线程提交任务时,会返回一个Future类型的结果:
try (ExecutorService executorService = Executors.newCachedThreadPool()) {
Future<Integer> submit = executorService.submit(() -> new Random().nextInt(100));
System.out.println(submit.get());
}
使用Future.get方法可以阻塞式地等待异步线程返回结果。
Netty 重写了 JDK 的 Future 接口,提供两个新的接口:Promise和Future。
Netty 重写的这些类型,不仅可以以阻塞的方式获取结果,还可以以非阻塞的方式获取结果:
try (DefaultEventLoop eventLoop = new DefaultEventLoop()) {
DefaultPromise<Integer> promise = new DefaultPromise<>(eventLoop);
eventLoop.execute(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
int i = new Random().nextInt(100);
promise.setSuccess(i);
});
System.out.println(promise.getNow());
System.out.println(promise.get());
}
输出:
null 14
这里的promise.getNow()是非阻塞的方式获取结果,因此获取到的是null,而promise.get()与 JDK 接口相同,是阻塞的方式获取结果。
可以使用Promise.addListener的方式添加回调处理方法,以异步的方式处理返回结果:
try (DefaultEventLoop eventLoop = new DefaultEventLoop()) {
DefaultPromise<Integer> promise = new DefaultPromise<>(eventLoop);
promise.addListener(future -> {
System.out.printf("get result: %s%n", future.getNow());
});
eventLoop.execute(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
int i = new Random().nextInt(100);
promise.setSuccess(i);
});
}
以同步的方式处理错误:
try(DefaultEventLoop eventLoop = new DefaultEventLoop()){
Promise<Integer> promise = new DefaultPromise<>(eventLoop);
eventLoop.execute(()->{
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
promise.setFailure(new RuntimeException("模拟错误产生"));
});
System.out.println(promise.getNow());
Assertions.assertThrows(ExecutionException.class, promise::get);
}
这里主线程通过Promise.get获取结果时捕获到异常,并且该异常被封装为ExecutionException类型。
如果使用的是promise.await,与promise.sync不同,它不会产生异常,需要使用isSuccess判断调用成功还是失败:
try(DefaultEventLoop eventLoop = new DefaultEventLoop()){
Promise<Integer> promise = new DefaultPromise<>(eventLoop);
eventLoop.execute(()->{
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
promise.setFailure(new RuntimeException("模拟错误产生"));
});
promise.await();
if (promise.isSuccess()){
System.out.println(promise.getNow());
}
else{
System.out.println(promise.cause());
}
}
以异步的方式处理错误:
try(DefaultEventLoop eventLoop = new DefaultEventLoop()){
Promise<Integer> promise = new DefaultPromise<>(eventLoop);
promise.addListener(future -> {
if (future.isSuccess()){
System.out.println(future.getNow());
}
else{
System.out.println(future.cause());
}
});
eventLoop.execute(()->{
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
promise.setFailure(new RuntimeException("模拟错误产生"));
});
}
Handler & Pipeline
处理器(Handler)分为入站和出站两种,多个处理器构成管道(Pipeline)的处理器链,它们的执行顺序取决于在管道上的添加顺序:
new ServerBootstrap()
.group(new NioEventLoopGroup())
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) {
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new StringEncoder());
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("1");
ctx.fireChannelRead(msg);
}
});
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("2");
ctx.fireChannelRead(msg);
}
});
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("3");
ctx.fireChannelRead(msg);
}
});
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ctx.channel().writeAndFlush("bye");
}
});
ch.pipeline().addLast(new ChannelOutboundHandlerAdapter() {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
System.out.println("4");
ctx.write(msg, promise);
}
});
ch.pipeline().addLast(new ChannelOutboundHandlerAdapter() {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
System.out.println("5");
ctx.write(msg, promise);
}
});
ch.pipeline().addLast(new ChannelOutboundHandlerAdapter() {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
System.out.println("6");
ctx.write(msg, promise);
}
});
}
})
.bind(8080);
客户端:
new Bootstrap()
.group(new NioEventLoopGroup())
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) {
ch.pipeline().addLast(new StringEncoder());
}
})
.connect("localhost", 8080)
.sync()
.channel().writeAndFlush("hello world");
输出:
1 2 3 6 5 4

文章评论