Netty之Hessian编解码
程序员文章站
2022-06-24 10:40:52
JDK序列化使用起来比较方便,但是它的性能较差,序列化后的字节大小也比较大,所以一般在项目中不会使用自带的序列化,而是会采用第三方的序列化框架。我们以Hessian为例,演示下如何与Netty整合进行编解码处理。导入Hessian依赖:com.caucho:hessian:4.0.63
JDK序列化使用起来比较方便,但是它的性能较差,序列化后的字节大小也比较大,所以一般在项目中不会使用自带的序列化,而是会采用第三方的序列化框架。
我们以Hessian为例,演示下如何与Netty整合进行编解码处理。
导入Hessian依赖:
<dependency>
<groupId>com.caucho</groupId>
<artifactId>hessian</artifactId>
<version>4.0.63</version>
</dependency>
User对象:
package cn.wangj.netty.codec.hessian;
/**
 * Simple serializable value holder for a user: id, name and age.
 *
 * <p>All three properties are nullable boxed/reference types and are exposed through
 * conventional getters and setters.
 */
public class User implements java.io.Serializable{

    private static final long serialVersionUID = -8200798627910162221L;

    private Long id;
    private String name;
    private Integer age;

    /** @return the user id, or {@code null} if it has not been set */
    public Long getId() {
        return this.id;
    }

    /** @return the user name, or {@code null} if it has not been set */
    public String getName() {
        return this.name;
    }

    /** @return the user age, or {@code null} if it has not been set */
    public Integer getAge() {
        return this.age;
    }

    /** @param id the user id to store */
    public void setId(Long id) {
        this.id = id;
    }

    /** @param name the user name to store */
    public void setName(String name) {
        this.name = name;
    }

    /** @param age the user age to store */
    public void setAge(Integer age) {
        this.age = age;
    }

    /**
     * Renders the object as {@code User{id=..., name='...', age=...}}; unset properties
     * are printed as {@code null}.
     */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("User{");
        text.append("id=").append(id);
        text.append(", name='").append(name).append('\'');
        text.append(", age=").append(age);
        text.append('}');
        return text.toString();
    }
}
编解码器:
Hessian序列化工具类:
package cn.wangj.netty.codec.hessian.codec;
import com.caucho.hessian.io.HessianInput;
import com.caucho.hessian.io.HessianOutput;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
/**
 * Hessian serialization helper: converts objects to Hessian-encoded byte arrays and back.
 *
 * <p>Both directions work on in-memory byte-array streams. Closing a
 * {@link ByteArrayOutputStream} or {@link ByteArrayInputStream} has no effect, so only the
 * Hessian wrappers are closed here.
 */
public class HessianSerializer {

    /**
     * Serializes {@code obj} with Hessian.
     *
     * @param obj the object to serialize
     * @param <T> the static type of {@code obj}
     * @return the Hessian-encoded bytes
     * @throws RuntimeException wrapping the {@link IOException} if writing or closing fails
     */
    public <T> byte[] serialize(T obj) {
        ByteArrayOutputStream os = new ByteArrayOutputStream();
        HessianOutput ho = new HessianOutput(os);
        RuntimeException failure = null;
        try {
            ho.writeObject(obj);
            ho.flush();
            return os.toByteArray();
        } catch (IOException e) {
            failure = new RuntimeException("Hessian serialization failed", e);
            throw failure;
        } finally {
            try {
                ho.close();
            } catch (IOException e) {
                if (failure == null) {
                    throw new RuntimeException("Failed to close Hessian output", e);
                }
                // Keep the write failure as the primary exception instead of masking it.
                failure.addSuppressed(e);
            }
        }
    }

