欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Netty集成ProtoBuf开发私有协议

程序员文章站 2022-05-05 13:45:42
...

Netty集成ProtoBuf开发私有协议

私有协议

广义上区分,通信协议可以分为公有协议和私有协议。由于私有协议的灵活性,它往往会在某个公司或者组织内部使用,按需定制,也因为如此,升级起来会非常方便,灵活性好。绝大多数的私有协议传输层都基于TCP/IP,所以利用Netty的NIO TCP协议栈可以非常方便地进行私有协议的定制和开发。

通信模型

Netty集成ProtoBuf开发私有协议

(1) Netty协议栈客户端发送握手请求消息,携带节点ID等有效身份认证信息;
(2) Netty 协议栈服务端对握手请求消息进行合法性校验,包括节点ID有效性校验、节点重复登录校验和IP地址合法性校验,校验通过后,返回登录成功的握手应答消息:
(3)链路建立成功之后,客户端发送业务消息;
(4)链路成功之后,服务端发送心跳消息;
(5)链路建立成功之后,客户端发送心跳消息;
(6)链路建立成功之后,服务端发送业务消息;
(7)服务端退出时,服务端关闭连接,客户端感知对方关闭连接后,被动关闭客户端连接。

ProtoBuf数据格式

syntax = "proto3";
option java_package = "com.fy.protobuf";
option java_outer_classname="CustomMessageData";

message MessageData{
    int64 length = 1;
    Content content = 2;
    enum DataType {
        REQ_LOGIN = 0;  //上线登录验证环节 等基础信息上报
        RSP_LOGIN = 1;  //返回上线登录状态与基础信息
        PING = 2;  //心跳
        PONG = 3;  //心跳
        REQ_ACT = 4;  //动作请求
        RSP_ACT = 5;  //动作响应
        REQ_CMD = 6;  //指令请求
        RSP_CMD = 7;  //指令响应
        REQ_LOG = 8 ;//日志请求
        RSP_LOG = 9;  //日志响应
    }
    DataType order = 3;
    message Content{
        int64 contentLength = 1;
        string data = 2;
    }
}

开发步骤

tip????下列步骤有点吃力的小伙伴可以看看之前的文章:https://blog.csdn.net/kunfeisang5551/article/details/107957256

1、在D盘protobuf路径下执行命令:protoc.exe --java_out=D:\protobuf CustomMsg.proto

2、将生成的文件拷贝到项目中

开始Coding~

1、新建maven项目,引入依赖

<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.51.Final</version>
</dependency>
<dependency>
    <groupId>com.google.protobuf</groupId>
    <artifactId>protobuf-java</artifactId>
    <version>3.11.0</version>
</dependency>

2、创建服务端启动代码

public class CustomServer {
    public void bind(int port) {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap
                    .group(bossGroup, workGroup)
                    .channel(NioServerSocketChannel.class)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            socketChannel.pipeline()
                                    //消息头定长
                                    .addLast(new ProtobufVarint32FrameDecoder())
                                    //解码指定的消息类型
                                    .addLast(new ProtobufDecoder(CustomMessageData.MessageData.getDefaultInstance()))
                                    //消息头设置长度
                                    .addLast(new ProtobufVarint32LengthFieldPrepender())
                                    //解码
                                    .addLast(new ProtobufEncoder())
                                    //心跳检测,超过设置的时间将会抛出异常ReadTimeoutException
                                    .addLast(new ReadTimeoutHandler(8))
                                    //消息处理
                                    .addLast(new CustomServerHandler())
                                    //心跳响应
                                    .addLast(new CustomServerHeartBeatHandler());
                        }
                    });
            // 绑定端口同步等待启动成功
            ChannelFuture sync = bootstrap.bind(port).sync();

            // 等待服务监听端口关闭
            sync.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            bossGroup.shutdownGracefully();
            workGroup.shutdownGracefully();
        }
    }
}

3、创建服务端消息处理代码

public class CustomServerHandler extends ChannelInboundHandlerAdapter {

    private String[] whiteIPv4List = {"127.0.0.1", "192.168.1.188"};
    public static ConcurrentHashMap nodeCheck = new ConcurrentHashMap();

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        CustomMessageData.MessageData messageData = (CustomMessageData.MessageData) msg;
        if (messageData.getOrder() == CustomMessageData.MessageData.DataType.UNRECOGNIZED) {
            // 无法识别的消息类型
            ctx.close();
        }

