欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

使用netty发送报文的坑

程序员文章站 2022-04-22 18:03:31
...

最近跟银行调试一个接口的时候,行方说明是TCP/IP socket同步短链接的方式,开始采用socket和niosocket都不行,最后采用了了netty形式发送,代码很简单就是创建一个ChannelHandlerAdapter.主要代码如下,

测试类:
public static void main(String[] args) {
SockerClient client = new SockerClient();
Channel connect = client.connect(“xxx.xxx.xxx.xx”, xxxx);
client.sendMessage(msg)
}

class SockerClient {
        private ClientHandler clientHandler = new ClientHandler();
        public Channel  connect(String host, int port) throws Exception {
            EventLoopGroup workerGroup = new NioEventLoopGroup();

            Bootstrap b = new Bootstrap();
            b.group(workerGroup);
            b.channel(NioSocketChannel.class);
            b.option(ChannelOption.SO_KEEPALIVE, true);
            b.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(clientHandler);
                }
            });
           return b.connect(host, port).sync().channel();
        }

        public String sendMessage(String msg) throws Exception {
            ChannelPromise promise = clientHandler.sendMessage(msg);
            promise.await();
            return clientHandler.getData();
        }
    }
public static class ClientHandler extends ChannelInboundHandlerAdapter {
        private ChannelHandlerContext ctx;
        private ChannelPromise promise;
        private String data;

        //初始化ctx,用来后面发送报文
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {

            System.out.println("---------------------执行active的线程"+Thread.currentThread());
            super.channelActive(ctx);

            this.ctx = ctx;
        }

        //发送报文
        public ChannelPromise sendMessage(String message) throws Exception {
            System.out.println("---------------------执行sendMessage的线程"+Thread.currentThread());
            if (ctx == null) {
                throw new IllegalStateException();
            }
            ByteBuf encoded = ctx.alloc().buffer(4 * message.length());
            encoded.writeBytes(message.getBytes("GBK"));
            promise = ctx.writeAndFlush(encoded).channel().newPromise();
            return promise;
        }

        public String getData() {
            return data;
        }

        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            ByteBuf result = (ByteBuf) msg;
            byte[] result1 = new byte[result.readableBytes()];
            result.readBytes(result1);
            data = new String(result1, "GBK");
            promise.setSuccess();
            result.release();
        }
    }

本来是想单独写个sendMessage方法去发送message的,后来发现sendMessage这个方法的里面的ctx时而为空,有时又正常,因为channelActive这个继承的方法会在建立链接时执行,就初始化ctx,感觉没道理会为空。想了半天,于是把这个两个方法执行的线程打印出来,才发现执行activeThread方法的线程是nioEventLoopGroup,而执行sendMessageThread方法的线程是主线程main.

---------------------执行active的线程nioEventLoopGroup-2-1
---------------------执行sendMessage的线程main

虽然在main方法中是client.connect先执行,但是建立连接确是另一个线程完成的,不在是main线程,而sendMessage是main线程执行,会出现sendMessage和channelActive并不是按照固定顺序执行,说白了就是谁抢的快谁执行。。。所以要想在获取ctx后再发送message,直接将发送报文这一步写在channelActive里面。

稍微改造下就这样。

public class NettyClient {
    public static void main(String aa[]){
        String msg = "xxx";
        System.out.println(NettyClient.sendMessage("xxx.xxx.xx.xx", 0000, msg));
    }

    public static String sendMessage(String host, int port, String msg) {
        final ClientHandler clientHandler = new ClientHandler(msg);
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.SO_KEEPALIVE, true)
                    .handler(new ChannelInitializer() {
                        protected void initChannel(Channel channel) throws Exception {
                            channel.pipeline().addLast(clientHandler);
                        }
                    });
// 等待客户端链接成功
            ChannelFuture future = bootstrap.connect(host, port).sync();
            System.out.println("客户端链接成功!");
// 等待客户端链接关闭
            future.channel().closeFuture().sync();
        } catch (Exception e) {
            log.error("请求异常:",e);
            return null;
        } finally {
            group.shutdownGracefully();
        }
        return clientHandler.getData();
    }
}

class ClientHandler extends ChannelInboundHandlerAdapter {
    private String data;
    private String message;

    public ClientHandler(String message) {
        this.message = message;
    }
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ByteBuf encoded = ctx.alloc().buffer(4 * this.message.length());
        encoded.writeBytes(this.message.getBytes("GBK"));
        ctx.writeAndFlush(encoded);
        encoded.release();
    }
    public String getData() {
        return data;
    }
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf result = (ByteBuf) msg;
        byte[] result1 = new byte[result.readableBytes()];
        result.readBytes(result1);
        data = new String(result1, "GBK");
        result.release();
    }
}