欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Netty4实战第六章:ChannelHandler

程序员文章站 2024-03-09 14:18:35
...

本章主要内容

  • ChannelPipeline
  • ChannelHandlerContext
  • ChannelHandler
  • 收数据与发数据
  接收连接或创建连接只是网络应用的一部分。虽然这一部分是很重要的,不过还有另一部分不仅更重要更复杂,还需要编写大量代码。这就是处理进出数据的部分。
  Netty提供了比较强大的方式处理这部分。它允许用户实现自己的ChannelHandler去处理数据。这样就可以做成很有用ChannelHandler链,并且每个ChannelHandler都可以执行很多任务。这样也可以帮助开发者写出来整洁的,可复用的代码。
  不过在ChannelHandler你也只能处理数据。当然,非要较真的话,你也可以在ChannelHandler中阻止IO操作,比如发送数据的场景你可以阻止发送,后面会介绍到这个例子。强大的地方在于这些都可以实时去完成。

一、ChannelPipeline

  一个ChannelPipeline中有很多ChannelHandler的实例,用来处理一个Channel中的进出数据。ChannelPipeline提供了先进的过滤器设计模式,用户可以完全控制每个事件发生的操作,以及ChannelPipeline中的每个ChannelHandler之间的相互作用。
  对于每一个新的Channel,都会创建一个新的ChannelPipeline然后附加给Channel。一旦附加之后,就会变成永久的联系;Channel不能附加另一个,也不能与当前附加的ChannelPipeline分离。当然这些都是Netty做的事情,开发者无需关心这些。
  下图展示了ChannelPipeline的ChannelHandler通常如何处理IO的。IO被ChannelPipeline中的ChannelInboundHandler或ChannelOutboundHandler处理,然后通过ChannelInboundInvoker或ChannelOutboundInvoker接口定义的方法传播到最近的一个同类型的ChannelHandler,一个处理数据接收,一个处理数据发送。而ChannelPipeline继承了他们。Netty4实战第六章:ChannelHandler

  从上图也可以看出,ChannelPipeline很像有多个ChannelHandler的List。数据进来时数据是从ChannelPipeline的头部流向尾部,而数据出去事件也就是发送数据则是从ChannelPipeline尾部流向头部。ChannelPipeline通过检查ChannelHandler的类型确定它处理什么类型事件。如果类型不符合,就会跳过这个ChannelHandler,去找下一个符合。

  开发者可以实时去修改ChannelPipeline,也就是你甚至可以在另一个ChannelHandler中增删改其他的ChannelHandler,而且移除自己也是可以的。利用这个可以编写很灵活的代码逻辑,例如多路复用器,后面会详细介绍这个知识点。

  我们先来学习如何去修改ChannelPipeline。

名称

描述

addFirst(…)

添加一个ChannelHandler在ChannelPipeline头部

addBefore(…)


addAfter(…)


addLast(…)


remove(…)

移除ChannelHandler

replace(…)

替换ChannelHandler

    ChannelPipeline pipeline = ..;
    FirstHandler firstHandler = new FirstHandler();
    //添加一个ChannelHandler
    pipeline.addLast("handler1", firstHandler);
    //添加到ChannelPipeline的第一个位置
    pipeline.addFirst("handler2", new SecondHandler());
    //添加到ChannelPipeline的最后一个位置
    pipeline.addLast("handler3", new ThirdHandler());
    //使用名称移除
    pipeline.remove("handler3");
    //使用引用移除
    pipeline.remove(firstHandler);
    //将handler2替换掉
    pipeline.replace("handler2", "handler4", new FourthHandler());

  可以看到修改ChannelPipeline是很容易的,可以随时根据你的需要增删改查。

  在ChannelPipeline中每一个ChannelHandler处理事件都是在IO线程中,也就是你不能在ChannelHandler中阻塞代码,否则就会阻塞IO线程影响整体IO性能。有些情况还必须使用阻塞API,例如JDBC的。遇到这种使用场景可以在调用ChannelPipeline.add的方法时传入EventExecutorGroup。如果传入了自定义的EventExecutorGroup,事件就会被里面的EventExecutor处理,移除也是一样。Netty提供的默认实现名字叫DefaultEventExecutorGroup。

  Netty不仅提供了修改ChannelPipeline的方式,而且还提供了访问ChannelPipeline中ChannelHandler的各种方法,这样就可以检查某个指定的ChannelHandler是否存在ChannelPipeline中。

