使用netty发送报文的坑
程序员文章站
2022-04-22 18:03:31
...
最近跟银行调试一个接口的时候,行方说明是TCP/IP socket同步短链接的方式,开始采用socket和niosocket都不行,最后采用了了netty形式发送,代码很简单就是创建一个ChannelHandlerAdapter.主要代码如下,
测试类:
public static void main(String[] args) {
SockerClient client = new SockerClient();
Channel connect = client.connect(“xxx.xxx.xxx.xx”, xxxx);
client.sendMessage(msg)
}
class SockerClient {
private ClientHandler clientHandler = new ClientHandler();
public Channel connect(String host, int port) throws Exception {
EventLoopGroup workerGroup = new NioEventLoopGroup();
Bootstrap b = new Bootstrap();
b.group(workerGroup);
b.channel(NioSocketChannel.class);
b.option(ChannelOption.SO_KEEPALIVE, true);
b.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(clientHandler);
}
});
return b.connect(host, port).sync().channel();
}
public String sendMessage(String msg) throws Exception {
ChannelPromise promise = clientHandler.sendMessage(msg);
promise.await();
return clientHandler.getData();
}
}
public static class ClientHandler extends ChannelInboundHandlerAdapter {
private ChannelHandlerContext ctx;
private ChannelPromise promise;
private String data;
//初始化ctx,用来后面发送报文
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("---------------------执行active的线程"+Thread.currentThread());
super.channelActive(ctx);
this.ctx = ctx;
}
//发送报文
public ChannelPromise sendMessage(String message) throws Exception {
System.out.println("---------------------执行sendMessage的线程"+Thread.currentThread());
if (ctx == null) {
throw new IllegalStateException();
}
ByteBuf encoded = ctx.alloc().buffer(4 * message.length());
encoded.writeBytes(message.getBytes("GBK"));
promise = ctx.writeAndFlush(encoded).channel().newPromise();
return promise;
}
public String getData() {
return data;
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf result = (ByteBuf) msg;
byte[] result1 = new byte[result.readableBytes()];
result.readBytes(result1);
data = new String(result1, "GBK");
promise.setSuccess();
result.release();
}
}
本来是想单独写个sendMessage方法去发送message的,后来发现sendMessage这个方法的里面的ctx时而为空,有时又正常,因为channelActive这个继承的方法会在建立链接时执行,就初始化ctx,感觉没道理会为空。想了半天,于是把这个两个方法执行的线程打印出来,才发现执行activeThread方法的线程是nioEventLoopGroup,而执行sendMessageThread方法的线程是主线程main.
---------------------执行active的线程nioEventLoopGroup-2-1
---------------------执行sendMessage的线程main
虽然在main方法中是client.connect先执行,但是建立连接确是另一个线程完成的,不在是main线程,而sendMessage是main线程执行,会出现sendMessage和channelActive并不是按照固定顺序执行,说白了就是谁抢的快谁执行。。。所以要想在获取ctx后再发送message,直接将发送报文这一步写在channelActive里面。
稍微改造下就这样。
public class NettyClient {
public static void main(String aa[]){
String msg = "xxx";
System.out.println(NettyClient.sendMessage("xxx.xxx.xx.xx", 0000, msg));
}
public static String sendMessage(String host, int port, String msg) {
final ClientHandler clientHandler = new ClientHandler(msg);
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.SO_KEEPALIVE, true)
.handler(new ChannelInitializer() {
protected void initChannel(Channel channel) throws Exception {
channel.pipeline().addLast(clientHandler);
}
});
// 等待客户端链接成功
ChannelFuture future = bootstrap.connect(host, port).sync();
System.out.println("客户端链接成功!");
// 等待客户端链接关闭
future.channel().closeFuture().sync();
} catch (Exception e) {
log.error("请求异常:",e);
return null;
} finally {
group.shutdownGracefully();
}
return clientHandler.getData();
}
}
class ClientHandler extends ChannelInboundHandlerAdapter {
private String data;
private String message;
public ClientHandler(String message) {
this.message = message;
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ByteBuf encoded = ctx.alloc().buffer(4 * this.message.length());
encoded.writeBytes(this.message.getBytes("GBK"));
ctx.writeAndFlush(encoded);
encoded.release();
}
public String getData() {
return data;
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf result = (ByteBuf) msg;
byte[] result1 = new byte[result.readableBytes()];
result.readBytes(result1);
data = new String(result1, "GBK");
result.release();
}
}