        if (messageData.getOrder() == CustomMessageData.MessageData.DataType.REQ_LOGIN) {
            // 检查重复登录
            String nodeIndex = ctx.channel().remoteAddress().toString();
            if (nodeCheck.contains(nodeIndex)) {
                // 重复登录
                ctx.writeAndFlush(builderResp(false));
                return;
            } else {
                InetSocketAddress socketAddress = (InetSocketAddress) ctx.channel().remoteAddress();
                String ip = socketAddress.getAddress().getHostAddress();
                boolean isOk = false;
                // 检查白名单
                for (String s : whiteIPv4List) {
                    if (s.equals(ip)) {
                        isOk = true;
                        break;
                    }
                }
                // 成功响应
                CustomMessageData.MessageData responseData = isOk ? builderResp(true) : builderResp(false);
                if (isOk) {
                    nodeCheck.put(nodeIndex, true);
                }
                ctx.writeAndFlush(responseData);
            }
        } else {
            //心跳消息处理
            ctx.fireChannelRead(msg);
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        nodeCheck.remove(ctx.channel().remoteAddress().toString());
        if (ctx.channel().isActive()) {
            ctx.close();
        }
    }

    public CustomMessageData.MessageData builderResp(boolean isOk) {
        String r = isOk ? "SUCCESS" : "FAILED";
        CustomMessageData.MessageData.Content responseContent = CustomMessageData.MessageData.Content.newBuilder().setData(r).setContentLength(r.length()).build();
        CustomMessageData.MessageData responseData = CustomMessageData.MessageData.newBuilder().setOrder(CustomMessageData.MessageData.DataType.RSP_LOGIN).setContent(responseContent).build();
        return responseData;
    }
}

4、创建服务端心跳响应代码

public class CustomServerHeartBeatHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        CustomMessageData.MessageData messageData = (CustomMessageData.MessageData) msg;
        if (messageData.getOrder() == CustomMessageData.MessageData.DataType.PING) {
            CustomMessageData.MessageData req = CustomMessageData.MessageData.newBuilder()
                    .setOrder(CustomMessageData.MessageData.DataType.PONG).build();
            System.out.println("Send-Client:PONG,time:" + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
            ctx.writeAndFlush(req);
        } else {
            ctx.fireChannelRead(msg);
        }
    }
}

5、创建客户端启动代码

public class CustomClient {
    public void bind(int port) {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            socketChannel.pipeline()
                                    .addLast(new ProtobufVarint32FrameDecoder())
                                    .addLast(new ProtobufDecoder(CustomMessageData.MessageData.getDefaultInstance()))
                                    .addLast(new ProtobufVarint32LengthFieldPrepender())
                                    .addLast(new ProtobufEncoder())
                                    // 消息处理
                                    .addLast(new CustomClientHandler())
                                    // 心跳响应
                                    .addLast(new CustomClientHeartBeatHandler());
                        }
                    });
            ChannelFuture f = b.connect("127.0.0.1", port).sync();

            f.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // 短线重连 定时5秒
            group.execute(() -> {
                try {
                    TimeUnit.MILLISECONDS.sleep(5);
                    bind(port);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
//            group.shutdownGracefully();
        }
    }
}

6、创建客户端消息处理代码

这里的逻辑主要是通道**后马上发送业务消息,然后保持心跳

public class CustomClientHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        CustomMessageData.MessageData reqData = CustomMessageData
                .MessageData
                .newBuilder()
                .setOrder(CustomMessageData.MessageData.DataType.REQ_LOGIN)
                .build();
        ctx.channel().writeAndFlush(reqData);
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        CustomMessageData.MessageData respData = (CustomMessageData.MessageData) msg;
        if (respData.getOrder() == CustomMessageData.MessageData.DataType.RSP_LOGIN) {
            // 响应登录请求处理逻辑
            boolean equals = respData.getContent().getData().equals("SUCCESS");
            if (equals) {
                System.out.println("Receive-Server:LoginSuccess,time:" + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
                System.out.println(respData.toString());
                // 传递下一个handler
                ctx.fireChannelRead(msg);
            } else {
                // 登录失败
                if (ctx.channel().isActive()) {
                    ctx.close();
                }
            }
        } else {
            // 响应心跳处理逻辑
            ctx.fireChannelRead(msg);
        }

    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        super.exceptionCaught(ctx, cause);
        if (ctx.channel().isActive()) {
            ctx.close();
        }
    }
}

7、创建客户端心跳保持代码

public class CustomClientHeartBeatHandler extends ChannelInboundHandlerAdapter {