名称

描述

get(…)

ChannelPipeline提供了几个get方法,可以获
ChannelHandler和ChannelHandlerContext

context(…)

获取ChannelHandlerContext

contains(…)

检查是否包含ChannelHandler

names()

返回所有ChannelHander的名称

iterator()

返回所有ChannelHander的迭代器

  ChannelPipeline继承自ChannelInboundInvoker和ChannelOutboundInvoker,它们暴漏的方法分别是用来处理进数据和出数据的。先看一下ChannelPipeline处理进数据时的方法。

名称

描述

fireChannelRegistered()

会导致下一个ChannelInboundHandler的channelRegistered方法被调用

fireChannelUnregistered()

会导致下一个ChannelInboundHandler的channelUnregistered方法被调用

fireChannelActive()

会导致下一个ChannelInboundHandler的channelActive方法被调用

fireChannelInactive()

会导致下一个ChannelInboundHandler的channelInactive方法被调用

fireExceptionCaught(…)

会导致下一个ChannelInboundHandler的exceptionCaught方法被调用

fireUserEventTriggered(…)

会导致下一个ChannelInboundHandler的userEventTriggered方法被调用

fireChannelRead(….)

会导致下一个ChannelInboundHandler的channelRead方法被调用

fireChannelReadComplete()

会导致下一个ChannelInboundHandler的channelReadComplete方法被调用

  这些操作都是来触发ChannelInboundHandler中的方法,所以用来处理各种各样的事件。

  不过处理进数据事件只是做了一半的事情,在网络应用中还需要触发和处理出数据事件。
  下面表格列出了ChannelOutboundInvoker接口暴露的方法,另外ChannelPipeline,ChannelHandlerContext和Channel都继承了ChannelOutboundInvoker。

名称

描述

bind(…)

将Channel绑定到本地,将调用下一个ChannelOutboundHandler的bind方法

connect(…)

Channel连接到远程地址,将调用下一个ChannelOutboundHandler的connect方法

disconnect(…)

断开Channel连接,将调用下一个ChannelOutboundHandler的disconnect方法

close(…)

关闭Channel,将调用下一个ChannelOutboundHandler的close方法

deregister(…)

注销Channel,将调用下一个ChannelOutboundHandler的deregister方法

flush(…)

刷新Channel中等待的输出,将调用下一个ChannelOutboundHandler的flush方法

write()

写数据到网络,注意,这个方法并不会真正将数据写到实际网络中,只是放到队列,如果
想写到实际网络,需要调用flush方法或者使用writeAndFlush方法

writeAndFlush(…)

快捷方式调用write()和flush()

read()

Channel中读数据,将调用下一个ChannelOutboundHandler的read方法


二、ChannelHandlerContext

  每当一个ChannelHandler添加到ChannelPipeline时,就会创建和指定一个新的ChannelHandlerContext。ChannelHandlerContext允许ChannelHandler作用其他ChannelHandler,在底层传输结束之前,它们都属于同一个ChannelPipeline。

  ChannelHandlerContext不会改变,所以获取它是安全的。

  ChannelHandlerContext也实现了ChannelInboundInvoker和ChannelOutboundInvoker。所以很多Channel或ChannelPipeline拥有的方法它也有。不同之处在于当你通过Channel或ChannelPipeline调用这些方法时,要流转整个ChannelPipeline,也就是说里面的每个ChannelHandler都要过一遍。但如果使用ChannelHandlerContext,会从当前位置的ChannelHandler开始处理事件。

2.1、通知下一个ChannelHandler

  使用定义在ChannelInboundInvoker和ChannelOutboundInvoker里的一些方法可以通知最近的ChannelHandler。通知从哪里开始取决于你如何设置通知。

  下图展示了Channel,ChannelPipeline,ChannelHandler,ChannelHandlerContext之间的关系。

