欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

NIO解决粘包问题的代码具体实现以及详细注释

程序员文章站 2022-07-14 20:18:13
...

项目结构

NIO解决粘包问题的代码具体实现以及详细注释

客户端

package com.nio.support.client;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;

public class NettyNioClient {

    public void connect(int port, String host)throws Exception{
        // 配置服务端的NIO线程组,NioEventLoopGroup是个线程组,包含了一组NIO线程
        // 专门用于网路时间的处理,实际上它们就是Reactor线程组
        NioEventLoopGroup group = new NioEventLoopGroup();

        try {
            // 创建ServerBootstrap对象,它是Netty用于启动Nio服务的辅助类启动器
            // 目的是降低服务端的开发复杂度
            Bootstrap bootstrap = new Bootstrap();
            // 于服务端不同channel(NioSocketChannel.class)
            /**
             * handler,创建匿名内部类,实现initChannel方法,
             * 作用是当创建NioSocketChannel成功之后
             * 在进行初始化时,将它的channelHandler设置到ChannelPipeline中,
             * 用于处理网络IO事件
             */
            bootstrap.group(group)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            socketChannel.pipeline().addLast(new LineBasedFrameDecoder(1024));
                            socketChannel.pipeline().addLast(new StringDecoder());
                            socketChannel.pipeline().addLast(new NettyClientHandler());
                        }
                    });
            // 发起异步连接操作,调用同步阻塞方法等待连接成功
            ChannelFuture sync = bootstrap.connect(host, port).sync();
            // 等待客户端链路关闭
            sync.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            // 退出,释放资源
            group.shutdownGracefully();
        }

    }

    public static void main(String[] args) throws Exception{
        int port = 8083;
        if (args != null && args.length > 0){
            try {
                port = Integer.valueOf(args[0]);
            } catch (NumberFormatException e) {
                e.printStackTrace();
            }

        }
        new NettyNioClient().connect(port,"localhost");
    }
}
package com.nio.support.client;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

import java.util.logging.Logger;

public class NettyClientHandler extends ChannelInboundHandlerAdapter {
    /**
     * 日志
     */
    private static  final Logger logger = Logger.getLogger(NettyClientHandler.class.getName());

    // 计数
    private int counter;
    private byte[] req;

    public NettyClientHandler() {
        req = ("NOW TIME IS NOW" + System.getProperty("line.separator")).getBytes();
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        /**
         * 循环发送100条消息,每发送一条就刷新一次,保证每条消息都会被写入channel中
         */
        ByteBuf message = null;
        for (int i = 0; i < 100; i++) {
            message = Unpooled.buffer(req.length);
            message.writeBytes(req);
            ctx.writeAndFlush(message);
        }
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        String body = (String) msg;
        // 打印服务端返回的消息
        /**
         * 客户端每接收到服务端一条应答消息之后,就打印一次计数器
         */
        System.out.println("客户端收到服务端返回的消息是: " + body + " ;计数器的计数结果是: " + ++counter);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // 释放资源
        logger.warning("不期而遇的异常:" + cause.getMessage());
        ctx.close();
    }
}

服务端

package com.nio.support.server;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;

public class NettyNioServer {

    public void bind(int port)throws Exception{
        // 配置服务端的NIO线程组,NioEventLoopGroup是个线程组,包含了一组NIO线程
        // 专门用于网路时间的处理,实际上它们就是Reactor线程组
        NioEventLoopGroup groupParent = new NioEventLoopGroup();
        NioEventLoopGroup groupChild = new NioEventLoopGroup();

        try {
            // 创建ServerBootstrap对象,它是Netty用于启动Nio服务的辅助类启动器
            // 目的是降低服务端的开发复杂度
            ServerBootstrap bootstrap = new ServerBootstrap();
            // 将两个线程组当参数传递到ServerBootstrap中
            // 设置创建的channel为NioServerSocketChannel
            // 配置NioServerSocketChannel的TCP参数,此处的backlog设置为1024
            // 绑定IO事件的处理类ChildChannelHandler,用于处理网络IO事件,例如记录日志,对消息进行编码等
            bootstrap.group(groupParent,groupChild)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    .childHandler(new ChildChannelHandler());
            // 调用bind方法绑定端口,调用同步阻塞方法sync等待绑定操作成功
            // 返回值主要用于异步操作的通知回调
            ChannelFuture future = bootstrap.bind(port).sync();
            // 等待服务端监听端口关闭
            future.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            // 退出,释放系统资源
            groupParent.shutdownGracefully();
            groupChild.shutdownGracefully();
        }
    }

    private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {

        @Override
        protected void initChannel(SocketChannel socketChannel) throws Exception {
            /**
             * 新增两个解码器
             */
            socketChannel.pipeline().addLast(new LineBasedFrameDecoder(1024));
            socketChannel.pipeline().addLast(new StringDecoder());
            socketChannel.pipeline().addLast(new NettyNioServerHandler());
        }
    }


    public static void main(String[] args)throws Exception {
        int port = 8083;
        if (args != null && args.length > 0){
            try {
                port = Integer.valueOf(args[0]);
            } catch (NumberFormatException e) {
                e.printStackTrace();
            }

        }
        new NettyNioServer().bind(port);

    }


}
package com.nio.support.server;


import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

import java.util.Date;

public class NettyNioServerHandler extends ChannelInboundHandlerAdapter{

    // 计数
    private int counter;

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        /**
         * 接收到的msg就是删除回车换行符后的请求消息,不需要额外考虑读半包的问题
         * 也不需要对请求进行编码
         */
        String body = (String) msg;
        /**
         * 没读到一条客户端发来的消息就记一次数,然后发送应答消息给客户端
         * 服务端接受到的消息总个数应该和客户端发送的消息总数相同
         */
        System.out.println("服务端收到客户端发来的的消息是: " + body + " ;目前的计数结果是: " +  ++counter);

        // 判断客户端发来的消息和服务端预设值的消息是否相同
        // 如果相同就返回给客户端当前的时间
        String str = "NOW TIME IS NOW";
        String nowTime = "NOTE RIGHT";
        if (str.trim().equals(body.trim())){
            nowTime = new Date().toString();
        }
        ByteBuf resp = Unpooled.copiedBuffer((nowTime + System.getProperty("line.separator")).getBytes());
        // 异步发送应答消息给客户端
        ctx.writeAndFlush(resp);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
       // 当发生异常的时候,关闭ChannelHandlerContext,释放和ChannelHandlerContext相关联的句柄等资源
        ctx.close();
    }
}

打印结果

客户端

NIO解决粘包问题的代码具体实现以及详细注释

服务端

NIO解决粘包问题的代码具体实现以及详细注释

 

NIO模拟粘问题的代码具体实现以及详细注释URL:

https://blog.csdn.net/wildwolf_001/article/details/81136354