红茶的个人站点

  • 首页
  • 专栏
  • 开发工具
  • 其它
  • 隐私政策
Awalon
Talk is cheap,show me the code.
  1. 首页
  2. 开发工具
  3. 正文

Netty 学习笔记 2:Hello World

2026年4月30日 5点热度 0人点赞 0条评论

Hello World

添加依赖:

<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.132.Final</version>
</dependency>

服务端:

 // 构建服务端的辅助类
new ServerBootstrap()
    // 添加事件循环组
    .group(new NioEventLoopGroup())
    // 指定服务端通道的类型
    .channel(NioServerSocketChannel.class)
    // 添加通道处理器
    .childHandler(new ChannelInitializer<NioSocketChannel>() {
        @Override
        protected void initChannel(NioSocketChannel ch) {
            // 为通道添加处理器,仅会在连接创建后调用一次
            ch.pipeline().addLast(new StringDecoder());
            ch.pipeline().addLast(new SimpleChannelInboundHandler<String>() {
                @Override
                protected void channelRead0(ChannelHandlerContext ctx, String s) throws Exception {
                    Channel channel = ctx.channel();
                    System.out.printf("接收到来自[%s]的消息:%s%n", channel, s);
                }
            });
        }
    })
    .bind(8080);

每个事件循环(EventLoop)相当于一个处理通道事件的单个线程,而事件循环组(EventLoopGroup)相当于包含若干个事件循环的线程池。

服务端与客户端的连接创建后,具体的数据处理取决于通道的处理器(Handler),多个处理器形成一个处理器链,数据依次通过入处理器(InHandler)进行处理,并在输出数据时依次通过出处理器(OutHandler)进行输出。

在这个简单示例中,仅添加了一个用于将客户端传入的 Byte[] 转换为String的StringDecoder处理器,并使用一个自定义处理器将内容打印到控制台。

客户端:

new Bootstrap()
    .group(new NioEventLoopGroup())
    .channel(NioSocketChannel.class)
    .handler(new ChannelInitializer<NioSocketChannel>() {
        @Override
        protected void initChannel(NioSocketChannel ch) {
            ch.pipeline().addLast(new StringEncoder());
        }
    })
    .connect("127.0.0.1", 8080)
    .sync()
    .channel()
    .writeAndFlush("hello world");

客户端与服务端类似,不过不需要考虑事件循环,此外处理器也仅需要一个将String转换为byte[]的StringEncoder。

EventLoop

使用next可以遍历事件循环组内的事件循环:

DefaultEventLoopGroup eventExecutors = new DefaultEventLoopGroup(2);
EventLoop next = eventExecutors.next();
System.out.println(next);
next = eventExecutors.next();
System.out.println(next);
next = eventExecutors.next();
System.out.println(next);
io.netty.channel.DefaultEventLoop@5fa07e12
io.netty.channel.DefaultEventLoop@55b53d44
io.netty.channel.DefaultEventLoop@5fa07e12

也可以使用迭代器遍历:

DefaultEventLoopGroup eventExecutors = new DefaultEventLoopGroup(2);
for (EventExecutor eventExecutor : eventExecutors) {
    System.out.println(eventExecutor);
}
io.netty.channel.DefaultEventLoop@5fa07e12
io.netty.channel.DefaultEventLoop@55b53d44

当一个客户端-服务端连接创建,事件循环组中就会指派一个事件循环绑定到该连接(通道),并处理产生的事件。

服务端:

new ServerBootstrap()
    .group(new NioEventLoopGroup(1), new NioEventLoopGroup(2))
    .channel(NioServerSocketChannel.class)
    .childHandler(new ChannelInitializer<NioSocketChannel>() {
        @Override
        protected void initChannel(NioSocketChannel ch) {
            ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
                @Override
                public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                    ByteBuf byteBuf = msg instanceof ByteBuf ? ((ByteBuf) msg) : null;
                    if (byteBuf != null) {
                        byte[] buf = new byte[16];
                        ByteBuf len = byteBuf.readBytes(buf, 0, byteBuf.readableBytes());
                        log.debug(new String(buf));
                    }
                }
            });
        }
    })
    .bind(8080)
    .sync();

这个示例中用于处理通道事件的事件循环组中有2个事件循环(new NioEventLoopGroup(2))。

客户端:

public class Client {
    public static void main(String[] args) throws InterruptedException {
        clientSend("zhangsan");
        clientSend("lisi");
        clientSend("wangwu");
    }

    private static void clientSend(String content) throws InterruptedException {
        Channel channel = new Bootstrap()
            .group(new NioEventLoopGroup(1))
            .handler(new ChannelInitializer<NioSocketChannel>() {
                @Override
                protected void initChannel(NioSocketChannel ch) throws Exception {
                    System.out.println("init...");
                    ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
                }
            })
            .channel(NioSocketChannel.class).connect("localhost", 8080)
            .sync()
            .channel();
        channel.writeAndFlush(ByteBufAllocator.DEFAULT.buffer().writeBytes(content.getBytes()));
        Thread.sleep(2000);
        channel.writeAndFlush(ByteBufAllocator.DEFAULT.buffer().writeBytes(content.getBytes()));
    }
}