    private static ScheduledFuture heartbeatFuture;

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        CustomMessageData.MessageData messageData = (CustomMessageData.MessageData) msg;
        if (messageData.getOrder() == CustomMessageData.MessageData.DataType.RSP_LOGIN) {
            // 登录成功后保持心跳 间隔为5秒
            heartbeatFuture = ctx.executor().scheduleAtFixedRate(() -> {
                CustomMessageData.MessageData req = CustomMessageData.MessageData.newBuilder()
                        .setOrder(CustomMessageData.MessageData.DataType.PING).build();
                System.out.println("Send-Server:PING,time:" + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
                ctx.writeAndFlush(req);
            }, 0, 5, TimeUnit.SECONDS);
        } else if (messageData.getOrder() == CustomMessageData.MessageData.DataType.PONG) {
            System.out.println("Receive-Server:PONG,time:" + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
            System.out.println();
        } else {
            ctx.fireChannelRead(msg);
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // 发生异常就取消心跳保持
        if (heartbeatFuture != null) {
            heartbeatFuture.cancel(true);
            heartbeatFuture = null;
        }
        ctx.fireExceptionCaught(cause);
    }
}

8、启动服务端

public class Server {
    public static void main(String[] args) throws Exception {
        new CustomServer().bind(8080);
    }
}

9、启动客户端

public class Client {
    public static void main(String[] args) {
        new CustomClient().bind(8080);
    }
}

控制台打印

1、客户端

Netty集成ProtoBuf开发私有协议
Receive-Server:LoginSuccess,time:2020-08-12 17:31:47
content {
  contentLength: 7
  data: "SUCCESS"
}
order: RSP_LOGIN

Send-Server:PING,time:2020-08-12 17:31:47
Receive-Server:PONG,time:2020-08-12 17:31:47

Send-Server:PING,time:2020-08-12 17:31:52
Receive-Server:PONG,time:2020-08-12 17:31:52

Send-Server:PING,time:2020-08-12 17:31:57
Receive-Server:PONG,time:2020-08-12 17:31:57

Send-Server:PING,time:2020-08-12 17:32:02
Receive-Server:PONG,time:2020-08-12 17:32:02

我们可以看到,当客户端发送登录请求后,服务端响应登录成功消息,然后交替打印心跳保持信息,间隔为5秒。

2、服务端

Netty集成ProtoBuf开发私有协议
Send-Client:PONG,time:2020-08-12 17:31:47
Send-Client:PONG,time:2020-08-12 17:31:52
Send-Client:PONG,time:2020-08-12 17:31:57
Send-Client:PONG,time:2020-08-12 17:32:02
Send-Client:PONG,time:2020-08-12 17:32:07

服务端响应登录请求后交替打印心跳保持信息。

3、测试服务端异常

我们先停掉服务端,看看客户端有啥反应,客户端日志:

Connection refused: no further information

客户端5秒打印一次异常信息,说明短线重连逻辑正常

我们接着再启动服务端,看看客户端有啥反应

Netty集成ProtoBuf开发私有协议

io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: no further information: /127.0.0.1:8080
Caused by: java.net.ConnectException: Connection refused: no further information
	at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
	at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
	at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:330)
	at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:334)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:702)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.lang.Thread.run(Thread.java:748)
Receive-Server:LoginSuccess,time:2020-08-12 17:44:15
content {
  contentLength: 7
  data: "SUCCESS"
}
order: RSP_LOGIN

Send-Server:PING,time:2020-08-12 17:44:15
Receive-Server:PONG,time:2020-08-12 17:44:15

Send-Server:PING,time:2020-08-12 17:44:20
Receive-Server:PONG,time:2020-08-12 17:44:20

可以看到由异常转为正常啦~

通过测试可以验证是否符合私有协议的约定:

(1)客户端是否能够正常发起重连:
(2)重连成功之后,不再重连:
(3)断连期间,心跳定时器停止工作,不再发送心跳请求消息;
(4)服务端重启成功之后,允许客户端重新登录;
(5)服务端重启成功之后,客户端能够重连和握手成功:
(6)重连成功之后,双方的心跳能够正常互发。
(7)性能指标:重连期间,客户端资源得到了正常回收,不会导致句柄等资源泄漏。

GitHub服务端地址:https://github.com/GoodBoy2333/netty-server-maven.git

0-08-12 17:44:15
Receive-Server:PONG,time:2020-08-12 17:44:15

Send-Server:PING,time:2020-08-12 17:44:20
Receive-Server:PONG,time:2020-08-12 17:44:20


可以看到由异常转为正常啦~

通过测试可以验证是否符合私有协议的约定:

(1)客户端是否能够正常发起重连:
(2)重连成功之后,不再重连:
(3)断连期间,心跳定时器停止工作,不再发送心跳请求消息;
(4)服务端重启成功之后,允许客户端重新登录;
(5)服务端重启成功之后,客户端能够重连和握手成功:
(6)重连成功之后,双方的心跳能够正常互发。
(7)性能指标:重连期间,客户端资源得到了正常回收,不会导致句柄等资源泄漏。



GitHub服务端地址:https://github.com/GoodBoy2333/netty-server-maven.git

GitHub客户端地址:https://github.com/GoodBoy2333/netty-client-maven.git