欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  科技

【Netty】原理分析:ChannelHandler

程序员文章站 2022-06-18 18:27:19
ChannelPipelineChannel 就跟 BIO 中流(Stream) 一样是用来传输数据的,每一个客户端都会有一个 Channel;而每个 Channel 都包含了一个管道 ChannelPipeline;ChannelPipeline 提供了 ChannelHandler 的容器,可以理解成一个双向链表。public class NettyServerInitailizer extends ChannelInitializer { @Ov...

首先,我们要明白 Channel、ChannelHandler、ChannelPipe 这几个关键组件的关系。

Channel 就跟 BIO 中流(Stream) 一样是用来传输数据的,每一个客户端都会有一个 Channel;而每个 Channel 都包含了一个管道 ChannelPipeline;ChannelPipeline 提供了 ChannelHandler 的容器,可以理解成一个双向链表 -- 责任链模式

【Netty】原理分析:ChannelHandler
最直接能看出三者关系的就是初始化时的 ChannelInitializer,来看下面示这段例代码:

public class NettyServerInitailizer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel socketChannel) {
    	// 获取 Channel 中的管道 ChannelPipeline 
        ChannelPipeline pipeline = socketChannel.pipeline();
		
		// 向 pipeline 中添加处理器 Handler
		// 1.处理协议的 Handler
        pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE,0,4,0,4));
        pipeline.addLast(new LengthFieldPrepender(4));
        // 2.处理编解码的 Handler
        pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));
        pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));
        // 3.自定义 Handler
        pipeline.addLast(new NettyServerHandler());
    }
}

那会不会有同学有问题了,我先设置一个 StringDecoder 去解码再设置一个 StringEncoder 去编码,那这就跟一正一负一样,不抵消喽?别急,向下看。

关于 ChannelPipeline 的分析可以看这篇文章

1.ChannelHandler 是什么

在上面代码中我们给 pipeline 中添加的其实都是 ChannelHandler,按照输出输出来分,分为 ChannelInboundHandler、ChannelOutboundHandler 两大类。

  • ChannelInboundHandler对从客户端发往服务器的报文进行处理,一般用来执行解码、读取客户端数据、进行业务处理等
  • ChannelOutboundHandler对从服务器发往客户端的报文进行处理,一般用来进行编码、发送报文到客户端。

【Netty】原理分析:ChannelHandler

说的更专业一点,ChannelHandler 实质上是充当了处理入站出站数据的应用程序逻辑容器。以客户端应用程序为例,

  1. 入站:如果事件的运动方向是从服务端到客户端的,那么我们称这些事件为入站的
    1. 客户端发送给服务端的数据会通过 pipeline 中的一系列 ChannelInboundHandler
    2. 当有多个 ChannelInboundHandler 时,调用顺序是从 head -> tail 方向逐个调用,并依次被这些 Handler 处理
  2. 出站:如果事件的运动方向是从客户端到服务端的,那么我们称这些事件为出站的
    1. 客户端发送给服务端的数据会通过 pipeline 中的一系列 ChannelOutboundHandler
    2. 当有多个 ChannelOutboundHandler 时, 调用顺序是从 tail -> head 方向逐个调用,并依次被这些 Handler 处理

现在来看上面抛出的那个问题,StringDecoder 是 ChannelInboundHandler 的实现类,所以它只会对入站数据进行处理,即只会处理从外面读进来的数据;而 StringEncoder 是 ChannelOutboundHandler 的实现类,所以它只会处理出站的数据,即只会处理要发送出去的数据。因此,就根本没有冲突这一说。。。

2.ChannelHandler 有什么方法

ChannelHandler 接口里面只定义了三个生命周期方法,

【Netty】原理分析:ChannelHandler

// 当前 ChannelHander 加入ChannelHandlerContext 中时触发回调
void handlerAdded(ChannelHandlerContext ctx) throws Exception;

// 当前 ChannelHandler 从ChannelHandlerContext 中移除时触发回调
void handlerRemoved(ChannelHandlerContext ctx) throws Exception;

// ChannelHandler 回调方法出现异常时被回调
void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;

但是,我们在使用时主要实现它的子接口 ChannelInboundHandler 和 ChannelOutboundHandler。

ChannelInboundHandler

ChannelInboundHandler 接口中各方法及回调时机:

// 当前channel注册到EventLoop时
void channelRegistered(ChannelHandlerContext var1) throws Exception;
// 当前channel从EventLoop取消注册时
void channelUnregistered(ChannelHandlerContext var1) throws Exception;
// 当前channel激活时
void channelActive(ChannelHandlerContext var1) throws Exception;
// 当前channel失活时
void channelInactive(ChannelHandlerContext var1) throws Exception;
// 当前channel从远端读取到数据
void channelRead(ChannelHandlerContext var1, Object var2) throws Exception;
// read消费完读取的数据时
void channelReadComplete(ChannelHandlerContext var1) throws Exception;
// 用户事件触发时
void userEventTriggered(ChannelHandlerContext var1, Object var2) throws Exception;
// channel的写状态变化的时
void channelWritabilityChanged(ChannelHandlerContext var1) throws Exception;
// 出现异常时
void exceptionCaught(ChannelHandlerContext var1, Throwable var2) throws Exception;