Netty4实战第六章:ChannelHandler

  图中ChannelPipeline绑定到Channel,并且拥有多个ChannelHandler实例;而每个ChannelHandler都绑定了一个ChannelHandlerContext,前面说过,一旦添加了一个ChannelHandler就会创建一个新的ChannelHandlerContext。

  如果想走完整个ChannelPipeline的流程,目前有两种方式:

  • 使用Channel的方法
  • 使用ChannelPipeline的方法
  这些方法都会走完整个ChannelPipeline流程,也就是会触发ChannelPipeline中所有的ChannelHandler。前面说过,进数据事件和出数据事件在ChannelPipeline中流程是不完全一样的,一个是从头开始,一个是从尾开始。
  下面的代码展示了使用Channel写数据事件流程,它就是从ChannelPipeline尾部开始的。

        ChannelHandlerContext ctx = ..;
        //通过ChannelHandlerContext获取Channel
        Channel channel = ctx.channel();
        //写数据
        channel.write(Unpooled.copiedBuffer("Netty in Action",
                CharsetUtil.UTF_8));

  下面的代码展示了如何使用ChannelPipeline写数据。

        ChannelHandlerContext ctx = ..;
        //通过ChannelHandlerContext获取ChannelPipeline
        ChannelPipeline channelPipeline = ctx.pipeline();
        //写数据
        channelPipeline.write(Unpooled.copiedBuffer("Netty in Action",
                CharsetUtil.UTF_8));

  ChannelPipeline写数据和使用Channel写数据,数据流转过程都是一样的。另外你也可以看到,通过ChannelHandlerContext可以访问到Channel和ChannelPipeline。使用Channel或ChannelPipeline触发事件通知的流程如下图。

Netty4实战第六章:ChannelHandler

  可以看到,事件从ChannelPipeline的头部开始,每次事件通知都是ChannelHandlercontext通知下一个ChannelHandler。
  有的业务场景需要在ChannelPipeline中指定位置触发事件,数据不用走过整个ChannelPipeline,例如:

  • 保存消息中的额外数据,其他ChannelHandler并不关心这些额外数据
  • 排除一些ChannelHandler
  上面这些情况你就可以考虑使用ChannelHandlerContext。不过要注意的是,ChannelHandlerContext执行的其实是下一个ChannelHandler,而不是ChannelHandlerContext所绑定的ChannelHandler。下面的代码展示了如何使用ChannelHandlerContext写数据的。

ChannelHandlerContext ctx = ..;
ctx.write(Unpooled.copiedBuffer("Netty in Action", CharsetUtil.UTF_8))

  这个数据会从下一个ChannelHandler开始走完ChannelPipeline,事件也会从下一个ChannelHandler开始触发,请看下图。

Netty4实战第六章:ChannelHandler

  可以看出,事件从指定的ChannelHandlerContext开始,跳过了之前的所有的ChannelHandler。ChannelHandlerContext是很常用的,大部分情况下使用的是里面的ChannelHandler。
  你也可以在外部使用ChannelHandlerContext,因为它是线程安全的。

