Encode、Decode
程序员文章站
2022-07-14 19:20:52
...
import java.io.Serializable;
/**
 * Simple data carrier describing a dog; it is the message type exchanged
 * between {@code MsgEncoder} and {@code MsgDecoder}.
 *
 * <p>All properties are plain mutable strings and default to {@code null}.
 */
public class Dog implements Serializable {
    /**
     * Explicit serialization version so the serialized form does not depend on
     * the compiler-generated default, which changes whenever the class shape changes.
     */
    private static final long serialVersionUID = 1L;

    /** Name of the dog. */
    private String name;
    /** Breed of the dog. */
    private String type;
    /** Colour of the dog. */
    private String color;

    /** @return the dog's name, or {@code null} if it was never set */
    public String getName() {
        return name;
    }

    /** @param name the dog's name */
    public void setName(String name) {
        this.name = name;
    }

    /** @return the dog's breed, or {@code null} if it was never set */
    public String getType() {
        return type;
    }

    /** @param type the dog's breed */
    public void setType(String type) {
        this.type = type;
    }

    /** @return the dog's colour, or {@code null} if it was never set */
    public String getColor() {
        return color;
    }

    /** @param color the dog's colour */
    public void setColor(String color) {
        this.color = color;
    }
}
import com.alibaba.fastjson.JSON;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
/**
 * Outbound encoder that turns a {@link Dog} into a length-prefixed frame:
 * a 4-byte int holding the body length, followed by the body itself
 * (the dog rendered as JSON text and encoded with the GBK charset).
 */
public class MsgEncoder extends MessageToByteEncoder<Dog> {

    /**
     * Writes one frame for {@code msg} into {@code out}. Nothing is written
     * when the message could not be converted or the converted body is empty.
     *
     * @param ctx the handler context (not used here)
     * @param msg the dog to send
     * @param out the buffer receiving the length header and the body
     */
    @Override
    protected void encode(ChannelHandlerContext ctx, Dog msg, ByteBuf out) throws Exception {
        final byte[] payload = convertToBytes(msg);
        System.out.println("Dog convert to bytes");

        final boolean hasPayload = payload != null && payload.length >= 1;
        if (hasPayload) {
            // Header first: the body length as a 4-byte int.
            out.writeInt(payload.length);
            // Then the body carrying the actual data.
            out.writeBytes(payload);
        }
    }

    /**
     * Serialises a dog to JSON and encodes the text as GBK bytes.
     *
     * @param dog the dog to convert, may be {@code null}
     * @return the encoded bytes, or {@code null} when {@code dog} is
     *         {@code null} or the conversion failed
     */
    private byte[] convertToBytes(Dog dog) {
        byte[] encoded = null;
        if (dog != null) {
            try {
                // Object -> JSON string -> byte array.
                encoded = JSON.toJSONString(dog).getBytes("GBK");
            } catch (Exception e) {
                System.out.println("Dog convert to bytes error. ");
                e.printStackTrace();
            }
        }
        return encoded;
    }
}
import com.alibaba.fastjson.JSON;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import java.util.List;
/**
 * Inbound decoder for length-prefixed frames: a 4-byte int body length
 * followed by that many bytes of GBK-encoded JSON describing a {@link Dog}.
 *
 * <p>Each complete frame yields one object in the output list: a {@link Dog}
 * when the JSON parses, otherwise the raw decoded text as a {@link String}.
 * Empty or undecodable frames are consumed without producing output.
 */
public class MsgDecoder extends ByteToMessageDecoder {
    /** Number of bytes used by the length header (one int). */
    private static final int HEAD_LENGTH = 4;