其中,ChannelHandlerContext 作为参数,在每个回调事件处理完成之后,使用 ChannelHandlerContext 的 fireChannelXXX 方法来传递给 pipeline 中下一个ChannelHandler,netty 的 codec 模块和业务处理代码分离就用到了这个链路处理。

关于 ChannelHandlerContext 可以参考这篇文章

ChannelOutboundHandler

ChannelInboundHandler 接口中各方法及回调时机:

// bind操作执行前触发;
void bind(ChannelHandlerContext var1, SocketAddress var2, ChannelPromise var3) throws Exception;

// connect 操作执行前触发
void connect(ChannelHandlerContext var1, SocketAddress var2, SocketAddress var3, ChannelPromise var4) throws Exception;

// disconnect 操作执行前触发
void disconnect(ChannelHandlerContext var1, ChannelPromise var2) throws Exception;

// close操作执行前触发
void close(ChannelHandlerContext var1, ChannelPromise var2) throws Exception;

// deregister操作执行前触发
void deregister(ChannelHandlerContext var1, ChannelPromise var2) throws Exception;

// read操作执行前触发
void read(ChannelHandlerContext var1) throws Exception;

// write操作执行前触发
void write(ChannelHandlerContext var1, Object var2, ChannelPromise var3) throws Exception;

// flush操作执行前触发
void flush(ChannelHandlerContext var1) throws Exception;

可以注意到一些回调方法有 ChannelPromise 这个参数,我们可以调用它的 addListener 注册监听,当回调方法所对应的操作完成后,会触发这个监听下面这个代码,会在写操作完成后触发,完成操作包括成功和失败

另外,为了便利,Netty 框架提供了 ChannelInboundHandlerAdapter,ChannelOutboundHandlerAdapter 和 ChannelDuplexHandler 这三个适配类提供一些默认实现,在使用的时候只需要实现你关注的方法即可。

Channel 生命周期:Sharable -> handlerAdded() -> channelRegistered() -> channelActive() -> channelRead() -> channelReadComplete() -> channelInactive() -> channelUnregistered -> handlerAddedRemoved()

3.如何自定义 ChannelHandler

下面来看一个自定义 ChannelHandler 的例子。例如,实现 ChannelInboundHandler 接口(或 ChannelInboundHandlerAdapter),你就可以接收入站事件和数据,这些数据随后会被你的应用程序的业务逻辑处理。当你要给连接的客户端发送响应时,也可以从 ChannelInboundHandler 冲刷数据。

public class ServerHandler extends ChannelInboundHandlerAdapter {

    /**
     * 读取客户端发送的数据
     *
     * @param ctx 上下文对象, 含有通道channel,管道pipeline
     * @param msg 就是客户端发送的数据
     * @throws Exception
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("服务器读取线程 " + Thread.currentThread().getName());
        
        //Channel channel = ctx.channel();
        //ChannelPipeline pipeline = ctx.pipeline(); //本质是一个双向链接, 出站入站
        
        // 将 msg 转成一个 ByteBuf,类似NIO 的 ByteBuffer
        ByteBuf buf = (ByteBuf) msg;
        System.out.println("客户端发送消息是:" + buf.toString(CharsetUtil.UTF_8));
    }

    /**
     * 数据读取完毕处理方法
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ByteBuf buf = Unpooled.copiedBuffer("HelloClient".getBytes(CharsetUtil.UTF_8));
        ctx.writeAndFlush(buf);
    }

    /**
     * 处理异常, 一般是需要关闭通道
     *
     * @param ctx
     * @param cause
     * @throws Exception
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}

但在实际使用中,我们最常用的方式是解码器 + SimpleChannelInboundHandler 处理指定类型消息。就像最上面我们在 NettyServerInitailizer 配置的那个 NettyServerHandler

/**
* SimpleChannelInboundHandler 是 ChannelInboundHandlerAdapter 的子类
* 注:泛型指定了处理的消息的类型,是经过前面解码器解析过的
*/
public class NettyServerHandler extends SimpleChannelInboundHandler<String> {
	
	/**
     * 有新的客户端连接时触发
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("新的客户端..." + ctx.channel().remoteAddress());
    }
	
	/**
     * 读取客户端发送的数据
     * 注:此时消息已经是 String 类型了
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
    	// 注:因为已经引入了解码器,所以这里直接就是可以读写 String 类型消息,不用再先包装成 ByteBuf
        System.out.println(ctx.channel().remoteAddress() +":" + msg);
        ctx.channel().writeAndFlush("Hello Client --"+ UUID.randomUUID().toString().substring(0,7));
    }
    
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

通常,业务逻辑会写在一个或者多个 ChannelInboundHandler中。 ChannelOutboundHandler 原理一样,只不过它是用来处理出站数据的。

PS:其实我们写 Netty 的代码就主要是在写这些 ChannelHandler。。。

本文地址:https://blog.csdn.net/weixin_43935927/article/details/111996227