Netty入门案例

  1. 服务端
  2. 客户端
  3. 分析
    1. 服务端中的BossGroup和WorkerGroup一共有多少个NioEventLoop?
    2. Channel和Pipeline的关系?

Netty Reactor架构图

服务端

Server:

public class Server {
    public static void main(String[] args) throws InterruptedException {
        // 创建两个线程组 bossGroup 和 workerGroup
        // bossGroup 只处理连接请求,真正的客户端业务请求会交由 workerGroup 处理
        // 都是无限循环
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try{
            //创建服务器端的启动对象,配置参数
            ServerBootstrap bootstrap = new ServerBootstrap();

            //设置
            bootstrap.group(bossGroup, workerGroup) //设置两个线程组
                    .channel(NioServerSocketChannel.class) //使用NioSocketChannel 作为服务器的通道实现
                    .option(ChannelOption.SO_BACKLOG, 128)  //线程队列得到连接个数
                    .childOption(ChannelOption.SO_KEEPALIVE, true) //保持活动连接状态
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            socketChannel.pipeline().addLast(new ServerHandler());
                        }
                    }); //  给 workerGroup 的 EventLoop 对应的管道设置处理器
            System.out.println("服务器已就绪...");

            // 绑定一个端口并且同步,生成了一个channelFuture对象
            // 启动服务器
            ChannelFuture channelFuture = bootstrap.bind(6688).sync();
            // 对关闭通道进行监听
            channelFuture.channel().closeFuture().sync();
        }finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

ServerHandler:

/**
 * 我们自定义一个 Handler 需要继承 netty 规定好的某个HandlerAdapter (规范)
 * 由是则可谓之为 Handler 也
 */
public class ServerHandler extends ChannelInboundHandlerAdapter {
    /**
     * 读取实际数据 (读取客户端发来的消息)
     * @param ctx 上下文对象: 含有管道pipeline, 通道channel, 地址
     * @param msg 客户端发送的数据, 默认是Obj
     * @throws Exception
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("server context = " + ctx);
        ByteBuf buf = (ByteBuf) msg;
        System.out.println("客户端发送的消息是: " + buf.toString(CharsetUtil.UTF_8));
        System.out.println("客户端地址: " + ctx.channel().remoteAddress());
    }

    /**
     * 数据读取完毕
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        //将数据写入缓存,并刷新
        //一般来讲,我们需要对这个发送的数据进行编码
        ctx.writeAndFlush(Unpooled.copiedBuffer("你好啊, 客户端!", CharsetUtil.UTF_8));
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}

客户端

Client:

public class Client {
    public static void main(String[] args) {
        //客户端创建一个事件循环组
        EventLoopGroup group = new NioEventLoopGroup();

        try{
            //创建客户端启动对象
            //注意客户端使用的不是 ServerBootstrap 而是 Bootstrap
            Bootstrap bootstrap = new Bootstrap();

            //设置相关参数
            bootstrap.group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            socketChannel.pipeline().addLast(new ClientHandler());
                        }
                    });
            System.out.println("客户端已就绪...");

            //启动客户端去连接服务器端
            //关于 ChannelFuture 要分析,涉及到netty异步模型
            ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6688).sync();
            //给关闭通道进行监听
            channelFuture.channel().closeFuture().sync();

        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            group.shutdownGracefully();
        }

    }
}

ClientHandler:

public class ClientHandler extends ChannelInboundHandlerAdapter {
    /**
     * 当通道就绪就会触发该方法
     * @param ctx context
     * @throws Exception
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("client " + ctx);
        ctx.writeAndFlush(Unpooled.copiedBuffer("Hello, server: (>^ω^<)", CharsetUtil.UTF_8));
    }

    /**
     * 当通道有读取事件时,会触发
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf buf = (ByteBuf) msg;
        System.out.println("服务器回复消息: " + buf.toString(CharsetUtil.UTF_8));
        System.out.println("服务器地址: " + ctx.channel().remoteAddress());
    }

    /**
     * 异常捕获
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

分析

服务端中的BossGroup和WorkerGroup一共有多少个NioEventLoop?

即问它们各自所含有的子线程的个数

我们不断查看NioEventLoopGroup的默认构造方法,发现其最终调用了如下构造参数:

可知: 默认构造线程池含有的线程数为 2 * 本机CPU核数

也可以在构造时传入参数,自定义线程池中线程的数量:

EventLoopGroup bossGroup = new NioEventLoopGroup(1);

Channel和Pipeline的关系?

由Context: ctx 可以获得对应的 channel, pipeline。它们的关系是互相包含,互为成员变量。

字节快HR面了,可能转go,待续。。


转载请注明来源,欢迎对文章中的引用来源进行考证,欢迎指出任何有错误或不够清晰的表达,邮件至 708801794@qq.com

文章标题:Netty入门案例

文章字数:876

本文作者:梅罢葛

发布时间:2020-10-27, 20:28:27

最后更新:2020-11-04, 12:55:12

原始链接:https://qiurungeng.github.io/2020/10/27/Netty%E5%85%A5%E9%97%A8%E6%A1%88%E4%BE%8B/
目录
×

喜欢就点赞,疼爱就打赏