欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

netty中使用protobuf实现多协议的消息

程序员文章站 2022-05-06 20:08:27
...

    在我们使用 netty 的过程中,有时候为了高效传输数据,经常使用 protobuf 进行数据的传输,netty默认情况下为我们实现的 protobuf 的编解码,但是默认的只能实现单个对象编解码,但是我们在使用 netty 的过程中,可能需要传输的对象有各种各样的,那么该如何实现对protobuf多协议的解码呢

 

    在 protobuf 中有一种类型的字段叫做  oneof , 被 oneof 声明的字段就类似于可选字段,在同一时刻只有一个字段有值,并且它们会共享内存。

 

有了上述基础知识,我们来实现一个简单的功能。

 

需求:

       客户端在连接上服务器端后,每隔 1s 向服务器端发送一个 protobuf 类型的对象(比如登录报文、创建任务报文、删除任务报文等等),服务器端接收到这个对象并打印出来。

 

protobuf文件的编写:

       在protobuf 文件中,我们申明一个 枚举类型的字段,用来标识当前发送的 protobuf 对象的类型,比如是登录报文、创建任务报文还是别的,然后在  oneof 字段中,申明所有可能需要传递的 报文实体

 

一、protobuf-java jar包的引入

<dependency>
      <groupId>com.google.protobuf</groupId>
      <artifactId>protobuf-java</artifactId>
      <version>3.6.1</version>
</dependency>

 

二、proto 文件的编写

netty中使用protobuf实现多协议的消息
            
    
    博客分类: nettyprotostuff nettyprotobufnetty protobuf 多协议消息protobuf 多协议消息 
   注意:

            1、定义的枚举是为了标识当前发送的是什么类型的消息

            2、需要发送的多个消息统一放入到 oneof 中进行申明

            3、到时候给 netty 编解码的时候就编解码 TaskProtocol 对象

 

三、使用 protoc 命令根据 .proto 文件生成 对应的 java 代码

四、netty服务器端的编写

/**
 * netty protobuf server
 *
 * @author huan.fu
 * @date 2019/2/15 - 11:54
 */
@Slf4j
public class NettyProtobufServer {

	public static void main(String[] args) throws InterruptedException {
		EventLoopGroup parentGroup = new NioEventLoopGroup(1);
		EventLoopGroup childGroup = new NioEventLoopGroup();
		ServerBootstrap bootstrap = new ServerBootstrap();
		bootstrap.group(parentGroup, childGroup)
				.channel(NioServerSocketChannel.class)
				.option(ChannelOption.SO_BACKLOG, 1024)
				// 连接超时
				.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 2000)
				.handler(new LoggingHandler(LogLevel.TRACE))
				.childHandler(new ChannelInitializer<SocketChannel>() {
					@Override
					protected void initChannel(SocketChannel ch) {
						ch.pipeline()
								.addLast(new ProtobufVarint32FrameDecoder())
								.addLast(new ProtobufDecoder(TaskProtobufWrapper.TaskProtocol.getDefaultInstance()))
								.addLast(new ProtobufVarint32LengthFieldPrepender())
								.addLast(new ProtobufEncoder())
								.addLast(new ServerProtobufHandler());
					}
				});
		// 绑定端口,同步等待成功
		ChannelFuture future = bootstrap.bind(9090).sync();
		log.info("server start in port:[{}]", 9090);
		// 等待服务端链路关闭后,main线程退出
		future.channel().closeFuture().sync();
		// 关闭线程池资源
		parentGroup.shutdownGracefully();
		childGroup.shutdownGracefully();
	}
}

    注意:

            1、注意一下 netty 是如何使用那些编解码器来编解码 protobuf 的。

 

五、服务器端接收到客户端发送过来的消息的处理

/**
 * 服务器端接收到客户端发送的请求,然后随机给客户端返回一个对象
 *
 * @author huan.fu
 * @date 2019/2/15 - 14:26
 */
@Slf4j
public class ServerProtobufHandler extends SimpleChannelInboundHandler<TaskProtobufWrapper.TaskProtocol> {

	@Override
	protected void channelRead0(ChannelHandlerContext ctx, TaskProtobufWrapper.TaskProtocol taskProtocol) {
		switch (taskProtocol.getPackType()) {
			case LOGIN:
				log.info("接收到一个登录类型的pack:[{}]", taskProtocol.getLoginPack().getUsername() + " : " + taskProtocol.getLoginPack().getPassword());
				break;
			case CREATE_TASK:
				log.info("接收到一个创建任务类型的pack:[{}]", taskProtocol.getCreateTaskPack().getTaskId() + " : " + taskProtocol.getCreateTaskPack().getTaskName());
				break;
			case DELETE_TASK:
				log.info("接收到一个删除任务类型的pack:[{}]", Arrays.toString(taskProtocol.getDeleteTaskPack().getTaskIdList().toArray()));
				break;
			default:
				log.error("接收到一个未知类型的pack:[{}]", taskProtocol.getPackType());
				break;
		}
	}

	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
		ctx.close();
		log.error("发生异常", cause);
	}
}

    注意:

            1、服务器端根据 packType 字段来判断客户端发送的是什么类型的消息

 