2.2、修改ChannelPipeline

  通过调用ChannelHandler的pipeline()方法,你就可以访问到ChannelPipeline。然后就可以动态实时的去修改ChannelPipeline中的ChannelHandler。前面说过ChannelHandlerContext是线程安全的,所以即使在不同的线程,也可以在ChannelHandler外部使ChannelHandlerContext。

  下面的代码展示了如何保存ChannelHandlerContext的引用然后使用,甚至是另一个线程中。

        public class WriteHandler extends ChannelHandlerAdapter {

            private ChannelHandlerContext ctx;

            @Override
            public void handlerAdded(ChannelHandlerContext ctx) {
                //保存ChannelHandlerContext的引用
                this.ctx = ctx;
            }
            public void send(String msg) {
                //使用保存的ChannelHandlerContext的引用发送数据
                ctx.write(msg);
            }
        }

  ChannelHandler如果使用了@Sharable注解,它的一个实例就可以被添加到多个ChannelPipeline中。这意味着一个ChannelHandler的实例可以有多个ChannelHandlerContext,因此一个ChannelHandler实例可以被不同的ChannelHandlerContext访问。

  如果把没有使用@Sharable的ChannelHandler实例添加到多个ChannelPipeline就会抛出异常。另外要注意的是一旦使用@Sharable注解,就要注意ChannelHandler在不同的线程和不同的Channel中的线程安全问题。首先我们来看一下正确使用@Sharable注解的代码。

        @Sharable
        public class SharableHandler extends ChannelInboundHandlerAdapter {
            @Override
            public void channelRead(ChannelHandlerContext ctx, Object msg) {
                //打印消息并执行下一个ChannelHandler的方法
                System.out.println("Channel read message" + msg);
                ctx.fireChannelRead(msg);
            }
        }

  上面的代码在ChannelHandler实现类中没有使用类属性,这种类可以看作是无状态类,所以是线程安全的。

  下面的代码展示的是错误使用@Sharable的例子。

        @Sharable
        public class NotSharableHandler extends ChannelInboundHandlerAdapter {

            private int count;

            @Override
            public void channelRead(ChannelHandlerContext ctx, Object msg) {
                //自增类属性变量
                count++;
                
                System.out.println("channelRead(...) called the "  + count + " time");
                ctx.fireChannelRead(msg);
            }
        }

  上面的代码在一般情况下是没问题的,在类方法中修改类属性。但是在多线程环境中,多个线程同时修改一个实例的属性,就会出现并发问题,在Netty中就是在多个Channel中同时修改一个实例的类属性。

  所以当你要使用@Sharable注解的时候,你要确保你的ChannelHandler能同时用于多个Channel中。保证了线程安全问题,多个Channel使用一个ChannelHandler实例,不仅节省创建ChannelHandler的时间开销,还会节省占用的硬件资源,减少垃圾回收时间。总之一句话,@Sharable注解提高性能,节省资源,但是要注意安全的使用。

二、状态模型

  Netty有着很强大又简单的状态模型,完美的匹配了ChannelInboundHandler的方法。后面会介绍ChannelInboundHandler。首先我们先来看看这四种状态。

状态

描述

channelUnregistered

Channel创建了,但是还没注册到EventLoop上

channelRegistered

Channel注册到EventLoop上

channelActive

Channel**了,意思就是连接到了远程对端,

可以进行收发数据的操作

channelInactive

Channel与远程对端的连接断开了

  上面Channel的状态其实就是它的整个生命周期,触发状态变化。一般四个状态变化的周期如下图所示。

Netty4实战第六章:ChannelHandler

  当然,在很多更高级的使用场景中,状态变化就会比上图复杂一些。这是因为Netty允许用户从EventLoop注销Channel用来暂停事件触发,后面也可以重新注册。这种情况会更多的触发channelRegistered和channelUnregistered状态变化。当然在Channel的生命周琪中你也只能看到一次channelActive和hannelInactive,因为一个Channel只能服务与一个连接,如果需要重新连接远程对端,那就需要重新创建一个新的Channel。

  下图展示了用户注销Channel然后重新注册的流程图。

Netty4实战第六章:ChannelHandler

四、ChannelHandler的类型

  Netty通过ChannelHandler提供了拦截操作或状态变化响应,这就方便开发者很容易编写可复用的逻辑代码。Netty支持的ChannelHandler的类型如下表。

类型

描述

Inbound Handler

处理收到数据以及各种状态的变化

Outbound Handler

处理发送数据,并允许各种拦截操作

  每个类型我们都需要学习,首先我们先来看看它们的基接口。

4.1、共同父接口ChannelHandler

  Netty使用了一个定义优秀的类型层次结构来代表不同类型的ChannelHandler。他们的父接口就是ChannelHandler。它提供了添加到ChannelPipeline或从ChannelPipeline移除的生命周期操作。如下表。

类型

描述

handlerAdded(…)

添加到ChannelPipeline时被调用

handlerRemoved(…)

从ChannelPipeline移除时被调用

exceptionCaught(…)

ChannelPipeline处理任务时出现异常时被调用

  上面这些方式都会有一个ChannelHandlerContext参数传入。前面说过,ChannelHandler添加到ChannelPipeline时会自动创建ChannelHandlerContext。ChannelHandlerContext会绑定到ChannelHandler,ChannelPipeline和Channel。

  ChannelHandlerContext可以安全的存取使用,它属于Channel的局部变量。可以去查看本书ChannelHandlerContext章节获取更多相关知识。

  Netty提供了一个ChannelHandler的适配器实现,名字叫ChannelHandlerAdapter。这个就是典型的适配器模式,你可以根据自己的需要再去重写相关方法。它的实现基本上只是将事件传递给ChannelPipeline中的下一个ChannelHandler。

