Netty入门demo
Netty 是一个异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。
Netty 作为异步事件驱动的网络,高性能之处主要来自于其 I/O 模型和线程处理模型,前者决定如何收发数据,后者决定如何处理数据。
1、I/O 模型
NIO
Netty 的 IO 线程 NioEventLoop 由于聚合了多路复用器 Selector,可以同时并发处理成百上千个客户端连接。当线程从某客户端 Socket 通道进行读写数据时,若没有数据可用时,该线程可以进行其他任务。线程通常将非阻塞 IO 的空闲时间用于在其他通道上执行 IO 操作,所以单独的线程可以管理多个输入和输出通道。由于读写操作都是非阻塞的,这就可以充分提升 IO 线程的运行效率,避免由于频繁 I/O 阻塞导致的线程挂起。一个 I/O 线程可以并发处理 N 个客户端连接和读写操作,这从根本上解决了传统同步阻塞 I/O 一连接一线程模型,架构的性能、弹性伸缩能力和可靠性都得到了极大的提升。
基于 Buffer
传统的 I/O 是面向字节流或字符流的,以流式的方式顺序地从一个 Stream 中读取一个或多个字节, 因此也就不能随意改变读取指针的位置。在 NIO 中,抛弃了传统的 I/O 流,而是引入了 Channel 和 Buffer 的概念。在 NIO 中,只能从 Channel 中读取数据到 Buffer 中或将数据从 Buffer 中写入到 Channel。基于 Buffer 操作不像传统 IO 的顺序操作,NIO 中可以随意地读取任意位置的数据。
2、线程处理模型
通常,我们设计一个事件处理模型的程序有两种思路:
轮询方式:线程不断轮询访问相关事件发生源有没有发生事件,有发生事件就调用事件处理逻辑。
事件驱动方式:发生事件,主线程把事件放入事件队列,在另外线程不断循环消费事件列表中的事件,调用事件对应的处理逻辑处理事件。事件驱动方式也被称为消息通知方式,其实是设计模式中观察者模式的思路。
以 GUI 的逻辑处理为例,说明两种逻辑的不同:
轮询方式,线程不断轮询是否发生按钮点击事件,如果发生,调用处理逻辑。
事件驱动方式,发生点击事件把事件放入事件队列,在另外线程消费的事件列表中的事件,根据事件类型调用相关事件处理逻辑。
主要包括 4 个基本组件:
- 事件队列(event queue):接收事件的入口,存储待处理事件。
- 分发器(event mediator):将不同的事件分发到不同的业务逻辑单元。
- 事件通道(event channel):分发器与处理器之间的联系渠道。
- 事件处理器(event processor):实现业务逻辑,处理完成后会发出事件,触发下一步操作。
可以看出,相对传统轮询模式,事件驱动有如下优点:
- 可扩展性好,分布式的异步架构,事件处理器之间高度解耦,可以方便扩展事件处理逻辑。
- 高性能,基于队列暂存事件,能方便并行异步处理事件。
模块组件
Bootstrap、ServerBootstrap
Bootstrap 意思是引导,一个 Netty 应用通常由一个 Bootstrap 开始,主要作用是配置整个 Netty 程序,串联各个组件,Netty 中 Bootstrap 类是客户端程序的启动引导类,ServerBootstrap 是服务端启动引导类。
Future、ChannelFuture
正如前面介绍,在 Netty 中所有的 IO 操作都是异步的,不能立刻得知消息是否被正确处理。但是可以过一会等它执行完成或者直接注册一个监听,具体的实现就是通过 Future 和 ChannelFutures,他们可以注册一个监听,当操作执行成功或失败时监听会自动触发注册的监听事件。
Channel
Netty 网络通信的组件,能够用于执行网络 I/O 操作。Channel 为用户提供:
- 当前网络连接的通道的状态(例如是否打开?是否已连接?)
- 网络连接的配置参数 (例如接收缓冲区大小)
- 提供异步的网络 I/O 操作(如建立连接,读写,绑定端口),异步调用意味着任何 I/O 调用都将立即返回,并且不保证在调用结束时所请求的I/O 操作已完成。
- 调用立即返回一个 ChannelFuture 实例,通过注册监听器到 ChannelFuture 上,可以 I/O
操作成功、失败或取消时回调通知调用方。 - 支持关联 I/O 操作与对应的处理程序。
不同协议、不同的阻塞类型的连接都有不同的 Channel 类型与之对应。下面是一些常用的 Channel 类型:
NioSocketChannel,异步的客户端 TCP Socket 连接。
NioServerSocketChannel,异步的服务器端 TCP Socket 连接。
NioDatagramChannel,异步的 UDP 连接。
NioSctpChannel,异步的客户端 Sctp 连接。
NioSctpServerChannel,异步的 Sctp 服务器端连接,这些通道涵盖了 UDP 和 TCP 网络 IO 以及文件 IO。
Selector
Netty 基于 Selector 对象实现 I/O 多路复用,通过 Selector 一个线程可以监听多个连接的 Channel 事件。
当向一个 Selector 中注册 Channel 后,Selector 内部的机制就可以自动不断地查询(Select) 这些注册的 Channel 是否有已就绪的 I/O 事件(例如可读,可写,网络连接完成等),这样程序就可以很简单地使用一个线程高效地管理多个 Channel 。
NioEventLoop
NioEventLoop 中维护了一个线程和任务队列,支持异步提交执行任务,线程启动时会调用 NioEventLoop 的 run 方法,执行 I/O 任务和非 I/O 任务:
I/O 任务,即 selectionKey 中 ready 的事件,如 accept、connect、read、write 等,由 processSelectedKeys 方法触发。
非 IO 任务,添加到 taskQueue 中的任务,如 register0、bind0 等任务,由 runAllTasks 方法触发。
两种任务的执行时间比由变量 ioRatio 控制,默认为 50,则表示允许非 IO 任务执行的时间与 IO 任务的执行时间相等。
NioEventLoopGroup
NioEventLoopGroup,主要管理 eventLoop 的生命周期,可以理解为一个线程池,内部维护了一组线程,每个线程(NioEventLoop)负责处理多个 Channel 上的事件,而一个 Channel 只对应于一个线程。
ChannelHandler
ChannelHandler 是一个接口,处理 I/O 事件或拦截 I/O 操作,并将其转发到其 ChannelPipeline(业务处理链)中的下一个处理程序。ChannelHandler 本身并没有提供很多方法,因为这个接口有许多的方法需要实现,方便使用期间,可以继承它的子类:
ChannelInboundHandler 用于处理入站 I/O 事件。
ChannelOutboundHandler 用于处理出站 I/O 操作。
或者使用以下适配器类:
ChannelInboundHandlerAdapter 用于处理入站 I/O 事件。
ChannelOutboundHandlerAdapter 用于处理出站 I/O 操作。
ChannelDuplexHandler 用于处理入站和出站事件。
ChannelHandlerContext
保存 Channel 相关的所有上下文信息,同时关联一个 ChannelHandler 对象。
Netty的Server:
public class NettyServer {
public void run() throws Exception{
// ServerBootstrap负责建立服务端,可以直接使用Channel去建立服务端但是一般使用ServerBootstrap
ServerBootstrap serverBootstrap = new ServerBootstrap();
// Event:事件,Loop:循环,Group:群组
// Netty内部都是通过线程在处理各种数据,EventLoopGroup就是用来管理调度他们的,注册Channel,管理他们的生命周期。
// NioEventLoopGroup是一个处理I/O操作的多线程事件循环
// boos接收传入连接,因为boos仅接收客户端连接,不做复杂的逻辑处理,为了尽可能减少资源的占用,取值越小越好
EventLoopGroup boos = new NioEventLoopGroup();
// 处理boss接收的连接的流量和将接收的连接注册进入这个worker
EventLoopGroup worker = new NioEventLoopGroup();
serverBootstrap.group(boos, worker)
// 指定使用NioServerSocketChannel产生一个Channel用来接收连接
.channel(NioServerSocketChannel.class)
// ChannelInitializer用于配置一个新的Channel
// 用于向你的Channel当中添加ChannelInboundHandler的实现
.childHandler(new ChannelInitializer<NioSocketChannel>() {
protected void initChannel(NioSocketChannel ch) {
// ChannelPipeline用于存放管理ChannelHandel
// ChannelHandler用于处理请求响应的业务逻辑相关代码
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new SimpleChannelInboundHandler<String>() {
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) {
System.out.println(msg);
}
});
}
})
// 以下是socket的标准参数
// BACKLOG用于构造服务端套接字ServerSocket对象,标识当服务器请求处理线程全满时,用于临时存放已完成三次握
// 手的请求的队列的最大长度。如果未设置或所设置的值小于1,Java将使用默认值50。
// Option是为了NioServerSocketChannel设置的,用来接收传入连接的
.option(ChannelOption.SO_BACKLOG, 128)
// 是否启用心跳保活机制。在双方TCP套接字建立连接后(即都进入ESTABLISHED状态)并且在两个小时左右上层没有任何数据传输的情况下,这套机制才会被**。
// childOption是用来给父级ServerChannel之下的Channels设置参数的
.childOption(ChannelOption.SO_KEEPALIVE, true)
// 绑定端口号
.bind(8000);
}
public static void main(String[] args) {
try {
new NettyServer().run();
} catch (Exception e) {
e.printStackTrace();
}
}
}
Netty的Client:
public class NettyClient {
public static void main(String[] args) throws InterruptedException {
//客户端配置和启动的类
Bootstrap bootstrap = new Bootstrap();
//接受新连接线程,主要负责创建新连接
EventLoopGroup group = new NioEventLoopGroup();
//指定使用NioServerSocketChannel产生一个Channel用来接收连接
bootstrap.group(group).channel(NioSocketChannel.class)
.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) {
ch.pipeline().addLast(new StringEncoder());
}
});
//连接服务端
Channel channel = bootstrap.connect("127.0.0.1", 8000).channel();
while (true) {
//写数据
channel.writeAndFlush(new Date() + ": hello world!");
Thread.sleep(2000);
}
}
}
上一篇: netty入门demo(一)
下一篇: quartz的一个demo