欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

Netty学习笔记之Netty组件初探与第一个Netty程序的编写

程序员文章站 2022-04-15 18:26:33
文章目录从NIO到Netty二级目录三级目录从NIO到Netty在之前的文章中,我们分析了如何用二级目录三级目录...

从NIO到Netty

在之前的文章中,我们分析了如何用NIO去开发一个echo服务器。虽然最后我们成功的完成了,但是开发过程中对NIO组件的使用无疑是繁琐的,并且需要考虑许多细节问题。如何使开发变得更简单呢?下面来了解一下Netty。

Netty是一个基于NIO的网络客户端框架,支持快速简单的开发诸如协议服务器客户端这样的网络应用。它极大的提升了开发一个TCP或UDP的socket服务器的编程效率,简化了编程工作。
快速和简便,并不意味着开发出来的程序会面临可扩展性或者性能问题。Netty 吸取了很多协议的开发经验,诸如 FTP,SMTP,HTTP 以及各种二进制协议以及文本协议等并且被精心的设计。正因如此,Netty 成功的达成了简便开发、高性能、良好稳定性和扩展性的目标而不是妥协。

从上可以知道,Netty能够完成基于TCP和UDP的socket开发,并且具有开发简便,高性能、良好的稳定性和扩展性等优点。

Netty组件初探

ByteBuf

再NIO中,我们了解了ByteBuffer,而Netty中的ByteBuf与ByteBuffer一样,也是代表了一段连续的二进制数据空间,但是相比于ByteBuffer,Byte’Buf显然要更加优化。

ByteBuf中只有两个指针,读指针和写指针,读指针代表数据可以从该指针位置开始读取数据,写指针则表示数据开始写入的位置。

当一个ByteBuf刚被初始化时,读写指针均指向0:
Netty学习笔记之Netty组件初探与第一个Netty程序的编写
如果此时写指针写入4个字节的数据,数据存储在黄色部分:
Netty学习笔记之Netty组件初探与第一个Netty程序的编写
当数据被读取了两个字节后:
Netty学习笔记之Netty组件初探与第一个Netty程序的编写
从上面几个图可以看出,ByteBuf相比于ByteBuffer,十分简单,不需要例如flip这种翻转操作来控制读写。同时,ByteBuf还具有自动扩容的功能。

其实在Netty中,ByteBuf也是一个很庞大的继承体系,可以看一下相关部分的类图:
Netty学习笔记之Netty组件初探与第一个Netty程序的编写
看着就十分繁琐,不过我们也不需要知道那么多,简单的来说,ByteBuf大致分为两个维度:

数据存储在堆还是直接内存;存储空间是有jvm管理进行GC还是池化由Netty自行管理。

在Netty4之后,官方推荐使用池化ByteBuf由Netty自己管理。同时,ByteBuf存储在堆外内存可以减少数据的拷贝。所以PoolDirectByteBuf是我们常用的ByteBuf。不过我们一般会使用ByteBufAllocator接口的buffer方法来自行分配ByteBuf,一般在服务器应用中,使用的都是池化的直接内存的ByteBuf。

CompositeByteBuf

在很多情况下,我们需要把多个ByteBuf组合起来,CompositeByteBuf就是这样一种实现
Netty学习笔记之Netty组件初探与第一个Netty程序的编写
如图,三个ByteBuf实例组成了一个CompositeByteBuf。CompositeByteBuf是一个虚拟的ByteBuf,其内部是多个ByteBuf实例所组成的数组。

EventLoop

EventLoop,在Netty中承担的职责就是,在一个循环中,不断的处理事件,这些事件可能是用户业务中自定义事件,也可能是Netty中产生的IO事件。

NioEventLoopGroup是我们常用的EventLoop的实现:

EventLoopGroup  boss            = new NioEventLoopGroup(1);
EventLoopGroup  worker          = new NioEventLoopGroup();
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(boss, worker);

如上代码,分别初始化两个NioEventLoopGroup。boss用于服务端通道,因为一个通道只能绑定一个线程,并且处理的IO事件将会在此通道上循环,所以线程大小设为1即可。而worker用于处理客户端,线程设为默认值,也就是cpu的两倍。最后将两个EventLoop都注册到服务端引导程序ServerBootStrap上。

EventLoop代表的线程模型是Netty的核心设计,以后还会详细学习。

Channel,Pipeline,ChannelHandler,ChannelHandlerContext

Netty也实现了自己对于通道的抽象,以便于在接口的层面上添加更多能力。

Channel常见实现类:

NioSocketChannel,客户端Channel。NioServerSocketChannel,服务端Channel。
EpollServerSocketChannel,针对linux的,更为高效的基于Epoll的Channel。

来看下代码场景

package com.sensen.LearnNetty.Netty.Channel;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

/**
 * @ClassName Demo1
 * @Deacription TODO
 * @Author linsen
 * @Date 2020/11/10 22:24
 * @Version 1.0
 **/
public class ChannelDemo1 {
    public static void main(String[] args) {
        NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(eventLoopGroup);
            serverBootstrap.channel(NioServerSocketChannel.class);
            //代码1
            serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel socketChannel) throws Exception {
                    socketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter());
                }
            });
            //代码2
            ChannelFuture bind = serverBootstrap.bind(9999);
            bind.sync();
            Channel serverchannel = bind.channel();
            serverchannel.closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            eventLoopGroup.shutdownGracefully();
        }

    }
}

在代码1处,我们通过调用服务端引导程序serverBootstrap.childHandler,传入一个ChannelInitializer接口,并实现其initChannel方法。该方法一般在客户端连接进的时候触发,而传入的参数SocketChannel就是客户端Channel。

之后调用与客户端的pipeline()方法,获取与客户端通道相关的管道pipeline。在管道中添加处理器。