这里依次创建3个客户端,并连接服务端发送信息,最终服务端输出:

2026-04-27 14:08:48 [nioEventLoopGroup-3-1] DEBUG cn.icexmoon.netty.eventloop.Server - zhangsan        
2026-04-27 14:08:50 [nioEventLoopGroup-3-1] DEBUG cn.icexmoon.netty.eventloop.Server - zhangsan        
2026-04-27 14:08:50 [nioEventLoopGroup-3-2] DEBUG cn.icexmoon.netty.eventloop.Server - lisi            
2026-04-27 14:08:52 [nioEventLoopGroup-3-2] DEBUG cn.icexmoon.netty.eventloop.Server - lisi            
2026-04-27 14:08:52 [nioEventLoopGroup-3-1] DEBUG cn.icexmoon.netty.eventloop.Server - wangwu          
2026-04-27 14:08:54 [nioEventLoopGroup-3-1] DEBUG cn.icexmoon.netty.eventloop.Server - wangwu

可以看到服务端两个事件循环线程轮流处理通道的 Read 事件。

执行普通任务

EventLoopGroup 可以用于执行普通任务,就像一个普通的线程池那样:

NioEventLoopGroup eventExecutors = new NioEventLoopGroup(2);
CountDownLatch latch = new CountDownLatch(1);
eventExecutors.execute(() -> {
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }
    log.debug("Normal task...");
    latch.countDown();
});
try {
    latch.await();
} catch (InterruptedException e) {
    throw new RuntimeException(e);
}

执行定时任务

CountDownLatch countDownLatch = new CountDownLatch(10);
NioEventLoopGroup eventExecutors = new NioEventLoopGroup(2);
eventExecutors.scheduleAtFixedRate(() -> {
    log.debug("Schedule task...");
    countDownLatch.countDown();
}, 0,1, java.util.concurrent.TimeUnit.SECONDS);
countDownLatch.await();

Channel

客户端的connect方法实际返回的是一个ChannelFuture对象:

ChannelFuture connect = new Bootstrap()
    .group(new NioEventLoopGroup())
    .channel(NioSocketChannel.class)
    .handler(new ChannelInitializer<NioSocketChannel>() {
        @Override
        protected void initChannel(NioSocketChannel ch) {
            ch.pipeline().addLast(new StringEncoder());
        }
    })
    .connect("127.0.0.1", 8080);
connect.sync()
    .channel()
    .writeAndFlush("hello world");

就像所有被命名为Future的类型,这实际是一个异步调用,因此connect方法并不会等待连接创建,而是立即返回,因此这里需要使用sync方法等待连接创建后再获取通道(Channel)并写入信息。

可以用以下代码证明:

log.debug(String.valueOf(connect.channel()));
connect.sync();
log.debug(String.valueOf(connect.channel()));

输出:

2026-04-27 14:39:36 [main] DEBUG cn.icexmoon.netty.hello.Client - [id: 0x3ad367ca]
2026-04-27 14:39:36 [main] DEBUG cn.icexmoon.netty.hello.Client - [id: 0x3ad367ca, L:/127.0.0.1:14857 - R:/127.0.0.1:8080]

处理连接关闭

类似的,关闭连接同样会产生一个Future对象:

ChannelFuture connect = new Bootstrap()
    // ...
Channel connectedChannel = connect.sync()
    .channel();
connectedChannel.writeAndFlush("hello world");
new Thread(() -> {
    // 处理用户输入
    Scanner scanner = new Scanner(System.in);
    while (true) {
        String line = scanner.nextLine();
        if ("q".equals(line)) {
            connectedChannel.close();
            break;
        }
        else{
            connectedChannel.writeAndFlush(line);
        }
    }
}).start();
connectedChannel.closeFuture().sync();
log.info("连接已关闭" );

在这个示例中,用一个额外线程处理用户输入,并在用户输入特定字符(q)后主动关闭连接。要在连接关闭后执行特定代码,可以像示例中那样,调用CloseFuture对象的sync方法以阻塞主线程,以等待连接关闭。

或者也可以:

connectedChannel.closeFuture().addListener(new ChannelFutureListener() {
    @Override
    public void operationComplete(ChannelFuture channelFuture) throws Exception {
        log.info("连接已关闭");
    }
});

Future & Promise

JDK 中多线程提交任务时,会返回一个Future类型的结果:

try (ExecutorService executorService = Executors.newCachedThreadPool()) {
    Future<Integer> submit = executorService.submit(() -> new Random().nextInt(100));
    System.out.println(submit.get());
}

使用Future.get方法可以阻塞式地等待异步线程返回结果。

Netty 重写了 JDK 的 Future 接口,提供两个新的接口:Promise和Future。

Netty 重写的这些类型,不仅可以以阻塞的方式获取结果,还可以以非阻塞的方式获取结果:

