欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Netty的入门使用

程序员文章站 2022-04-22 17:55:31
...

1:我们看下Client端的代码实现

package ruizhan.hjf.netty;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
/**
 * Netty客户端的程序
 * @author huangjianfei
 */
public class Client {
    /*IP地址*/
    static final String HOST = System.getProperty("host", "127.0.0.1");
    /*端口号*/
    static final int PORT1 = Integer.parseInt(System.getProperty("port", "8765"));

    static final int PORT2 = Integer.parseInt(System.getProperty("port", "8764"));

    public static void main(String[] args) throws Exception {
        EventLoopGroup workgroup = new NioEventLoopGroup();
        Bootstrap b = new Bootstrap();//客户端
        b.group(workgroup)
        .channel(NioSocketChannel.class)//客户端 -->NioSocketChannel
        .option(ChannelOption.SO_KEEPALIVE, true)
        .handler(new ChannelInitializer<SocketChannel>() {//handler
            @Override
            protected void initChannel(SocketChannel sc) throws Exception {
                sc.pipeline().addLast(new ClientHandler());
            }
        });
        //创建异步连接 可添加多个端口
        ChannelFuture cf1 = b.connect(HOST, PORT1).sync();
        ChannelFuture cf2 = b.connect(HOST, PORT2).sync();

        //buf
        //client向server端发送数据  Buffer形式
        cf1.channel().writeAndFlush(Unpooled.copiedBuffer("hello netty".getBytes()));
        cf2.channel().writeAndFlush(Unpooled.copiedBuffer("hello world".getBytes()));


        cf1.channel().closeFuture().sync();
        cf2.channel().closeFuture().sync();

        workgroup.shutdownGracefully();
    }
}

2:Servler端代码实现

package ruizhan.hjf.netty;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
/**
 * Netty实现的服务端程序
 * @author huangjianfei
 */
public class Server
{
    /*端口号*/
    static final int PORT1 = Integer.parseInt(System.getProperty("port", "8765"));

    static final int PORT2 = Integer.parseInt(System.getProperty("port", "8764"));

