欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Netty入门demo

程序员文章站 2022-04-24 10:50:23
...

Netty 是一个异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。
Netty 作为异步事件驱动的网络,高性能之处主要来自于其 I/O 模型和线程处理模型,前者决定如何收发数据,后者决定如何处理数据。
1、I/O 模型
NIO
Netty 的 IO 线程 NioEventLoop 由于聚合了多路复用器 Selector,可以同时并发处理成百上千个客户端连接。当线程从某客户端 Socket 通道进行读写数据时,若没有数据可用时,该线程可以进行其他任务。线程通常将非阻塞 IO 的空闲时间用于在其他通道上执行 IO 操作,所以单独的线程可以管理多个输入和输出通道。由于读写操作都是非阻塞的,这就可以充分提升 IO 线程的运行效率,避免由于频繁 I/O 阻塞导致的线程挂起。一个 I/O 线程可以并发处理 N 个客户端连接和读写操作,这从根本上解决了传统同步阻塞 I/O 一连接一线程模型,架构的性能、弹性伸缩能力和可靠性都得到了极大的提升。

基于 Buffer
传统的 I/O 是面向字节流或字符流的,以流式的方式顺序地从一个 Stream 中读取一个或多个字节, 因此也就不能随意改变读取指针的位置。在 NIO 中,抛弃了传统的 I/O 流,而是引入了 Channel 和 Buffer 的概念。在 NIO 中,只能从 Channel 中读取数据到 Buffer 中或将数据从 Buffer 中写入到 Channel。基于 Buffer 操作不像传统 IO 的顺序操作,NIO 中可以随意地读取任意位置的数据。

2、线程处理模型
通常,我们设计一个事件处理模型的程序有两种思路:
轮询方式:线程不断轮询访问相关事件发生源有没有发生事件,有发生事件就调用事件处理逻辑。
事件驱动方式:发生事件,主线程把事件放入事件队列,在另外线程不断循环消费事件列表中的事件,调用事件对应的处理逻辑处理事件。事件驱动方式也被称为消息通知方式,其实是设计模式中观察者模式的思路。

以 GUI 的逻辑处理为例,说明两种逻辑的不同:
轮询方式,线程不断轮询是否发生按钮点击事件,如果发生,调用处理逻辑。
事件驱动方式,发生点击事件把事件放入事件队列,在另外线程消费的事件列表中的事件,根据事件类型调用相关事件处理逻辑。

主要包括 4 个基本组件:

  • 事件队列(event queue):接收事件的入口,存储待处理事件。
  • 分发器(event mediator):将不同的事件分发到不同的业务逻辑单元。
  • 事件通道(event channel):分发器与处理器之间的联系渠道。
  • 事件处理器(event processor):实现业务逻辑,处理完成后会发出事件,触发下一步操作。

可以看出,相对传统轮询模式,事件驱动有如下优点:

  • 可扩展性好,分布式的异步架构,事件处理器之间高度解耦,可以方便扩展事件处理逻辑。
  • 高性能,基于队列暂存事件,能方便并行异步处理事件。

模块组件
Bootstrap、ServerBootstrap
Bootstrap 意思是引导,一个 Netty 应用通常由一个 Bootstrap 开始,主要作用是配置整个 Netty 程序,串联各个组件,Netty 中 Bootstrap 类是客户端程序的启动引导类,ServerBootstrap 是服务端启动引导类。

Future、ChannelFuture
正如前面介绍,在 Netty 中所有的 IO 操作都是异步的,不能立刻得知消息是否被正确处理。但是可以过一会等它执行完成或者直接注册一个监听,具体的实现就是通过 Future 和 ChannelFutures,他们可以注册一个监听,当操作执行成功或失败时监听会自动触发注册的监听事件。

Channel
Netty 网络通信的组件,能够用于执行网络 I/O 操作。Channel 为用户提供:

  • 当前网络连接的通道的状态(例如是否打开?是否已连接?)
  • 网络连接的配置参数 (例如接收缓冲区大小)
  • 提供异步的网络 I/O 操作(如建立连接,读写,绑定端口),异步调用意味着任何 I/O 调用都将立即返回,并且不保证在调用结束时所请求的I/O 操作已完成。
  • 调用立即返回一个 ChannelFuture 实例,通过注册监听器到 ChannelFuture 上,可以 I/O
    操作成功、失败或取消时回调通知调用方。
  • 支持关联 I/O 操作与对应的处理程序。

不同协议、不同的阻塞类型的连接都有不同的 Channel 类型与之对应。下面是一些常用的 Channel 类型:
NioSocketChannel,异步的客户端 TCP Socket 连接。
NioServerSocketChannel,异步的服务器端 TCP Socket 连接。
NioDatagramChannel,异步的 UDP 连接。
NioSctpChannel,异步的客户端 Sctp 连接。
NioSctpServerChannel,异步的 Sctp 服务器端连接,这些通道涵盖了 UDP 和 TCP 网络 IO 以及文件 IO。

Selector
Netty 基于 Selector 对象实现 I/O 多路复用,通过 Selector 一个线程可以监听多个连接的 Channel 事件。
当向一个 Selector 中注册 Channel 后,Selector 内部的机制就可以自动不断地查询(Select) 这些注册的 Channel 是否有已就绪的 I/O 事件(例如可读,可写,网络连接完成等),这样程序就可以很简单地使用一个线程高效地管理多个 Channel 。