try (DefaultEventLoop eventLoop = new DefaultEventLoop()) {
    DefaultPromise<Integer> promise = new DefaultPromise<>(eventLoop);
    eventLoop.execute(() -> {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        int i = new Random().nextInt(100);
        promise.setSuccess(i);
    });
    System.out.println(promise.getNow());
    System.out.println(promise.get());
}

输出:

null
14

这里的promise.getNow()是非阻塞的方式获取结果,因此获取到的是null,而promise.get()与 JDK 接口相同,是阻塞的方式获取结果。

可以使用Promise.addListener的方式添加回调处理方法,以异步的方式处理返回结果:

try (DefaultEventLoop eventLoop = new DefaultEventLoop()) {
    DefaultPromise<Integer> promise = new DefaultPromise<>(eventLoop);
    promise.addListener(future -> {
        System.out.printf("get result: %s%n", future.getNow());
    });
    eventLoop.execute(() -> {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        int i = new Random().nextInt(100);
        promise.setSuccess(i);
    });
}

以同步的方式处理错误:

try(DefaultEventLoop eventLoop = new DefaultEventLoop()){
    Promise<Integer> promise = new DefaultPromise<>(eventLoop);
    eventLoop.execute(()->{
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        promise.setFailure(new RuntimeException("模拟错误产生"));
    });
    System.out.println(promise.getNow());
    Assertions.assertThrows(ExecutionException.class, promise::get);
}

这里主线程通过Promise.get获取结果时捕获到异常,并且该异常被封装为ExecutionException类型。

如果使用的是promise.await,与promise.sync不同,它不会产生异常,需要使用isSuccess判断调用成功还是失败:

try(DefaultEventLoop eventLoop = new DefaultEventLoop()){
    Promise<Integer> promise = new DefaultPromise<>(eventLoop);
    eventLoop.execute(()->{
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        promise.setFailure(new RuntimeException("模拟错误产生"));
    });
    promise.await();
    if (promise.isSuccess()){
        System.out.println(promise.getNow());
    }
    else{
        System.out.println(promise.cause());
    }
}

以异步的方式处理错误:

try(DefaultEventLoop eventLoop = new DefaultEventLoop()){
    Promise<Integer> promise = new DefaultPromise<>(eventLoop);
    promise.addListener(future -> {
        if (future.isSuccess()){
            System.out.println(future.getNow());
        }
        else{
            System.out.println(future.cause());
        }
    });
    eventLoop.execute(()->{
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        promise.setFailure(new RuntimeException("模拟错误产生"));
    });
}

Handler & Pipeline

处理器(Handler)分为入站和出站两种,多个处理器构成管道(Pipeline)的处理器链,它们的执行顺序取决于在管道上的添加顺序:

new ServerBootstrap()
    .group(new NioEventLoopGroup())
    .channel(NioServerSocketChannel.class)
    .childHandler(new ChannelInitializer<NioSocketChannel>() {
        @Override
        protected void initChannel(NioSocketChannel ch) {
            ch.pipeline().addLast(new StringDecoder());
            ch.pipeline().addLast(new StringEncoder());
            ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                @Override
                public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                    System.out.println("1");
                    ctx.fireChannelRead(msg);
                }
            });
            ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                @Override
                public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                    System.out.println("2");
                    ctx.fireChannelRead(msg);
                }
            });
            ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                @Override
                public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                    System.out.println("3");
                    ctx.fireChannelRead(msg);
                }
            });
            ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                @Override
                public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                    ctx.channel().writeAndFlush("bye");
                }
            });
            ch.pipeline().addLast(new ChannelOutboundHandlerAdapter() {
                @Override
                public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
                    System.out.println("4");
                    ctx.write(msg, promise);
                }
            });
            ch.pipeline().addLast(new ChannelOutboundHandlerAdapter() {
                @Override
                public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
                    System.out.println("5");
                    ctx.write(msg, promise);
                }
            });
            ch.pipeline().addLast(new ChannelOutboundHandlerAdapter() {
                @Override
                public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
                    System.out.println("6");
                    ctx.write(msg, promise);
                }
            });
        }
    })
    .bind(8080);

客户端:

new Bootstrap()
    .group(new NioEventLoopGroup())
    .channel(NioSocketChannel.class)
    .handler(new ChannelInitializer<NioSocketChannel>() {
        @Override
        protected void initChannel(NioSocketChannel ch) {
            ch.pipeline().addLast(new StringEncoder());
        }
    })
    .connect("localhost", 8080)
    .sync()
    .channel().writeAndFlush("hello world");

输出:

1
2
3
6
5
4
本作品采用 知识共享署名 4.0 国际许可协议 进行许可
标签: 暂无
最后更新:2026年4月30日

魔芋红茶

加一点PHP,加一点Go,加一点Python......

点赞
< 上一篇

文章评论

razz evil exclaim smile redface biggrin eek confused idea lol mad twisted rolleyes wink cool arrow neutral cry mrgreen drooling persevering
取消回复

COPYRIGHT © 2021 icexmoon.cn. ALL RIGHTS RESERVED.
本网站由提供CDN加速/云存储服务

Theme Kratos Made By Seaton Jiang

宁ICP备2021001508号

宁公网安备64040202000141号