六、netty 客户端的编写

/**
 * netty protobuf client
 *
 * @author huan.fu
 * @date 2019/2/15 - 11:54
 */
@Slf4j
public class NettyProtobufClient {

	public static void main(String[] args) throws InterruptedException {
		EventLoopGroup group = new NioEventLoopGroup();
		Bootstrap bootstrap = new Bootstrap();
		bootstrap.group(group)
				.channel(NioSocketChannel.class)
				.option(ChannelOption.TCP_NODELAY, true)
				.handler(new ChannelInitializer<SocketChannel>() {
					@Override
					protected void initChannel(SocketChannel ch) {
						ch.pipeline()
								.addLast(new ProtobufVarint32FrameDecoder())
								.addLast(new ProtobufDecoder(TaskProtobufWrapper.TaskProtocol.getDefaultInstance()))
								.addLast(new ProtobufVarint32LengthFieldPrepender())
								.addLast(new ProtobufEncoder())
								.addLast(new ClientProtobufHandler());
					}
				});
		ChannelFuture future = bootstrap.connect("127.0.0.1", 9090).sync();
		log.info("client connect server.");
		future.channel().closeFuture().sync();
		group.shutdownGracefully();
	}
}

 

七、客户端连接到服务器端时的处理

/**
 * 客户端连接到服务器端后,每隔1s发送一个报文到服务器端
 *
 * @author huan.fu
 * @date 2019/2/15 - 14:26
 */
@Slf4j
public class ClientProtobufHandler extends ChannelInboundHandlerAdapter {

	private ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();

	private AtomicInteger atomicInteger = new AtomicInteger(1);

	@Override
	public void channelActive(ChannelHandlerContext ctx) {
		executor.scheduleAtFixedRate(() -> {
			// 产生的pack类型
			int packType = new Random().nextInt(3);
			switch (TaskProtobufWrapper.PackType.forNumber(packType)) {
				case LOGIN:
					TaskProtobufWrapper.LoginPack loginPack = TaskProtobufWrapper.LoginPack.newBuilder().setUsername("张三[" + atomicInteger.getAndIncrement() + "]").setPassword("123456").build();
					ctx.writeAndFlush(TaskProtobufWrapper.TaskProtocol.newBuilder().setPackType(TaskProtobufWrapper.PackType.LOGIN).setLoginPack(loginPack).build());
					break;
				case CREATE_TASK:
					TaskProtobufWrapper.CreateTaskPack createTaskPack = TaskProtobufWrapper.CreateTaskPack.newBuilder().setCreateTime(System.currentTimeMillis()).setTaskId("100" + atomicInteger.get()).setTaskName("任务编号" + atomicInteger.get()).build();
					ctx.writeAndFlush(TaskProtobufWrapper.TaskProtocol.newBuilder().setPackType(TaskProtobufWrapper.PackType.CREATE_TASK).setCreateTaskPack(createTaskPack).build());
					break;
				case DELETE_TASK:
					TaskProtobufWrapper.DeleteTaskPack deleteTaskPack = TaskProtobufWrapper.DeleteTaskPack.newBuilder().addTaskId("1001").addTaskId("1002").build();
					ctx.writeAndFlush(TaskProtobufWrapper.TaskProtocol.newBuilder().setPackType(TaskProtobufWrapper.PackType.DELETE_TASK).setDeleteTaskPack(deleteTaskPack).build());
					break;
				default:
					log.error("产生一个未知的包类型:[{}]", packType);
					break;
			}
		}, 0, 1, TimeUnit.SECONDS);
	}

	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
		ctx.close();
		log.error("发生异常", cause);
	}
}

    注意:

           1、客户端在连接服务器端时每隔1s发送不同的消息到服务器端

 

八、运行结果

netty中使用protobuf实现多协议的消息
            
    
    博客分类: nettyprotostuff nettyprotobufnetty protobuf 多协议消息protobuf 多协议消息 


 九、完整代码

完成代码如下https://gitee.com/huan1993/netty-study/tree/master/src/main/java/com/huan/netty/protobuf

 

 

  • netty中使用protobuf实现多协议的消息
            
    
    博客分类: nettyprotostuff nettyprotobufnetty protobuf 多协议消息protobuf 多协议消息 
  • 大小: 49.5 KB
  • netty中使用protobuf实现多协议的消息
            
    
    博客分类: nettyprotostuff nettyprotobufnetty protobuf 多协议消息protobuf 多协议消息 
  • 大小: 84.6 KB