    public static void main(String[] args)
    {
        EventLoopGroup bossGroup = null;
        EventLoopGroup workerGroup = null;
        ServerBootstrap b = null;
        try{
            //1:第一个线程组是用于接收Client连接的
            bossGroup = new NioEventLoopGroup(); //(1)
            //2:第二个线程组是用于实际的业务处理操作的
            workerGroup = new NioEventLoopGroup();
            //3:创建一个启动NIO服务的辅助启动类ServerBootstrap 就是对我们的Server进行一系列的配置
            b = new ServerBootstrap();//(2)
            //4:绑定两个线程组
            b.group(bossGroup, workerGroup)
            //5:需要指定使用NioServerSocketChannel这种类型的通道
            .channel(NioServerSocketChannel.class)//(3) 服务端 -->NioServerSocketChannel
            //6:一定要使用childHandler 去绑定具体的事件处理器
            .childHandler(new ChannelInitializer<SocketChannel>() //(4)   childHandler
            {
                @Override
                protected void initChannel(SocketChannel sc) throws Exception
                {
                    //7:将自定义的serverHandler加入到管道中去(多个)
                    sc.pipeline().addLast(new ServerHandler());//handler中实现真正的业务逻辑
//                    sc.pipeline().addLast(new ServerHandler());
//                    sc.pipeline().addLast(new ServerHandler());
                }
            })
            /**
             * 服务器端TCP内核模块维护两个队列,我们称之为A,B吧
             * 客户端向服务端connect的时候,会发送带有SYN标志的包(第一次握手)
             * 服务端收到客户端发来的SYN时,向客户端发送SYN ACK确认(第二次握手)
             * 此时TCP内核模块把客户端连接加入到A队列中,最后服务端收到客户端发来的ACK时(第三次握手)
             * TCP内核模块把客户端连接从A队列移到B队列,连接成功,应用程序的accept会返回
             * 也就是说accept从B队列中取出完成三次握手的连接
             * A队列和B队列的长度之和是backLog,当A,B队列的长度之和大于backLog时,新连接将会被TCP内核拒绝
             * 所以,如果backLog过小,可能会出现accept速度跟不上,A,B队列满了,导致新的客户端无法连接,
             * 要注意的是,backLog对程序支持的连接数并无影响,backLog影响的只是还没有被accept取出的连接
             */
            //8:设置TCP连接的缓冲区
            .option(ChannelOption.SO_BACKLOG, 128)//(5)
//            .option(ChannelOption.SO_SNDBUF, 32*1024) //设置发送缓冲大小
//            .option(ChannelOption.SO_RCVBUF, 32*1024) //设置接收缓冲大小
            //9:保持连接
            .childOption(ChannelOption.SO_KEEPALIVE, true);//(6)
            //10:绑定指定的端口 进行监听
            //此处端口号先写死  也可以绑定多个端口
            ChannelFuture cf2= b.bind(PORT1).sync(); // (7)

            ChannelFuture cf3= b.bind(PORT2).sync(); // (7)   绑定多个端口

            //Thread.sleep(10000);
            cf2.channel().closeFuture().sync(); //异步等待关闭
            cf3.channel().closeFuture().sync(); //异步等待关闭

        }catch(Exception e){
            e.printStackTrace();
        }finally{
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }
}

3:接下来,就是真正去实现数据传输的业务逻辑层代码的实现,在这里也就是ClientHanlder和ServlerHandler

package ruizhan.hjf.netty;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.ReferenceCountUtil;

/**
 * 客户端业务处理类
 * (编写主要的业务逻辑)
 * @author huangjianfei
 */
public class ClientHandler extends ChannelHandlerAdapter
{
    // ByteBuf是一个引用计数对象,这个对象必须显示地调用release()方法来释放。
    // 请记住处理器的职责是释放所有传递到处理器的引用计数对象。
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
    {
        try{
            //do something
            //接收服务端发来的数据 ByteBuf
            ByteBuf  buf = (ByteBuf)msg;
            //创建一个和buf一样长度的字节空数组
            byte[] data = new byte[buf.readableBytes()];
            //将buf中的数据读取到data数组中
            buf.readBytes(data);
            //将data数组惊醒包装 以String格式输出
            String response = new String(data,"utf-8");
            System.out.println("client :"+response);

            //以上代码是接收服务端发来的反馈数据//

            ctx.close();
        }finally{
            // Discard the received data silently.
            ReferenceCountUtil.release(msg);
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception
    {
        // Close the connection when an exception is raised.
        cause.printStackTrace();
        ctx.close();
    }
}
package ruizhan.hjf.netty;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.ReferenceCountUtil;

/**
 * 服务端业务处理类
 * (编写主要的业务逻辑)
 * @author huangjianfei
 */
public class ServerHandler extends ChannelHandlerAdapter
{

    /**
     * 每当从客户端收到新的数据时,这个方法会在收到消息时被调用
     * ByteBuf是一个引用计数对象,这个对象必须显示地调用release()方法来释放。
     * 请记住处理器的职责是释放所有传递到处理器的引用计数对象。
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
    {
        try{
            //do something
            //接收客户端发送的数据 ByteBuf
            ByteBuf buf = (ByteBuf)msg;
            //创建一个和buf长度一样的空字节数组
            byte[] data = new byte[buf.readableBytes()];
            //将buf中的数据读取到data数组中
            buf.readBytes(data);
            //将data数据包装成string输出
            String request = new String(data,"utf-8");
            System.out.println("server :"+request);

            //以上代码是接收客户端信息//

            //server端向client发送反馈数据
            //如果是绑定了多个端口 那么都会进行发送
            ctx.writeAndFlush(Unpooled.copiedBuffer("888".getBytes()))
            .addListener(ChannelFutureListener.CLOSE);//添加监听 当服务端向客户端发送完数据后,关闭connect连接
            /**
             * ChannelFutureListener,当一个写请求完成时通知并且关闭Channel
             * 加上监听 意味着服务端回送数据到客户端时 连接关闭(短连接)
             * 不加监听 意味着客户端与服务端一直保持连接状态(长连接)
             */


            ctx.close();
        }finally{
            // Discard the received data silently.
            ReferenceCountUtil.release(msg);
        }
    }

    /**
     * exceptionCaught()事件处理方法是当出现Throwable对象才会被调用
     * 即当Netty由于IO错误或者处理器在处理事件时抛出的异常时
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception
    {
        // Close the connection when an exception is raised.
        cause.printStackTrace();
        ctx.close();
    }

}

 

相关标签: Netty