Netty学习笔记之Netty组件初探与第一个Netty程序的编写
文章目录
从NIO到Netty
在之前的文章中,我们分析了如何用NIO去开发一个echo服务器。虽然最后我们成功的完成了,但是开发过程中对NIO组件的使用无疑是繁琐的,并且需要考虑许多细节问题。如何使开发变得更简单呢?下面来了解一下Netty。
Netty是一个基于NIO的网络客户端框架,支持快速简单的开发诸如协议服务器客户端这样的网络应用。它极大的提升了开发一个TCP或UDP的socket服务器的编程效率,简化了编程工作。
快速和简便,并不意味着开发出来的程序会面临可扩展性或者性能问题。Netty 吸取了很多协议的开发经验,诸如 FTP,SMTP,HTTP 以及各种二进制协议以及文本协议等并且被精心的设计。正因如此,Netty 成功的达成了简便开发、高性能、良好稳定性和扩展性的目标而不是妥协。
从上可以知道,Netty能够完成基于TCP和UDP的socket开发,并且具有开发简便,高性能、良好的稳定性和扩展性等优点。
Netty组件初探
ByteBuf
再NIO中,我们了解了ByteBuffer,而Netty中的ByteBuf与ByteBuffer一样,也是代表了一段连续的二进制数据空间,但是相比于ByteBuffer,Byte’Buf显然要更加优化。
ByteBuf中只有两个指针,读指针和写指针,读指针代表数据可以从该指针位置开始读取数据,写指针则表示数据开始写入的位置。
当一个ByteBuf刚被初始化时,读写指针均指向0:
如果此时写指针写入4个字节的数据,数据存储在黄色部分:
当数据被读取了两个字节后:
从上面几个图可以看出,ByteBuf相比于ByteBuffer,十分简单,不需要例如flip这种翻转操作来控制读写。同时,ByteBuf还具有自动扩容的功能。
其实在Netty中,ByteBuf也是一个很庞大的继承体系,可以看一下相关部分的类图:
看着就十分繁琐,不过我们也不需要知道那么多,简单的来说,ByteBuf大致分为两个维度:
数据存储在堆还是直接内存;存储空间是有jvm管理进行GC还是池化由Netty自行管理。
在Netty4之后,官方推荐使用池化ByteBuf由Netty自己管理。同时,ByteBuf存储在堆外内存可以减少数据的拷贝。所以PoolDirectByteBuf是我们常用的ByteBuf。不过我们一般会使用ByteBufAllocator接口的buffer方法来自行分配ByteBuf,一般在服务器应用中,使用的都是池化的直接内存的ByteBuf。
CompositeByteBuf
在很多情况下,我们需要把多个ByteBuf组合起来,CompositeByteBuf就是这样一种实现
如图,三个ByteBuf实例组成了一个CompositeByteBuf。CompositeByteBuf是一个虚拟的ByteBuf,其内部是多个ByteBuf实例所组成的数组。
EventLoop
EventLoop,在Netty中承担的职责就是,在一个循环中,不断的处理事件,这些事件可能是用户业务中自定义事件,也可能是Netty中产生的IO事件。
NioEventLoopGroup是我们常用的EventLoop的实现:
EventLoopGroup boss = new NioEventLoopGroup(1);
EventLoopGroup worker = new NioEventLoopGroup();
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(boss, worker);
如上代码,分别初始化两个NioEventLoopGroup。boss用于服务端通道,因为一个通道只能绑定一个线程,并且处理的IO事件将会在此通道上循环,所以线程大小设为1即可。而worker用于处理客户端,线程设为默认值,也就是cpu的两倍。最后将两个EventLoop都注册到服务端引导程序ServerBootStrap上。
EventLoop代表的线程模型是Netty的核心设计,以后还会详细学习。
Channel,Pipeline,ChannelHandler,ChannelHandlerContext
Netty也实现了自己对于通道的抽象,以便于在接口的层面上添加更多能力。
Channel常见实现类:
NioSocketChannel,客户端Channel。NioServerSocketChannel,服务端Channel。
EpollServerSocketChannel,针对linux的,更为高效的基于Epoll的Channel。
来看下代码场景
package com.sensen.LearnNetty.Netty.Channel;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
/**
* @ClassName Demo1
* @Deacription TODO
* @Author linsen
* @Date 2020/11/10 22:24
* @Version 1.0
**/
public class ChannelDemo1 {
public static void main(String[] args) {
NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(eventLoopGroup);
serverBootstrap.channel(NioServerSocketChannel.class);
//代码1
serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter());
}
});
//代码2
ChannelFuture bind = serverBootstrap.bind(9999);
bind.sync();
Channel serverchannel = bind.channel();
serverchannel.closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
eventLoopGroup.shutdownGracefully();
}
}
}
在代码1处,我们通过调用服务端引导程序serverBootstrap.childHandler,传入一个ChannelInitializer接口,并实现其initChannel方法。该方法一般在客户端连接进的时候触发,而传入的参数SocketChannel就是客户端Channel。
之后调用与客户端的pipeline()方法,获取与客户端通道相关的管道pipeline。在管道中添加处理器。
pipeline管道是Netty中一个很重要的概念。它是一个责任链模式的实现。它按照顺序存储了处理器ChannelHandler。
每个管道对象都与一个通道对象绑定,管道中的处理器按照功能分为入站处理器和出站处理器。从通道读入的数据会传入管道,并且将入站数据按照顺序在入站处理器中传递,而出站数据正好相反,在出站处理器中传递,最后被写到socket通道中。
ChannelHandler即是我们所说的处理器接口。ChannelInboundHandler和ChannelOutboundHandler即是ChannelHandler的子接口,前者为入站处理器接口,后者是出站处理器接口。这个接口定义了过多的方法,我们一般都会使用它的适配类,分别是ChannelInboundHandlerAdapter和ChannelOutboundHandlerAdapter。
在代码中,我们可以看到在实现处理器中的方法时,会传入一个ChannelHandlerContext参数。
ChannelHandlerContext是一个与处理器紧密相连的接口,在将处理器传入管道中的时候,管道就会为处理器生成这个ChannelHandlerContext对象并作为参数传入,ChannelHandlerContext会持有对应的处理器,ChannelHandlerContext中带有指向前后ChannelHandlerContext的指针,以指针的方式实现了一个双向链表,这个链表便代表着数据处理的两种方向,入站和出站。
比如我们在处理完入站数据后想写出,就可以使用这个接口,例如:
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
{
ctx.write(msg);
}
比如我们之前的echo服务器,在解析完入站数据之后,拿到数据,就可以调用该接口即ChannelHandlerContext 的write方法,它就会将数据沿着链表的出站方向传给自己的下一个处理器。
在代码2处,引导程序监听了端口并返回ChannelFuture对象,bind.sync()表示程序在等待异步的绑定成功。之后调用serverchannel.closeFuture().sync(),等待closeFuture的异步关闭任务结束。最后还要将eventLoopGroup释放。
第一个Netty程序
了解了Netty组件之后,我们尝试用Netty来开发一个简单的echo服务。
为了简便一点,我们对服务做以下规定:
客户端消息定长为 16 字节。
TCP 不发生拆包粘包。
客户端一次只发送一条消息。
话不多说,直接上代码。先来看下服务端的代码
package com.sensen.LearnNetty.Netty.NettyDemo;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
/**
* @ClassName MainDemo1
* @Deacription Netty 实现简单echo服务器
* 客户端消息定长为 16 字节
* TCP 不发生拆包粘包
* 客户端一次只发送一条消息
* @Author linsen
* @Date 2020/11/10 22:44
* @Version 1.0
**/
public class MainServer {
public static void main(String[] args) {
EventLoopGroup boss = new NioEventLoopGroup();
EventLoopGroup worker = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(boss,worker)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter(){
//数据出现的时候,方法被调用,开始读取数据
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
if (buf.readerIndex()!=0){
throw new IllegalStateException();
}
if (buf.writerIndex() != 16)
{
throw new IllegalStateException();
}
ctx.writeAndFluwsh(buf);
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println("客户端关闭通道");
}
});
}
});
ChannelFuture sync = serverBootstrap.bind(9999).sync();
sync.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
}
了解了Netty组件后,再看服务器代码就比较简单了,显示初始化两个EventLoop,分别作为服务端和客户端的线程循环,然后注册到引导程序中,之后调用引导程序的childHandler注册ChannelInitializer。在其中实现initChannel方法,这个方法再客户端接入的时候会被触发,之后
在参数中传入处理器,实现处理器的方法,用channelRead方法来解析数据并在其中将数据writeAndFluwsh,传回出站方向的处理器,返回给客户端。
再来看下客户端的代码
package com.sensen.LearnNetty.Netty.NettyDemo;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.EventLoop;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import java.nio.Buffer;
/**
* @ClassName MainClient
* @Deacription 客户端
* @Author linsen
* @Date 2020/11/11 10:52
* @Version 1.0
**/
public class MainClient {
public static void main(String[] args) throws InterruptedException {
Bootstrap bootstrap = new Bootstrap();
NioEventLoopGroup eventLoop = new NioEventLoopGroup();
bootstrap.group(eventLoop);
bootstrap.channel(NioSocketChannel.class).handler(new ChannelInboundHandlerAdapter(){
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ByteBuf buffer = PooledByteBufAllocator.DEFAULT.buffer(8);
buffer.writeInt(1).writeInt(2).writeInt(3).writeInt(4);
ctx.writeAndFlush(buffer);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
System.out.println("数据发送完成");
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
System.out.println(buf.readInt());
System.out.println(buf.readInt());
System.out.println(buf.readInt());
System.out.println(buf.readInt());
ctx.close();
}
});
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 9999).sync();
channelFuture.channel().closeFuture().sync();
eventLoop.shutdownGracefully();
}
}
客户端代码也大致相同,但是这里只需要自己本身的EventLoop,和上面一样,在channelRead方法中接收服务端传回的数据。在channelActive中使用ByteBuf写出数据。
本文地址:https://blog.csdn.net/sen_sen97/article/details/109615586