    /**
     * Deserializes Hessian-encoded bytes into an object of the requested type.
     *
     * @param bytes the Hessian-encoded payload; must not be {@code null}
     * @param clazz the expected type, passed through to {@code HessianInput.readObject}
     * @param <T>   the expected type
     * @return the deserialized object
     * @throws NullPointerException if {@code bytes} is {@code null}
     * @throws RuntimeException     wrapping the underlying exception if reading or closing fails
     */
    public <T> T deserialize(byte[] bytes, Class<T> clazz) {
        if (bytes == null) {
            throw new NullPointerException("bytes");
        }
        HessianInput hi = new HessianInput(new ByteArrayInputStream(bytes));
        RuntimeException failure = null;
        try {
            // Unchecked on purpose: the type is whatever Hessian produced for clazz.
            // clazz.cast(...) is avoided because it would reject primitive class tokens.
            @SuppressWarnings("unchecked")
            T value = (T) hi.readObject(clazz);
            return value;
        } catch (IOException e) {
            failure = new RuntimeException("Hessian deserialization failed", e);
            throw failure;
        } finally {
            try {
                hi.close();
            } catch (Exception e) { // same catch type as before; close()'s throws clause is not visible here
                if (failure == null) {
                    throw new RuntimeException("Failed to close Hessian input", e);
                }
                // Keep the read failure as the primary exception instead of masking it.
                failure.addSuppressed(e);
            }
        }
    }
}
编码器:
package cn.wangj.netty.codec.hessian.codec;
import cn.wangj.netty.coder.hessian.User;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
public class HessianEncoder extends MessageToByteEncoder<User> {
private HessianSerializer hessianSerializer = new HessianSerializer();
protected void encode(ChannelHandlerContext ctx, User msg, ByteBuf out) throws Exception {
byte[] bytes = hessianSerializer.serialize(msg);
out.writeBytes(bytes);
}
}
解码器:
package cn.wangj.netty.codec.hessian.codec;
import cn.wangj.netty.coder.hessian.User;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import java.util.List;
public class HessianDecoder extends ByteToMessageDecoder {
private HessianSerializer hessianSerializer = new HessianSerializer();
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object>
out) throws Exception {
//复制⼀份ByteBuf数据,轻复制,⾮完全拷⻉
//避免出现异常: did not read anything but decoded a message
//Netty检测没有读取任何字节就会抛出该异常
ByteBuf in2 = in.retainedDuplicate();
byte[] dst;
if (in2.hasArray()) {//堆缓冲区模式
dst = in2.array();
} else {
dst = new byte[in2.readableBytes()];
in2.getBytes(in2.readerIndex(), dst);
}
in.skipBytes(in.readableBytes());//跳过所有的字节,表示已经读取过了
Object obj = hessianSerializer.deserialize(dst, User.class);//反序列化
out.add(obj);
}
}
服务端
package cn.wangj.netty.codec.hessian;
import cn.wangj.netty.coder.hessian.codec.HessianDecoder;
import cn.wangj.netty.coder.hessian.handler.ServerHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
public class NettyHessianServer {
public static void main(String[] args) throws Exception {
// System.setProperty("io.netty.noUnsafe", "true");
// 主线程,不处理任何业务逻辑,只是接收客户的连接请求
EventLoopGroup boss = new NioEventLoopGroup(1);
// ⼯作线程,线程数默认是: cpu*2
EventLoopGroup worker = new NioEventLoopGroup();
try {
// 服务器启动类
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(boss, worker);
//配置server通道
serverBootstrap.channel(NioServerSocketChannel.class);
serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline()
.addLast(new HessianDecoder())
.addLast(new ServerHandler());
}
}); //worker线程的处理器
// serverBootstrap.childOption(ChannelOption.ALLOCATOR,
UnpooledByteBufAllocator.DEFAULT);
ChannelFuture future = serverBootstrap.bind(6677).sync();
System.out.println("服务器启动完成。。。。。 ");
//等待服务端监听端⼝关闭
future.channel().closeFuture().sync();
} finally {
//优雅关闭
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
}
package cn.wangj.netty.codec.hessian.handler;
import cn.wangj.netty.coder.hessian.User;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.CharsetUtil;
/**
 * Server-side business handler: prints every decoded {@link User} and answers with a
 * short UTF-8 acknowledgement.
 */
public class ServerHandler extends SimpleChannelInboundHandler<User> {

    /** Text sent back to the client after each received user. */
    private static final String ACK_TEXT = "ok";

    /**
     * Prints the received user to standard output and writes the acknowledgement.
     *
     * @param ctx  the handler context used to write the reply
     * @param user the decoded user object
     */
    @Override
    public void channelRead0(ChannelHandlerContext ctx, User user) throws Exception {
        // The decoded user object arrives here.
        System.out.println(user);
        ctx.writeAndFlush(Unpooled.copiedBuffer(ACK_TEXT, CharsetUtil.UTF_8));
    }
}
客户端
package cn.wangj.netty.codec.hessian;
import cn.wangj.netty.coder.hessian.codec.HessianEncoder;
import cn.wangj.netty.coder.hessian.handler.ClientHandler;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.serialization.ObjectEncoder;
/**
 * Demo client: connects to 127.0.0.1:6677 with a pipeline of {@link HessianEncoder}
 * followed by {@link ClientHandler}.
 */
public class NettyHessianClient {

    /**
     * Connects to the server and blocks until the connection is closed.
     *
     * @param args unused
     * @throws Exception if connecting or waiting on the channel fails
     */
    public static void main(String[] args) throws Exception{
        EventLoopGroup worker = new NioEventLoopGroup();
        try {
            // Client bootstrap helper, configured fluently.
            Bootstrap bootstrap = new Bootstrap();
            bootstrap
                    .group(worker)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline()
                                    .addLast(new HessianEncoder())
                                    .addLast(new ClientHandler());
                        }
                    });
            ChannelFuture connected = bootstrap.connect("127.0.0.1", 6677).sync();
            // Block until the connection is closed.
            connected.channel().closeFuture().sync();
        } finally {
            worker.shutdownGracefully();
        }
    }
}
package cn.wangj.netty.codec.hessian.handler;
import cn.wangj.netty.coder.hessian.User;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.CharsetUtil;
/**
 * Client-side handler: sends one sample {@link User} when the connection becomes active
 * and prints whatever the server sends back.
 */
public class ClientHandler extends SimpleChannelInboundHandler<ByteBuf> {

    /**
     * Builds a sample user and writes it to the channel as soon as it is active.
     *
     * @param ctx the handler context used to write the user
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        User sample = new User();
        sample.setId(1L);
        sample.setName("张三");
        sample.setAge(20);
        ctx.writeAndFlush(sample);
    }

    /**
     * Prints the server's reply, decoded as UTF-8 text.
     *
     * @param ctx the handler context (unused)
     * @param msg the bytes received from the server
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
        String reply = msg.toString(CharsetUtil.UTF_8);
        System.out.println("接收到服务端的消息: " + reply);
    }

    /**
     * Prints the stack trace of the failure and closes the channel.
     *
     * @param ctx   the handler context whose channel is closed
     * @param cause the exception that was raised in the pipeline
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}
本文地址:https://blog.csdn.net/wangmourena/article/details/112269289
上一篇: 浅谈CI脚本异常退出问题定位