Netty入门 -- 编解码器
编解码器
简介
计算机底层对数据的处理都是二进制的,所以即使在在网络通信交互的过程中,对传递的信息都是二进制的数据;所以在客户端发送信息的时候,需要将信息解析成二进制文件,发送给服务端的时候,也需要将二进制文件转换为对应的信息来读取使用;
在发送时将数据转换为二进制称之为编码;
在接受时候,将数据转换为对应的信息称之为解码;
所以在网络通信过程中我们需要使用codcc(编解码器)
codec(编解码器) 的组成部分有两个:decoder(解码器)和 encoder(编码器)。encoder 负责把业务数据转换成字节 码数据,decoder 负责把字节码数据转换成业务数据
Netty 编解码器
说明
-
netty 的组件设计:Netty 的主要组件有 Channel、EventLoop、ChannelFuture、ChannelHandler、ChannelPipe 等
-
ChannelHandler 充当了处理入站和出站数据的应用程序逻辑的容器。例如,实现 ChannelInboundHandler 接口(或 ChannelInboundHandlerAdapter),你就可以接收入站事件和数据,这些数据会被业务逻辑处理。当要给客户端 发 送 响 应 时 , 也 可 以 从 ChannelInboundHandler 冲 刷 数 据 。 业 务 逻 辑 通 常 写 在 一 个 或 者 多 个 ChannelInboundHandler 中。ChannelOutboundHandler 原理一样,只不过它是用来处理出站数据的
-
ChannelPipeline 提供了 ChannelHandler 链的容器。以客户端应用程序为例,如果事件的运动方向是从客户端到 服务端的,那么我们称这些事件为出站的,即客户端发送给服务端的数据会通过 pipeline 中的一系列 ChannelOutboundHandler,并被这些 Handler 处理,反之则称为入站的
解码器(Decoder)
decoder 负责将“入站”数据从一种格式转换到另一种格式;
Netty的decoder实现了入站处理器接口ChannelInboundHandler;
decoder分为两类:
- 解码字节到消息 -> ByteToMessageDecoder 和 ReplayingDecoder(将最原始的字节码转换为可以读取的文件)
- 解码消息到消息 -> MessageToMessageDecode(将字节码文件解码后,再原有基础上根据某种对应关系再次解析)
ByteToMessageDecoder
该抽象类有两个方法:
因为数据传到服务端有可能不是一次请求就能完成的,中间可能需要经过几次数据传输,并且每一次传输传多少数据也是不确定;
可以重写decodeLast方法来标识一条完整的消息结束,比如用来产生一个 LastHttpContent 消息,表示一条http消息已经被解码完成。
需要注意的是,原子类型的 int 在被添加到 List 中时,会被自动装箱为 Integer;
例如:将4个字节的数据解码成Integer并在控制台输出
每次从入站的 ByteBuf 读取四个字节,解码成Integer,并添加到一个 List
当不能再添加数据到 list 时,它所包含的内容就会被发送到下个 ChannelInboundHandler
解码器:
public class NettyDecoder extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
if(byteBuf.readableBytes() >= 4){
list.add(byteBuf.readInt()); //将4个字节转为整型
}
}
}
服务端
public class NettyDecoderServer {
public static void main(String[] args) {
//创建两个线程组一个负责连接 一个负责处理业务
NioEventLoopGroup bossGroup = new NioEventLoopGroup();
NioEventLoopGroup workGroup = new NioEventLoopGroup();
try {
//组装参数
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new NettyDecoder());
pipeline.addLast(new SimpleChannelInboundHandler<Integer>() {
@Override
protected void channelRead0(ChannelHandlerContext ctx, Integer msg) throws Exception {
System.out.println("解码结果" + msg);
}
});
}
});
ChannelFuture channelFuture = bootstrap.bind(7000).sync();
channelFuture.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
} finally {
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}
}
ReplayingDecoder
ReplayingDecoder 继承自 ByteToMessageDecoder;
ByteToMessageDecoder在解码前需要读取数据之前需要检查缓冲区 ByteBuf是否有足够的字节,很麻烦;
ReplayingDecoder无需自己检查;若ByteBuf中有足够的字节,则会正常读取;若没有足够的字节则会停止解码。
类声明:
public abstract class ReplayingDecoder extends ByteToMessageDecoder
类型参数 S 指定了用于状态管理的类型,其中 Void 代表不需要状态管理;
public class ToIntegerDecoder2 extends ReplayingDecoder<Void> {
@Override
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
list.add(byteBuf.readerIndex());
}
}
使用ReplayingDecoder实现将数据转Integer:
和ByteToMessageDecoder类似,只不过不用检查字节是否足够了;
MessageToMessageDecode
用于从一种消息解码为另外一种消息(例如,POJO 到 POJO)
public abstract class MessageToMessageDecoderextends ChannelInboundHandlerAdapter
类型参数 I 指定了 decode()方法的输入参数 msg 的类型;
public class IntegerToStringDecoder extends MessageToMessageDecoder<Integer> {
@Override
protected void decode(ChannelHandlerContext channelHandlerContext, Integer msg, List<Object> list) throws Exception {
list.add(String.valueOf(msg));
}
}
netty提供的消息转换器HttpObjectAggregator就继承自MessageToMessageDecoder;
HttpObjectAggregator是http消息聚合器,用来将解码后产生的http消息整合成一条完整的http请求;
小结
-
不论解码器 handler 还是 编码器 handler 即接收的消息类型必须与待处理的消息类型一致,否则该 handler 不 会被执行
-
在解码器 进行数据解码时,需要判断 缓存区(ByteBuf)的数据是否足够 ,否则接收到的结果会期望结果可能 不一致
-
通过下图的继承关系,我们可以发现最低成的是 ByteToMessageDecoder,这种继承关系就会让我们在管道ChannelPipeline 添加继承实现的解码器后,在消息传递过程完成解码后,其解析成果的msg可以直接被handler拿到使用
-
解码时处理太大的帧
由于 Netty 是一个异步框架,所以需要在字节可以解码之前在内存中缓冲它们;
如果对缓冲区不加限制可能造成内存不足的后果;
为了解决这一问题 Netty 提供了一个TooLongFrameException
可以在解码器里设置一个最大字节数阈值,如果超出,就抛出TooLongFrameException
ChannelHandler.exceptionCaught()会捕获到该异常,然后决定如何处理;
public class SafeByteToMessageDecoder extends ByteToMessageDecoder {
private static final int MAX_FRAME_SIZE = 1024; //定义阈值为1k
@Override
public void decode(ChannelHandlerContext ctx, ByteBuf in,
List<Object> out) throws Exception {
int readable = in.readableBytes();
if (readable > MAX_FRAME_SIZE) { //如果缓冲区的可读字节大于1k则跳过并抛出帧数过大异常
in.skipBytes(readable);
throw new TooLongFrameException("Frame too big!");
}
// do something
}
}
编码器(Encoder)
encoder实现了 ChannelOutboundHandler接口;
用来把出站数据从一种格式转换到另外一种格式;
编码器包括两类:
- 编码从消息到字节 -> MessageToByteEncoder
- 编码从消息到消息 -> MessageToMessageEncoder
MessageToByteEncoder
只有一个需要实现的抽象方法;
解码器经常需要在 Channel 关闭时产生一个“最后的消息"而提供了decodeLast();
而对于编码器没有这个需求,在连接被关闭之后仍然产生一个消息是毫无意义的。;
例如:将short类型的出站消息编码成byte类型
public class ShortToByteEncoder extends MessageToByteEncoder<Short> {
@Override
protected void encode(ChannelHandlerContext channelHandlerContext, Short msg, ByteBuf byteBuf) throws Exception {
byteBuf.writeShort(msg);
}
}
其接受一个 Short 类型的实例作为消息;
将它编码为 Short 的原子类型值,并将它写入 ByteBuf 中;
其将随后被转发给 ChannelPipeline 中的下一个 ChannelOutboundHandler;
每个传出的 Short 值都将会占用 ByteBuf 中的 2 字节;
MessageToMessageEncoder
public class IntegerToStringEncoder extends
MessageToMessageEncoder<Integer> { //
@Override
public void encode(ChannelHandlerContext ctx, Integer msg, List<Object> out)
throws Exception {
out.add(String.valueOf(msg)); //
}
}
编解码器(Codec)
Codec同时实现了ChannelInboundHandler 和 ChannelOutboundHandler接口;
相当于将编码器和解码器结合在一个类中;
分类:
ByteToMessageCodec
MessageToMessageCodec
ByteToMessageCodec
结合了ByteToMessageDecoder 以及它的逆向——MessageToByteEncoder;
使用场景:
我们需要将字节解码为某种形式的消息,可能是 POJO,随后再次对它进行编码为字节 。
MessageToMessageCodec
类签名:
public abstract class MessageToMessageCodec<INBOUND_IN,OUTBOUND_IN>
api:
decode()方法是将 INBOUND_IN类型的消 息转换 为 OUTBOUND_IN类型的 消息;
而encode()方法则进行它的逆向操作;
CombinedChannelDuplexHandler
解码器和编码器结合在一起可能会牺牲可重用性;
使用CombinedChannelDuplexHandler既能够避免这种惩罚, 又不会牺牲将一个解码器和一个编码器作为一个单独的单元部署所带来的便利性。
public class CombinedChannelDuplexHandler<I extends ChannelInboundHandler, O extends ChannelOutboundHandler>
相当于是一个编码器和解码器的容器;
编码和解码的操作交给其中装配的编码器和解码器实例来执行
public class CombinedByteCharCodec extends
CombinedChannelDuplexHandler<ByteToCharDecoder, CharToByteEncoder> {
public CombinedByteCharCodec() {
super(new ByteToCharDecoder(), new CharToByteEncoder());
}
}
上一篇: ABB机器人的Socket通信测试
下一篇: ROS - 在ROS中使用YOLO