4.2、Inbound ChannelHandler

  Inbound ChannelHandler处理的是收到消息事件及状态变化。这一小节我们会介绍多个不同的ChannelHandler子类型,可以用于处理的收到数据的业务逻辑。

CHANNELINBOUNDHANDLER

  ChannelInboundHandler提供了用于Channel状态变化或收到数据使用的方法。这些方法匹配的就是上面提到过的Channel状态模型。下表列出了ChannelInboundHandler提供的方法。

方法

描述

channelRegistered(…)

Channel注册到EventLoop时执行

channelUnregistered(…)

Channel注销时执行

channelActive(…)

Channel**时执行,意思是连接成功

channelInactive(…)

Channel连接断开时执行

channelReadComplete(…)

读操作完成时执行

channelRead(…)

从缓冲区读到数据时执行

userEventTriggered(…)

用户自定义事件时触发执行

  这些方法都在ChannelInboundInvoker中有对应的方法,并且ChannelHandlerContext和ChannelPipeline都继承了ChannelInboundInvoker。

  ChannelInboundHandler也是ChannelHandler的子类型,所以ChannelInboundHandler也拥有ChannelHandler的方法。

  Netty也提供了一个ChannelInboundHandler的适配器,叫ChannelInboundHandlerAdapter。同样,这个适配器也实现了所有方法,方便开发者只需要重写感兴趣的方法。适配器里实现的方法也只是简单的将事件传递给ChannelPipeline中的下一个ChannelInboundHandler。

  有一点很重要,我们知道ChannelInboundHandler是用来处理收数据事件,所以重写channelRead(…)方法时要记得释放掉资源。使用池技术优化的ByteBuf的时候这个尤其重要,如果你不释放资源就会引发资源泄露的错误。

        @Sharable
        public class DiscardHandler extends ChannelInboundHandlerAdapter {
            @Override
            public void channelRead(ChannelHandlerContext ctx,
                                    Object msg) {
                //通过ReferenceCountUtil.release()方法丢弃收到的数据
                ReferenceCountUtil.release(msg);
            }
        }
  上面的代码片段展示了如何释放资源。好消息是如果没有释放资源Netty会打印警告日志,所以只要注意看日志就很容易知道什么地方忘记释放资源了。
  当然,像上面这样手动释放资源是很繁琐的,所以Netty提供了一个SimpleChannelInboundHandler帮我们解决了这个问题。使用这个类你就不需要关系资源的释放问题了。不过,它也有一个很重要的问题要记住,SimpleChannelInboundHandler处理完数据就会释放掉,所以你不能存储收到数据的引用后面还去使用。下面我们来看看使用SimpleChannelInboundHandler的代码。

        @Sharable
        public class SimpleDiscardHandler extends SimpleChannelInboundHandler<Object> {
            @Override
            public void channelRead0(ChannelHandlerContext ctx,
                                     Object msg) {
                //这里处理完数据不需要手动释放
            }
        }
  当然,如果你想收到其他状态改变的通知,就可以重写那些方法。

  很多情况你都需要解码收到的字节数据转为自己的类型,你可能会选择实现ChannelInboundHandler或继承ChannelInboundhandlerAdapter。不过Netty提供了更好的方式解决这个问题,使用它的编码解码框架很容易满足这个需求,后面章节会详细介绍。现在的重点还是学习ChannelHandler。

  如果你确实需要使用ChannelInboundHandler,ChannelInboundHandlerAdapter或SimpleChannelInboundhandler,大部分情况下处理数据使用SimpleChannelInboundHandler好一些,而处理收消息状态变化使用ChannelInboundHandlerAdapter好一些。


收数据和引用计数

  不知道你是否还记得前面说过Netty使用引用计数的方式处理池技术优化的ByteBuf。所以在处理完ByteBuf后调整它的引用计数是很重要的事情。

  这对于理解ChannelOutboundHandlerAdapter和SimpleChannelInboundHandler的不同点也很重要。ChannelInboundHandlerAdapter收到数据后会触发channelRead(...)方法,但是后面不会释放资源,所以用户在重写这个方法时要手动释放资源。而SimpleChannelInboundHandler就不同了,channelRead(...)方法执行后会自动释放资源,因此你的代码要么消费掉消息,要么使用retain()方法等方法返回后继续使用。

  没有正确释放资源就是引发资源泄漏,不过Netty会输出警告级别日志告诉开发者哪里漏掉释放资源代码。