pipeline管道是Netty中一个很重要的概念。它是一个责任链模式的实现。它按照顺序存储了处理器ChannelHandler。

每个管道对象都与一个通道对象绑定,管道中的处理器按照功能分为入站处理器和出站处理器。从通道读入的数据会传入管道,并且将入站数据按照顺序在入站处理器中传递,而出站数据正好相反,在出站处理器中传递,最后被写到socket通道中。

ChannelHandler即是我们所说的处理器接口。ChannelInboundHandler和ChannelOutboundHandler即是ChannelHandler的子接口,前者为入站处理器接口,后者是出站处理器接口。这个接口定义了过多的方法,我们一般都会使用它的适配类,分别是ChannelInboundHandlerAdapter和ChannelOutboundHandlerAdapter。

在代码中,我们可以看到在实现处理器中的方法时,会传入一个ChannelHandlerContext参数。

ChannelHandlerContext是一个与处理器紧密相连的接口,在将处理器传入管道中的时候,管道就会为处理器生成这个ChannelHandlerContext对象并作为参数传入,ChannelHandlerContext会持有对应的处理器,ChannelHandlerContext中带有指向前后ChannelHandlerContext的指针,以指针的方式实现了一个双向链表,这个链表便代表着数据处理的两种方向,入站和出站。

比如我们在处理完入站数据后想写出,就可以使用这个接口,例如:

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
{
    ctx.write(msg);
}

比如我们之前的echo服务器,在解析完入站数据之后,拿到数据,就可以调用该接口即ChannelHandlerContext 的write方法,它就会将数据沿着链表的出站方向传给自己的下一个处理器。

在代码2处,引导程序监听了端口并返回ChannelFuture对象,bind.sync()表示程序在等待异步的绑定成功。之后调用serverchannel.closeFuture().sync(),等待closeFuture的异步关闭任务结束。最后还要将eventLoopGroup释放。

第一个Netty程序

了解了Netty组件之后,我们尝试用Netty来开发一个简单的echo服务。

为了简便一点,我们对服务做以下规定:

客户端消息定长为 16 字节。
TCP 不发生拆包粘包。
客户端一次只发送一条消息。

话不多说,直接上代码。先来看下服务端的代码

package com.sensen.LearnNetty.Netty.NettyDemo;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

/**
 * @ClassName MainDemo1
 * @Deacription Netty 实现简单echo服务器
 * 客户端消息定长为 16 字节
 * TCP 不发生拆包粘包
 * 客户端一次只发送一条消息
 * @Author linsen
 * @Date 2020/11/10 22:44
 * @Version 1.0
 **/
public class MainServer {
    public static void main(String[] args) {
        EventLoopGroup boss = new NioEventLoopGroup();
        EventLoopGroup worker = new NioEventLoopGroup();
        try {
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        serverBootstrap.group(boss,worker)
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        socketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter(){
                           //数据出现的时候,方法被调用,开始读取数据
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                ByteBuf buf = (ByteBuf) msg;
                                if (buf.readerIndex()!=0){
                                    throw new IllegalStateException();
                                }
                                if (buf.writerIndex() != 16)
                                {
                                    throw new IllegalStateException();
                                }
                                ctx.writeAndFluwsh(buf);
                            }

                            @Override
                            public void channelInactive(ChannelHandlerContext ctx) throws Exception {
                                System.out.println("客户端关闭通道");
                            }
                        });
                    }
                });


            ChannelFuture sync = serverBootstrap.bind(9999).sync();
            sync.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            boss.shutdownGracefully();
            worker.shutdownGracefully();
        }

    }
}

了解了Netty组件后,再看服务器代码就比较简单了,显示初始化两个EventLoop,分别作为服务端和客户端的线程循环,然后注册到引导程序中,之后调用引导程序的childHandler注册ChannelInitializer。在其中实现initChannel方法,这个方法再客户端接入的时候会被触发,之后
在参数中传入处理器,实现处理器的方法,用channelRead方法来解析数据并在其中将数据writeAndFluwsh,传回出站方向的处理器,返回给客户端。

再来看下客户端的代码

package com.sensen.LearnNetty.Netty.NettyDemo;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.EventLoop;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;

import java.nio.Buffer;

/**
 * @ClassName MainClient
 * @Deacription 客户端
 * @Author linsen
 * @Date 2020/11/11 10:52
 * @Version 1.0
 **/
public class MainClient {
    public static void main(String[] args) throws InterruptedException {
        Bootstrap bootstrap = new Bootstrap();
        NioEventLoopGroup eventLoop = new NioEventLoopGroup();
        bootstrap.group(eventLoop);
        bootstrap.channel(NioSocketChannel.class).handler(new ChannelInboundHandlerAdapter(){
            @Override
            public void channelActive(ChannelHandlerContext ctx) throws Exception {
                ByteBuf buffer = PooledByteBufAllocator.DEFAULT.buffer(8);
                buffer.writeInt(1).writeInt(2).writeInt(3).writeInt(4);
                ctx.writeAndFlush(buffer);
            }

            @Override
            public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
                System.out.println("数据发送完成");
            }

            @Override
            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                ByteBuf buf = (ByteBuf) msg;
                System.out.println(buf.readInt());
                System.out.println(buf.readInt());
                System.out.println(buf.readInt());
                System.out.println(buf.readInt());
                ctx.close();
            }
        });
        ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 9999).sync();
        channelFuture.channel().closeFuture().sync();
        eventLoop.shutdownGracefully();
    }
}

客户端代码也大致相同,但是这里只需要自己本身的EventLoop,和上面一样,在channelRead方法中接收服务端传回的数据。在channelActive中使用ByteBuf写出数据。

本文地址:https://blog.csdn.net/sen_sen97/article/details/109615586