NioEventLoop
NioEventLoop 中维护了一个线程和任务队列,支持异步提交执行任务,线程启动时会调用 NioEventLoop 的 run 方法,执行 I/O 任务和非 I/O 任务:
I/O 任务,即 selectionKey 中 ready 的事件,如 accept、connect、read、write 等,由 processSelectedKeys 方法触发。
非 IO 任务,添加到 taskQueue 中的任务,如 register0、bind0 等任务,由 runAllTasks 方法触发。
两种任务的执行时间比由变量 ioRatio 控制,默认为 50,则表示允许非 IO 任务执行的时间与 IO 任务的执行时间相等。

NioEventLoopGroup
NioEventLoopGroup,主要管理 eventLoop 的生命周期,可以理解为一个线程池,内部维护了一组线程,每个线程(NioEventLoop)负责处理多个 Channel 上的事件,而一个 Channel 只对应于一个线程。

ChannelHandler
ChannelHandler 是一个接口,处理 I/O 事件或拦截 I/O 操作,并将其转发到其 ChannelPipeline(业务处理链)中的下一个处理程序。ChannelHandler 本身并没有提供很多方法,因为这个接口有许多的方法需要实现,方便使用期间,可以继承它的子类:
ChannelInboundHandler 用于处理入站 I/O 事件。
ChannelOutboundHandler 用于处理出站 I/O 操作。
或者使用以下适配器类:
ChannelInboundHandlerAdapter 用于处理入站 I/O 事件。
ChannelOutboundHandlerAdapter 用于处理出站 I/O 操作。
ChannelDuplexHandler 用于处理入站和出站事件。
ChannelHandlerContext
保存 Channel 相关的所有上下文信息,同时关联一个 ChannelHandler 对象。

Netty的Server:

public class NettyServer {

    public void run() throws Exception{
        // ServerBootstrap负责建立服务端,可以直接使用Channel去建立服务端但是一般使用ServerBootstrap
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        // Event:事件,Loop:循环,Group:群组
        // Netty内部都是通过线程在处理各种数据,EventLoopGroup就是用来管理调度他们的,注册Channel,管理他们的生命周期。
        // NioEventLoopGroup是一个处理I/O操作的多线程事件循环
        // boos接收传入连接,因为boos仅接收客户端连接,不做复杂的逻辑处理,为了尽可能减少资源的占用,取值越小越好
        EventLoopGroup boos = new NioEventLoopGroup();
        // 处理boss接收的连接的流量和将接收的连接注册进入这个worker
        EventLoopGroup worker = new NioEventLoopGroup();

        serverBootstrap.group(boos, worker)
            // 指定使用NioServerSocketChannel产生一个Channel用来接收连接
            .channel(NioServerSocketChannel.class)
            // ChannelInitializer用于配置一个新的Channel
            // 用于向你的Channel当中添加ChannelInboundHandler的实现
            .childHandler(new ChannelInitializer<NioSocketChannel>() {
                protected void initChannel(NioSocketChannel ch) {
                    // ChannelPipeline用于存放管理ChannelHandel
                    //  ChannelHandler用于处理请求响应的业务逻辑相关代码
                    ch.pipeline().addLast(new StringDecoder());
                    ch.pipeline().addLast(new SimpleChannelInboundHandler<String>() {
                        @Override
                        protected void channelRead0(ChannelHandlerContext ctx, String msg) {
                        System.out.println(msg);
                        }
                    });
                }
            })
            // 以下是socket的标准参数
            // BACKLOG用于构造服务端套接字ServerSocket对象,标识当服务器请求处理线程全满时,用于临时存放已完成三次握
            // 手的请求的队列的最大长度。如果未设置或所设置的值小于1,Java将使用默认值50。
            // Option是为了NioServerSocketChannel设置的,用来接收传入连接的
            .option(ChannelOption.SO_BACKLOG, 128)
            // 是否启用心跳保活机制。在双方TCP套接字建立连接后(即都进入ESTABLISHED状态)并且在两个小时左右上层没有任何数据传输的情况下,这套机制才会被**。
            // childOption是用来给父级ServerChannel之下的Channels设置参数的
            .childOption(ChannelOption.SO_KEEPALIVE, true)
            // 绑定端口号
            .bind(8000);

    }

    public static void main(String[] args) {
        try {
            new NettyServer().run();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

}

Netty的Client:

public class NettyClient {
    public static void main(String[] args) throws InterruptedException {
        //客户端配置和启动的类
        Bootstrap bootstrap = new Bootstrap();
        //接受新连接线程,主要负责创建新连接
        EventLoopGroup group = new NioEventLoopGroup();
        //指定使用NioServerSocketChannel产生一个Channel用来接收连接
        bootstrap.group(group).channel(NioSocketChannel.class)
            .handler(new ChannelInitializer<Channel>() {
                @Override
                protected void initChannel(Channel ch) {
                    ch.pipeline().addLast(new StringEncoder());
                }
            });
        //连接服务端
        Channel channel = bootstrap.connect("127.0.0.1", 8000).channel();

        while (true) {
            //写数据
            channel.writeAndFlush(new Date() + ": hello world!");
            Thread.sleep(2000);
        }
    }
}
相关标签: Netty NIO