欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Netty的使用

程序员文章站 2022-03-02 15:25:30
...

转载请标注原文地址:https://blog.csdn.net/lilyssh/article/details/84306563
项目源码地址:https://gitee.com/lilyssh/lilyssh-rpc

项目需求

用netty实现两个项目之间的通讯。

一、Server端:

1. 在pom.xml中添加以下依赖

<dependency>
  <groupId>io.netty</groupId>
  <artifactId>netty-all</artifactId>
  <version>5.0.0.Alpha2</version>
</dependency>

2. 接收端:

package cn.lilyssh.receiver;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.util.ReferenceCountUtil;

import java.net.InetSocketAddress;
import java.nio.charset.Charset;

public class Receiver {
    public void start(){
        //根据给定的字符串内容创建一个ByteBuf。
        final ByteBuf buf = Unpooled.unreleasableBuffer(Unpooled.copiedBuffer("Hi!\r\n", Charset.forName("UTF-8")));

        //(1)、初始化用于Acceptor的主"线程池"以及用于I/O工作的从"线程池";
        //NioEventLoopGroup可以理解为一个线程池,内部维护了一组线程,每个线程负责处理多个Channel上的事件,而一个Channel只对应于一个线程,这样可以回避多线程下的数据同步问题。默认的线程数目是 CPU 核数 × 2。
        EventLoopGroup group=new NioEventLoopGroup();
        try {
            //(2)、ServerBootstrap负责初始化netty服务器,并且开始监听端口的socket请求。
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            //(3)、通过ServerBootstrap的group方法,设置(1)中初始化的主从"线程池";
            serverBootstrap.group(group)
                    //(4)、 指定通道channel的类型,由于是服务端,故而是NioServerSocketChannel;
                    .channel(NioServerSocketChannel.class)
                    .localAddress(new InetSocketAddress(2222))
                    //(5)、 设置子通道也就是SocketChannel的处理器, 其内部是实际业务开发的"主战场"
                    .childHandler(
                    new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel channel) throws Exception {
                            channel.pipeline().addLast(new ChannelHandlerAdapter(){
                                @Override
                                public void channelRead(ChannelHandlerContext ctx, Object msg) {
                                    ByteBuf in = (ByteBuf) msg;
                                    try {
                                        while (in.isReadable()) {
                                            System.out.print((char) in.readByte());
                                            System.out.flush();
                                        }
                                    } finally {
                                        ReferenceCountUtil.release(msg);
                                    }
                                }
                            });
                        }
                    }
            );
            //(6)、 绑定并侦听某个端口
            ChannelFuture f = serverBootstrap.bind().sync();
            f.channel().closeFuture().sync();
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}

3. 调用、启动:

package cn.lilyssh;

import cn.lilyssh.receiver.Receiver;

public class MainReceiver {
    public static void main(String[] args) {
        Receiver receiver = new Receiver();
        receiver.start();
    }
}

二、Client端:

1. 在pom.xml中添加以下依赖

<dependency>
  <groupId>io.netty</groupId>
  <artifactId>netty-all</artifactId>
  <version>5.0.0.Alpha2</version>
</dependency>

2. 发送端:

package cn.lilyssh.rpc;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

import java.net.InetSocketAddress;

public class Sender {
    public void start(){
        //worker负责读写数据
        EventLoopGroup worker = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            //设置线程池
            b.group(worker);
            //设置socket工厂
            b.channel(NioSocketChannel.class);
            b.option(ChannelOption.SO_KEEPALIVE, true);
            //设置管道
            b.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel channel) throws Exception {
                    //获取管道
                    ChannelPipeline pipeline = channel.pipeline();
                    //字符串解码器
                    pipeline.addLast(new StringDecoder());
                    //字符串编码器
                    pipeline.addLast(new StringEncoder());
                    //处理类
                    pipeline.addLast(new ClientHandler4());
                }
            });
            //发起异步连接操作
            ChannelFuture futrue = b.connect(new InetSocketAddress("127.0.0.1",2222)).sync();
            //等待客户端链路关闭
            futrue.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            //优雅的退出,释放NIO线程组
            worker.shutdownGracefully();
        }
    }
}
class ClientHandler4 extends SimpleChannelInboundHandler<String> {

    //接受服务端发来的消息
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("server response : "+msg);
    }

    //与服务器建立连接
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        //给服务器发消息
        ctx.channel().writeAndFlush("i am client !");

        System.out.println("channelActive");
    }

    //与服务器断开连接
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channelInactive");
    }

    //异常
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        //关闭管道
        ctx.channel().close();
        //打印异常信息
        cause.printStackTrace();
    }

    @Override
    protected void messageReceived(ChannelHandlerContext channelHandlerContext, String s) throws Exception {

    }
}

3. 调用、启动:

package cn.lilyssh.rpc;

public class MainSender {
    public static void main(String[] args) {
        Sender sender = new Sender();
        sender.start();
    }
}

会看到客户端regisry控制台打印了:channelActive,服务端lrpc控制台打印了:i am client !。
大功告成!

相关标签: Netty