欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Netty开发的例子

程序员文章站 2022-04-23 11:49:57
...

一个Echo开发的例子的服务器端口

public class EchoServer {
    //具体的连接方法如下
    public void bind(int port) throws Throwable{
        //创建一个线程组
        NioEventLoopGroup parent=new NioEventLoopGroup();
        NioEventLoopGroup work=new NioEventLoopGroup();
        try {
            //创建一个服务器辅助启动类
            ServerBootstrap server=new ServerBootstrap();
            //设置一个这服务器的二个线程组
            server.group(parent, work);
            //给这个服务器设置一个配置的参数
            server.option(ChannelOption.SO_BACKLOG,1024)
                  .option(ChannelOption.SO_KEEPALIVE,true)
                  .channel(NioServerSocketChannel.class)
                  .handler(new LoggingHandler())//这个是日志的处理类
                  .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        //进行做一些编码和解码的操作
                        ByteBuf delimiter=Unpooled.copiedBuffer("$_".getBytes());
                        //现在进行创建一个以特定的字符串分割解码器
                        ch.pipeline().addLast(new DelimiterBasedFrameDecoder(Integer.MAX_VALUE,delimiter));
                        //进行创建一个字符串的编码器
                        ch.pipeline().addLast(new StringDecoder());
                        //注册一个接收数据的通道处理器对象
                        ch.pipeline().addLast(new ServerHandler());
                        //先进行解码操作
                        ch.pipeline().addLast(new StringEncoder());
                        }
                });
            //进行阻塞的处理
            ChannelFuture future=server.bind(port).sync();
            myMetaData(future.channel());
            System.out.println();
            //进行阻塞处理
            future.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        }finally{
            parent.shutdownGracefully();
            work.shutdownGracefully();
        }


    }

    private void myMetaData(Channel channel) {
        // TODO Auto-generated method stub
        System.out.println(channel.isRegistered());//判断一下我们有数据是否是能够
        System.out.println(channel.id());//生成的唯一的id
        System.out.println(channel.parent());//当前Channel的父类的RDD

    }

    public static void main(String[] args) {
        //定义端口
        int port=8888;
        try {
            new EchoServer().bind(port);
        } catch (Throwable e) {
            e.printStackTrace();
        }
    }
}

对应的处理事件为

public class ServerHandler extends ChannelInboundHandlerAdapter {
    private int count=0;
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        super.channelActive(ctx);
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//      System.out.println("有数据来了...");
        //进行数据的注册处理如下
        if(msg instanceof String){
            String data=(String)msg;
            System.out.println("this is "+ ++count +" timer recevive client : ["+data+"]");
            data+="$_";
            //进行编码处理
            ByteBuf echo=Unpooled.copiedBuffer(data.getBytes());
            //发送数据回去给客户端
            ctx.pipeline().writeAndFlush(echo);
//          ctx.pipeline().writeAndFlush(data);
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        super.exceptionCaught(ctx, cause);
        ctx.close();
    }

}

客户端的代码如下

public class EchoClient {
    //进行数据的连接方法
    public void connect(String host,int port){
        NioEventLoopGroup event=new NioEventLoopGroup();
        try{
            //创建一个客户端辅助启动类
            Bootstrap client=new Bootstrap();
            //进行数据的基本的绑定
            client.group(event)
                  .option(ChannelOption.SO_SNDBUF,1024)
                  .option(ChannelOption.SO_TIMEOUT,3000)
                  .channel(NioSocketChannel.class)
                  .handler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        //进行基本的数据解码
                        ByteBuf buf=Unpooled.copiedBuffer("$_".getBytes());
                        //进行数据的基本的注册
                        ch.pipeline().addLast(new DelimiterBasedFrameDecoder(Integer.MAX_VALUE,buf));
                        //数据的解码
                        ch.pipeline().addLast(new StringDecoder());
                        ch.pipeline().addLast(new ClientHandler());

                    }
                });
            //发送sync连接操作
            ChannelFuture future=client.connect(host, port).sync();

            //进行基本的等待
            future.channel().closeFuture().sync();

        }catch(Exception e){

        }finally{
            event.shutdownGracefully();
        }
    }

    public static void main(String[] args) {
        String host="localhost";
        int port=8888;
        new EchoClient().connect(host, port);
    }
}

对应的处理事件为

public class ClientHandler extends ChannelInboundHandlerAdapter {
    private int count=0;
    //准备数据
    private final String ECHO_REQ="Hi yanghailong. welcome to Netty.$_";
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {

        for(int i=0;i<10;i++){
            //一开启启动调用这个方法,然后就开始写数据到我们的服务器
            ctx.writeAndFlush(Unpooled.copiedBuffer(ECHO_REQ.getBytes()));
        }
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("this is "+ ++count +" timer recevive client : ["+msg+"]");
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        super.exceptionCaught(ctx, cause);
        ctx.close();
    }

}