    /**
     * Tries to read one frame from {@code in}.
     *
     * @param ctx the handler context, used to close the channel on a corrupt header
     * @param in  the accumulated inbound bytes
     * @param out receives the decoded message, if a full frame was available
     */
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        // Not even a complete length header yet; wait for more bytes.
        if (in.readableBytes() < HEAD_LENGTH) {
            return;
        }
        // Remember where the frame starts so we can rewind if the body is incomplete.
        in.markReaderIndex();
        // readInt() advances the reader index by 4.
        int dataLength = in.readInt();
        if (dataLength < 0) {
            // A negative length means the stream is corrupt. Drop what is buffered,
            // close the connection and stop; continuing would try to allocate a
            // negatively sized array.
            in.skipBytes(in.readableBytes());
            ctx.close();
            return;
        }
        // Body not fully received yet: rewind to the mark and wait for more bytes.
        if (in.readableBytes() < dataLength) {
            in.resetReaderIndex();
            return;
        }
        byte[] body = new byte[dataLength];
        in.readBytes(body);
        Object decoded = convertToObject(body);
        if (decoded == null) {
            // Empty or undecodable frame: the bytes are consumed, but the output
            // list does not accept null elements, so nothing is emitted.
            return;
        }
        System.out.println("bytes convert to Dog");
        out.add(decoded);
    }

    /**
     * Converts a frame body into a message object.
     *
     * @param body the raw frame body
     * @return a {@link Dog} when the body is valid JSON for one; the decoded
     *         text when JSON parsing fails; {@code null} when the body is
     *         empty or cannot be decoded to text
     */
    private Object convertToObject(byte[] body) {
        if (body == null || body.length < 1) {
            return null;
        }
        String contentString;
        try {
            contentString = new String(body, "GBK");
        } catch (Exception e) {
            // Previously swallowed silently; report it so the failure is visible.
            System.out.println("bytes convert to String error. ");
            e.printStackTrace();
            return null;
        }
        if (contentString.length() < 1) {
            return null;
        }
        try {
            return JSON.parseObject(contentString, Dog.class);
        } catch (Exception e) {
            System.out.println("bytes convert to Dog error. ");
            e.printStackTrace();
        }
        // Fall back to the raw text so downstream handlers can still inspect it.
        return contentString;
    }
}
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
@Component
public class ObjectServer {
/**
* 创建bootstrap
*/
private static final ServerBootstrap serverBootstrap = new ServerBootstrap();
/**
* Worker
*/
private static final EventLoopGroup work = new NioEventLoopGroup();
/**
* 端口号
*/
private int port = getPort();
/**
* 关闭服务器方法
*/
public void close() {
System.out.println("关闭服务器....");
//优雅退出
work.shutdownGracefully();
}
/**
* 开启及服务线程
*/
public void start() {
serverBootstrap.group(work)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 100)
.handler(new LoggingHandler(LogLevel.INFO));
try {
//设置事件处理
serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast( new MsgEncoder());
pipeline.addLast(new MsgDecoder());
pipeline.addLast(new ServerObjectHandler());
}
});
System.out.println("netty服务器在[" + port + "]端口启动监听");
ChannelFuture f = serverBootstrap.bind(port);
f.channel().closeFuture();
} catch (Exception e) {
System.out.println("[出现异常] 释放资源");
} finally {
close();
}
}
private static int getPort() {
Double port = (Math.random() + 1) * 10000;
return port.intValue();
}
public static void main(String[] args) {
ObjectServer nsl = new ObjectServer();
nsl.start();
/**
 * Inbound handler that logs each received {@link Dog} and answers with a
 * fixed reply dog.
 */
public class ServerObjectHandler extends ChannelInboundHandlerAdapter {

    /**
     * Logs the failure and closes the connection.
     *
     * @param ctx   the handler context of the failing channel
     * @param cause the error that was raised in the pipeline
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        System.out.println("some error. ");
        cause.printStackTrace();
        ctx.close();
    }

    /**
     * Handles one decoded message from the client.
     *
     * <p>Only {@link Dog} messages are processed. {@code MsgDecoder} may emit
     * the raw text as a {@code String} when JSON parsing fails, so anything
     * that is not a {@link Dog} is forwarded to the next handler rather than
     * being cast blindly.
     *
     * @param ctx the handler context used to write the reply
     * @param msg the decoded inbound message
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        if (!(msg instanceof Dog)) {
            System.out.println("unexpected message, not a Dog: " + msg);
            ctx.fireChannelRead(msg);
            return;
        }
        // Process the data sent by the client.
        Dog dog = (Dog) msg;
        System.out.println(
                "receive message. name=" + dog.getName() + ",type=" + dog.getType() + ",color=" + dog
                        .getColor()
        );
        // Reply to the client with a new object.
        Dog returnDog = new Dog();
        returnDog.setName("MaoMao");
        returnDog.setType("JinMao");
        returnDog.setColor("yellow");
        ctx.writeAndFlush(returnDog);
    }
}
上一篇: 获取文件编码格式
下一篇: 原型聚类之学习向量量化及Python实现