Netty4实战第六章:ChannelHandler
本章主要内容
- ChannelPipeline
- ChannelHandlerContext
- ChannelHandler
- 收数据与发数据
Netty提供了比较强大的方式处理这部分。它允许用户实现自己的ChannelHandler去处理数据。这样就可以做成很有用ChannelHandler链,并且每个ChannelHandler都可以执行很多任务。这样也可以帮助开发者写出来整洁的,可复用的代码。
不过在ChannelHandler你也只能处理数据。当然,非要较真的话,你也可以在ChannelHandler中阻止IO操作,比如发送数据的场景你可以阻止发送,后面会介绍到这个例子。强大的地方在于这些都可以实时去完成。
一、ChannelPipeline
一个ChannelPipeline中有很多ChannelHandler的实例,用来处理一个Channel中的进出数据。ChannelPipeline提供了先进的过滤器设计模式,用户可以完全控制每个事件发生的操作,以及ChannelPipeline中的每个ChannelHandler之间的相互作用。
对于每一个新的Channel,都会创建一个新的ChannelPipeline然后附加给Channel。一旦附加之后,就会变成永久的联系;Channel不能附加另一个,也不能与当前附加的ChannelPipeline分离。当然这些都是Netty做的事情,开发者无需关心这些。
下图展示了ChannelPipeline的ChannelHandler通常如何处理IO的。IO被ChannelPipeline中的ChannelInboundHandler或ChannelOutboundHandler处理,然后通过ChannelInboundInvoker或ChannelOutboundInvoker接口定义的方法传播到最近的一个同类型的ChannelHandler,一个处理数据接收,一个处理数据发送。而ChannelPipeline继承了他们。
从上图也可以看出,ChannelPipeline很像有多个ChannelHandler的List。数据进来时数据是从ChannelPipeline的头部流向尾部,而数据出去事件也就是发送数据则是从ChannelPipeline尾部流向头部。ChannelPipeline通过检查ChannelHandler的类型确定它处理什么类型事件。如果类型不符合,就会跳过这个ChannelHandler,去找下一个符合。
开发者可以实时去修改ChannelPipeline,也就是你甚至可以在另一个ChannelHandler中增删改其他的ChannelHandler,而且移除自己也是可以的。利用这个可以编写很灵活的代码逻辑,例如多路复用器,后面会详细介绍这个知识点。
我们先来学习如何去修改ChannelPipeline。
名称 |
描述 |
addFirst(…) |
添加一个ChannelHandler在ChannelPipeline头部 |
addBefore(…) |
|
addAfter(…) |
|
addLast(…) |
|
remove(…) |
移除ChannelHandler |
replace(…) |
替换ChannelHandler |
ChannelPipeline pipeline = ..;
FirstHandler firstHandler = new FirstHandler();
//添加一个ChannelHandler
pipeline.addLast("handler1", firstHandler);
//添加到ChannelPipeline的第一个位置
pipeline.addFirst("handler2", new SecondHandler());
//添加到ChannelPipeline的最后一个位置
pipeline.addLast("handler3", new ThirdHandler());
//使用名称移除
pipeline.remove("handler3");
//使用引用移除
pipeline.remove(firstHandler);
//将handler2替换掉
pipeline.replace("handler2", "handler4", new FourthHandler());
可以看到修改ChannelPipeline是很容易的,可以随时根据你的需要增删改查。
在ChannelPipeline中每一个ChannelHandler处理事件都是在IO线程中,也就是你不能在ChannelHandler中阻塞代码,否则就会阻塞IO线程影响整体IO性能。有些情况还必须使用阻塞API,例如JDBC的。遇到这种使用场景可以在调用ChannelPipeline.add的方法时传入EventExecutorGroup。如果传入了自定义的EventExecutorGroup,事件就会被里面的EventExecutor处理,移除也是一样。Netty提供的默认实现名字叫DefaultEventExecutorGroup。
Netty不仅提供了修改ChannelPipeline的方式,而且还提供了访问ChannelPipeline中ChannelHandler的各种方法,这样就可以检查某个指定的ChannelHandler是否存在ChannelPipeline中。
名称 |
描述 |
get(…) |
ChannelPipeline提供了几个get方法,可以获 |
context(…) |
获取ChannelHandlerContext |
contains(…) |
检查是否包含ChannelHandler |
names() |
返回所有ChannelHander的名称 |
iterator() |
返回所有ChannelHander的迭代器 |
ChannelPipeline继承自ChannelInboundInvoker和ChannelOutboundInvoker,它们暴漏的方法分别是用来处理进数据和出数据的。先看一下ChannelPipeline处理进数据时的方法。
名称 |
描述 |
fireChannelRegistered() |
会导致下一个ChannelInboundHandler的channelRegistered方法被调用 |
fireChannelUnregistered() |
会导致下一个ChannelInboundHandler的channelUnregistered方法被调用 |
fireChannelActive() |
会导致下一个ChannelInboundHandler的channelActive方法被调用 |
fireChannelInactive() |
会导致下一个ChannelInboundHandler的channelInactive方法被调用 |
fireExceptionCaught(…) |
会导致下一个ChannelInboundHandler的exceptionCaught方法被调用 |
fireUserEventTriggered(…) |
会导致下一个ChannelInboundHandler的userEventTriggered方法被调用 |
fireChannelRead(….) |
会导致下一个ChannelInboundHandler的channelRead方法被调用 |
fireChannelReadComplete() |
会导致下一个ChannelInboundHandler的channelReadComplete方法被调用 |
这些操作都是来触发ChannelInboundHandler中的方法,所以用来处理各种各样的事件。
不过处理进数据事件只是做了一半的事情,在网络应用中还需要触发和处理出数据事件。
下面表格列出了ChannelOutboundInvoker接口暴露的方法,另外ChannelPipeline,ChannelHandlerContext和Channel都继承了ChannelOutboundInvoker。
名称 |
描述 |
bind(…) |
将Channel绑定到本地,将调用下一个ChannelOutboundHandler的bind方法 |
connect(…) |
将Channel连接到远程地址,将调用下一个ChannelOutboundHandler的connect方法 |
disconnect(…) |
断开Channel连接,将调用下一个ChannelOutboundHandler的disconnect方法 |
close(…) |
关闭Channel,将调用下一个ChannelOutboundHandler的close方法 |
deregister(…) |
注销Channel,将调用下一个ChannelOutboundHandler的deregister方法 |
flush(…) |
刷新Channel中等待的输出,将调用下一个ChannelOutboundHandler的flush方法 |
write() |
写数据到网络,注意,这个方法并不会真正将数据写到实际网络中,只是放到队列,如果 |
writeAndFlush(…) |
快捷方式调用write()和flush() |
read() |
从Channel中读数据,将调用下一个ChannelOutboundHandler的read方法 |
二、ChannelHandlerContext
每当一个ChannelHandler添加到ChannelPipeline时,就会创建和指定一个新的ChannelHandlerContext。ChannelHandlerContext允许ChannelHandler作用其他ChannelHandler,在底层传输结束之前,它们都属于同一个ChannelPipeline。
ChannelHandlerContext不会改变,所以获取它是安全的。
ChannelHandlerContext也实现了ChannelInboundInvoker和ChannelOutboundInvoker。所以很多Channel或ChannelPipeline拥有的方法它也有。不同之处在于当你通过Channel或ChannelPipeline调用这些方法时,要流转整个ChannelPipeline,也就是说里面的每个ChannelHandler都要过一遍。但如果使用ChannelHandlerContext,会从当前位置的ChannelHandler开始处理事件。
2.1、通知下一个ChannelHandler
使用定义在ChannelInboundInvoker和ChannelOutboundInvoker里的一些方法可以通知最近的ChannelHandler。通知从哪里开始取决于你如何设置通知。
下图展示了Channel,ChannelPipeline,ChannelHandler,ChannelHandlerContext之间的关系。
图中ChannelPipeline绑定到Channel,并且拥有多个ChannelHandler实例;而每个ChannelHandler都绑定了一个ChannelHandlerContext,前面说过,一旦添加了一个ChannelHandler就会创建一个新的ChannelHandlerContext。
如果想走完整个ChannelPipeline的流程,目前有两种方式:
- 使用Channel的方法
- 使用ChannelPipeline的方法
下面的代码展示了使用Channel写数据事件流程,它就是从ChannelPipeline尾部开始的。
ChannelHandlerContext ctx = ..;
//通过ChannelHandlerContext获取Channel
Channel channel = ctx.channel();
//写数据
channel.write(Unpooled.copiedBuffer("Netty in Action",
CharsetUtil.UTF_8));
下面的代码展示了如何使用ChannelPipeline写数据。
ChannelHandlerContext ctx = ..;
//通过ChannelHandlerContext获取ChannelPipeline
ChannelPipeline channelPipeline = ctx.pipeline();
//写数据
channelPipeline.write(Unpooled.copiedBuffer("Netty in Action",
CharsetUtil.UTF_8));
ChannelPipeline写数据和使用Channel写数据,数据流转过程都是一样的。另外你也可以看到,通过ChannelHandlerContext可以访问到Channel和ChannelPipeline。使用Channel或ChannelPipeline触发事件通知的流程如下图。
可以看到,事件从ChannelPipeline的头部开始,每次事件通知都是ChannelHandlercontext通知下一个ChannelHandler。
有的业务场景需要在ChannelPipeline中指定位置触发事件,数据不用走过整个ChannelPipeline,例如:
- 保存消息中的额外数据,其他ChannelHandler并不关心这些额外数据
- 排除一些ChannelHandler
ChannelHandlerContext ctx = ..;
ctx.write(Unpooled.copiedBuffer("Netty in Action", CharsetUtil.UTF_8))
这个数据会从下一个ChannelHandler开始走完ChannelPipeline,事件也会从下一个ChannelHandler开始触发,请看下图。
可以看出,事件从指定的ChannelHandlerContext开始,跳过了之前的所有的ChannelHandler。ChannelHandlerContext是很常用的,大部分情况下使用的是里面的ChannelHandler。
你也可以在外部使用ChannelHandlerContext,因为它是线程安全的。
2.2、修改ChannelPipeline
通过调用ChannelHandler的pipeline()方法,你就可以访问到ChannelPipeline。然后就可以动态实时的去修改ChannelPipeline中的ChannelHandler。前面说过ChannelHandlerContext是线程安全的,所以即使在不同的线程,也可以在ChannelHandler外部使ChannelHandlerContext。
下面的代码展示了如何保存ChannelHandlerContext的引用然后使用,甚至是另一个线程中。
public class WriteHandler extends ChannelHandlerAdapter {
private ChannelHandlerContext ctx;
@Override
public void handlerAdded(ChannelHandlerContext ctx) {
//保存ChannelHandlerContext的引用
this.ctx = ctx;
}
public void send(String msg) {
//使用保存的ChannelHandlerContext的引用发送数据
ctx.write(msg);
}
}
ChannelHandler如果使用了@Sharable注解,它的一个实例就可以被添加到多个ChannelPipeline中。这意味着一个ChannelHandler的实例可以有多个ChannelHandlerContext,因此一个ChannelHandler实例可以被不同的ChannelHandlerContext访问。
如果把没有使用@Sharable的ChannelHandler实例添加到多个ChannelPipeline就会抛出异常。另外要注意的是一旦使用@Sharable注解,就要注意ChannelHandler在不同的线程和不同的Channel中的线程安全问题。首先我们来看一下正确使用@Sharable注解的代码。
@Sharable
public class SharableHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
//打印消息并执行下一个ChannelHandler的方法
System.out.println("Channel read message" + msg);
ctx.fireChannelRead(msg);
}
}
上面的代码在ChannelHandler实现类中没有使用类属性,这种类可以看作是无状态类,所以是线程安全的。
下面的代码展示的是错误使用@Sharable的例子。
@Sharable
public class NotSharableHandler extends ChannelInboundHandlerAdapter {
private int count;
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
//自增类属性变量
count++;
System.out.println("channelRead(...) called the " + count + " time");
ctx.fireChannelRead(msg);
}
}
上面的代码在一般情况下是没问题的,在类方法中修改类属性。但是在多线程环境中,多个线程同时修改一个实例的属性,就会出现并发问题,在Netty中就是在多个Channel中同时修改一个实例的类属性。
所以当你要使用@Sharable注解的时候,你要确保你的ChannelHandler能同时用于多个Channel中。保证了线程安全问题,多个Channel使用一个ChannelHandler实例,不仅节省创建ChannelHandler的时间开销,还会节省占用的硬件资源,减少垃圾回收时间。总之一句话,@Sharable注解提高性能,节省资源,但是要注意安全的使用。
二、状态模型
Netty有着很强大又简单的状态模型,完美的匹配了ChannelInboundHandler的方法。后面会介绍ChannelInboundHandler。首先我们先来看看这四种状态。
状态 |
描述 |
channelUnregistered |
Channel创建了,但是还没注册到EventLoop上 |
channelRegistered |
Channel注册到EventLoop上 |
channelActive |
Channel**了,意思就是连接到了远程对端, 可以进行收发数据的操作 |
channelInactive |
Channel与远程对端的连接断开了 |
当然,在很多更高级的使用场景中,状态变化就会比上图复杂一些。这是因为Netty允许用户从EventLoop注销Channel用来暂停事件触发,后面也可以重新注册。这种情况会更多的触发channelRegistered和channelUnregistered状态变化。当然在Channel的生命周琪中你也只能看到一次channelActive和hannelInactive,因为一个Channel只能服务与一个连接,如果需要重新连接远程对端,那就需要重新创建一个新的Channel。
下图展示了用户注销Channel然后重新注册的流程图。
四、ChannelHandler的类型
Netty通过ChannelHandler提供了拦截操作或状态变化响应,这就方便开发者很容易编写可复用的逻辑代码。Netty支持的ChannelHandler的类型如下表。
类型 |
描述 |
Inbound Handler |
处理收到数据以及各种状态的变化 |
Outbound Handler |
处理发送数据,并允许各种拦截操作 |
4.1、共同父接口ChannelHandler
Netty使用了一个定义优秀的类型层次结构来代表不同类型的ChannelHandler。他们的父接口就是ChannelHandler。它提供了添加到ChannelPipeline或从ChannelPipeline移除的生命周期操作。如下表。
类型 |
描述 |
handlerAdded(…) |
添加到ChannelPipeline时被调用 |
handlerRemoved(…) |
从ChannelPipeline移除时被调用 |
exceptionCaught(…) |
在ChannelPipeline处理任务时出现异常时被调用 |
ChannelHandlerContext可以安全的存取使用,它属于Channel的局部变量。可以去查看本书ChannelHandlerContext章节获取更多相关知识。
Netty提供了一个ChannelHandler的适配器实现,名字叫ChannelHandlerAdapter。这个就是典型的适配器模式,你可以根据自己的需要再去重写相关方法。它的实现基本上只是将事件传递给ChannelPipeline中的下一个ChannelHandler。
4.2、Inbound ChannelHandler
Inbound ChannelHandler处理的是收到消息事件及状态变化。这一小节我们会介绍多个不同的ChannelHandler子类型,可以用于处理的收到数据的业务逻辑。
CHANNELINBOUNDHANDLER
ChannelInboundHandler提供了用于Channel状态变化或收到数据使用的方法。这些方法匹配的就是上面提到过的Channel状态模型。下表列出了ChannelInboundHandler提供的方法。
方法 |
描述 |
channelRegistered(…) |
Channel注册到EventLoop时执行 |
channelUnregistered(…) |
Channel注销时执行 |
channelActive(…) |
Channel**时执行,意思是连接成功 |
channelInactive(…) |
Channel连接断开时执行 |
channelReadComplete(…) |
读操作完成时执行 |
channelRead(…) |
从缓冲区读到数据时执行 |
userEventTriggered(…) |
用户自定义事件时触发执行 |
ChannelInboundHandler也是ChannelHandler的子类型,所以ChannelInboundHandler也拥有ChannelHandler的方法。
Netty也提供了一个ChannelInboundHandler的适配器,叫ChannelInboundHandlerAdapter。同样,这个适配器也实现了所有方法,方便开发者只需要重写感兴趣的方法。适配器里实现的方法也只是简单的将事件传递给ChannelPipeline中的下一个ChannelInboundHandler。
有一点很重要,我们知道ChannelInboundHandler是用来处理收数据事件,所以重写channelRead(…)方法时要记得释放掉资源。使用池技术优化的ByteBuf的时候这个尤其重要,如果你不释放资源就会引发资源泄露的错误。
@Sharable
public class DiscardHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx,
Object msg) {
//通过ReferenceCountUtil.release()方法丢弃收到的数据
ReferenceCountUtil.release(msg);
}
}
上面的代码片段展示了如何释放资源。好消息是如果没有释放资源Netty会打印警告日志,所以只要注意看日志就很容易知道什么地方忘记释放资源了。当然,像上面这样手动释放资源是很繁琐的,所以Netty提供了一个SimpleChannelInboundHandler帮我们解决了这个问题。使用这个类你就不需要关系资源的释放问题了。不过,它也有一个很重要的问题要记住,SimpleChannelInboundHandler处理完数据就会释放掉,所以你不能存储收到数据的引用后面还去使用。下面我们来看看使用SimpleChannelInboundHandler的代码。
@Sharable
public class SimpleDiscardHandler extends SimpleChannelInboundHandler<Object> {
@Override
public void channelRead0(ChannelHandlerContext ctx,
Object msg) {
//这里处理完数据不需要手动释放
}
}
当然,如果你想收到其他状态改变的通知,就可以重写那些方法。
很多情况你都需要解码收到的字节数据转为自己的类型,你可能会选择实现ChannelInboundHandler或继承ChannelInboundhandlerAdapter。不过Netty提供了更好的方式解决这个问题,使用它的编码解码框架很容易满足这个需求,后面章节会详细介绍。现在的重点还是学习ChannelHandler。
如果你确实需要使用ChannelInboundHandler,ChannelInboundHandlerAdapter或SimpleChannelInboundhandler,大部分情况下处理数据使用SimpleChannelInboundHandler好一些,而处理收消息状态变化使用ChannelInboundHandlerAdapter好一些。
收数据和引用计数
不知道你是否还记得前面说过Netty使用引用计数的方式处理池技术优化的ByteBuf。所以在处理完ByteBuf后调整它的引用计数是很重要的事情。
这对于理解ChannelOutboundHandlerAdapter和SimpleChannelInboundHandler的不同点也很重要。ChannelInboundHandlerAdapter收到数据后会触发channelRead(...)方法,但是后面不会释放资源,所以用户在重写这个方法时要手动释放资源。而SimpleChannelInboundHandler就不同了,channelRead(...)方法执行后会自动释放资源,因此你的代码要么消费掉消息,要么使用retain()方法等方法返回后继续使用。
没有正确释放资源就是引发资源泄漏,不过Netty会输出警告级别日志告诉开发者哪里漏掉释放资源代码。
CHANNELINITIALIZER
有一个稍微有点修改的ChannelInboundHandler值得我们了解一下:ChannelInitializer。它的名字已经很精确的表达了它的作用,所以看来Netty团队起名字都是很用心的。当Channel注册到EventLoop并且准备处理IO的时候,就可以使用ChannelInitializer初始化Channel。
ChannelInitializer主要使用场景就是用来设置Channel的ChannelPipeline,例如添加ChannelHandler,前面的章节已经介绍过这部分内容。这里,我们只需要知道它也是一个ChannelInboundHandler。
4.3 Outbound handlers
上面的章节介绍的是收到数据时的ChannelHandler,现在该学习发送数据时的ChannelHandler了。
CHANNELOUTBOUNDHANDLER
ChannelOutboundHandler提供了发送操作的方法。那些方法都列在了ChannelOutboundInvoker接口中,Channel,
ChannelPipeline和ChannelHandlerContext都继承了ChannelOutboundInvoker接口。
ChannelOutboundHandler有很多强大的方法,可以按要求延迟操作。它有很多强大灵活的方式处理请求。例如,当没有数据要写给远程对端的时候你可以延迟刷新操作,后面需要使用的时候再使用。
下表列出了它提供的方法。
方法 |
描述 |
bind(…) |
请求Channel绑定到本地时执行 |
connect(…) |
请求Channel连接到远程时执行 |
disconnect(…) |
请求Channel断开远程连接时执行 |
close(…) |
请求Channel关闭时执行 |
deregister(…) |
请求注销Channel时执行 |
read(…) |
从Channel读数据 |
flush(…) |
刷新缓冲区数据到远程对端 |
write(…) |
写数据到远程对端 |
上面这些方法都有一个ChannelPromise参数,如果不想继续通过ChannelPipeline的流程一定要使用它来停止。
Netty也提供了一个ChannelOutboundHandler的适配器类ChannelOutboundHandlerAdapter。这个适配器也只是最基础的实现,你可以继承它然后重写你感兴趣的方法。同样,这个适配器也只是将事件传给下一个ChannelHandler,使用的也是ChannelHandlerContext的方法。
和ChannelInboundHandler一样,这里如果你使用写操作,然后也要负责释放资源。代码片段如下。
@Sharable
public class DiscardOutboundHandler
extends ChannelOutboundHandlerAdapter {
@Override
public void write(ChannelHandlerContext ctx,
Object msg, ChannelPromise promise) {
//释放资源
ReferenceCountUtil.release(msg);
//通知ChannelPromise数据已经处理
promise.setSuccess();
}
}
一定要记得释放资源并通知ChannelPromise。如果不通知ChannelPromise,可能会导致收到消息事件时ChannelFutureListener不会被通知。
发送消息处理及计数引用
如果消息已经处理并且不打算传给下一个ChannelOutboundHandler,那么用户就需要调用ReferenceCountUtil.release()方法释放消息。一旦消息被传到实际网络中,通过刷缓冲区操作或Channel关闭就自动释放资源了
上面的简单例子,帮助我们了解ChannelOutboundHandler以及ChannelOutboundHandlerAdapter,它们提供的功能帮助我们更简单有效的使用Netty。
五、总结
这一章我们主要学习了ChannelHandler及其实现,它们就是Netty提供给我们处理数据的工具。还学习的ChannelHandler链以及ChannelPipeline如何使用它们。
然后比较了收到数据和发送数据的ChannelHandler的不同,以及处理字节消息和其他各种类型消息的不同。
下一章我们主要学习Netty的解码器,它比ChannelHandler更容易编写出适合自己的解码器。另外也会介绍如何更容易的测试我们实现的ChannelHandler。
推荐阅读
-
Netty4实战第六章:ChannelHandler
-
实战SpringCloud响应式微服务系列教程(第六章)
-
.NET Core IdentityServer4实战 第六章-Consent授权页
-
【.NET Core项目实战-统一认证平台】第六章 网关篇-自定义客户端授权
-
R语言实战学习笔记-第六章基本图形
-
第六章 Web开发实战1——HTTP服务
-
jQuery 实战读书笔记之第六章:事件本质
-
.NET Core实战项目之CMS 第六章 入门篇-Vue的快速入门及其使用
-
.NET Core IdentityServer4实战 第六章-Consent授权页
-
实战SpringCloud响应式微服务系列教程(第六章)