CHANNELINITIALIZER

  有一个稍微有点修改的ChannelInboundHandler值得我们了解一下:ChannelInitializer。它的名字已经很精确的表达了它的作用,所以看来Netty团队起名字都是很用心的。当Channel注册到EventLoop并且准备处理IO的时候,就可以使用ChannelInitializer初始化Channel。

  ChannelInitializer主要使用场景就是用来设置Channel的ChannelPipeline,例如添加ChannelHandler,前面的章节已经介绍过这部分内容。这里,我们只需要知道它也是一个ChannelInboundHandler。

4.3 Outbound handlers

  上面的章节介绍的是收到数据时的ChannelHandler,现在该学习发送数据时的ChannelHandler了。

CHANNELOUTBOUNDHANDLER

  ChannelOutboundHandler提供了发送操作的方法。那些方法都列在了ChannelOutboundInvoker接口中,Channel, ChannelPipeline和ChannelHandlerContext都继承了ChannelOutboundInvoker接口。

  ChannelOutboundHandler有很多强大的方法,可以按要求延迟操作。它有很多强大灵活的方式处理请求。例如,当没有数据要写给远程对端的时候你可以延迟刷新操作,后面需要使用的时候再使用。

  下表列出了它提供的方法。

方法

描述

bind(…)

请求Channel绑定到本地时执行

connect(…)

请求Channel连接到远程时执行

disconnect(…)

请求Channel断开远程连接时执行

close(…)

请求Channel关闭时执行

deregister(…)

请求注销Channel时执行

read(…)

Channel读数据

flush(…)

刷新缓冲区数据到远程对端

write(…)

写数据到远程对端

  ChannelOutboundHandler也是ChannelHandler的子类,所以它也有ChannelHandler的所有方法。

  上面这些方法都有一个ChannelPromise参数,如果不想继续通过ChannelPipeline的流程一定要使用它来停止。

  Netty也提供了一个ChannelOutboundHandler的适配器类ChannelOutboundHandlerAdapter。这个适配器也只是最基础的实现,你可以继承它然后重写你感兴趣的方法。同样,这个适配器也只是将事件传给下一个ChannelHandler,使用的也是ChannelHandlerContext的方法。

  和ChannelInboundHandler一样,这里如果你使用写操作,然后也要负责释放资源。代码片段如下。

        @Sharable
        public class DiscardOutboundHandler
                extends ChannelOutboundHandlerAdapter {
            @Override
            public void write(ChannelHandlerContext ctx,
                              Object msg, ChannelPromise promise) {
                //释放资源
                ReferenceCountUtil.release(msg);
                //通知ChannelPromise数据已经处理
                promise.setSuccess();
            }
        }
  一定要记得释放资源并通知ChannelPromise。如果不通知ChannelPromise,可能会导致收到消息事件时ChannelFutureListener不会被通知。

发送消息处理及计数引用

  如果消息已经处理并且不打算传给下一个ChannelOutboundHandler,那么用户就需要调用ReferenceCountUtil.release()方法释放消息。一旦消息被传到实际网络中,通过刷缓冲区操作或Channel关闭就自动释放资源了

  上面的简单例子,帮助我们了解ChannelOutboundHandler以及ChannelOutboundHandlerAdapter,它们提供的功能帮助我们更简单有效的使用Netty。

五、总结

  这一章我们主要学习了ChannelHandler及其实现,它们就是Netty提供给我们处理数据的工具。还学习的ChannelHandler链以及ChannelPipeline如何使用它们。
  然后比较了收到数据和发送数据的ChannelHandler的不同,以及处理字节消息和其他各种类型消息的不同。
  下一章我们主要学习Netty的解码器,它比ChannelHandler更容易编写出适合自己的解码器。另外也会介绍如何更容易的测试我们实现的ChannelHandler。


  





    

  

  




  

  

  

相关标签: netty