Netty总结
什么是Netty?
Netty 是一个基于 JAVA NIO 类库的异步通信框架,它的架构特点是:异步非阻塞、基于事件驱动、高性能、高可靠性和高可定制性。
Netty应用场景
1.分布式开源框架中dubbo、Zookeeper,RocketMQ底层rpc通讯使用就是netty。
2.游戏开发中,底层使用netty通讯。
为什么选择Netty?
在本小节,我们总结下为什么不建议开发者直接使用JDK的NIO类库进行开发的原因:
1) NIO的类库和API繁杂,使用麻烦,你需要熟练掌握Selector、ServerSocketChannel、SocketChannel、ByteBuffer等;
2) 需要具备其它的额外技能做铺垫,例如熟悉Java多线程编程,因为NIO编程涉及到Reactor模式,你必须对多线程和网路编程非常熟悉,才能编写出高质量的NIO程序;
3) 可靠性能力补齐,工作量和难度都非常大。例如客户端面临断连重连、网络闪断、半包读写、失败缓存、网络拥塞和异常码流的处理等等,NIO编程的特点是功能开发相对容易,但是可靠性能力补齐工作量和难度都非常大;
4) JDK NIO的BUG,例如臭名昭著的epoll bug,它会导致Selector空轮询,最终导致CPU 100%。官方声称在JDK1.6版本的update18修复了该问题,但是直到JDK1.7版本该问题仍旧存在,只不过该bug发生概率降低了一些而已,它并没有被根本解决。
Netty架构分析
Netty 采用了比较典型的三层网络架构进行设计
第一层:Reactor 通信调度层,它由一系列辅助类完成,包括 Reactor 线程 NioEventLoop 以及其父类、NioSocketChannel/NioServerSocketChannel 以及其父 类、ByteBuffer 以及由其衍生出来的各种 Buffer、Unsafe 以及其衍生出的各种内 部类等。该层的主要职责就是监听网络的读写和连接操作,负责将网络层的数据 读取到内存缓冲区中,然后触发各种网络事件,例如连接创建、连接**、读事 件、写事件等等,将这些事件触发到 PipeLine 中,由 PipeLine 充当的职责链来 进行后续的处理。
第二层:PipeLine 职责链,它负责事件在职责链中的有序传播,同时负责动态的 编排职责链,职责链可以选择监听和处理自己关心的事件,它可以拦截处理和向 后/向前传播事件,不同的应用的 Handler 节点的功能也不同,通常情况下,往往 会开发编解码 Hanlder 用于消息的编解码,它可以将外部的协议消息转换成内部 的 POJO 对象,这样上层业务侧只需要关心处理业务逻辑即可,不需要感知底层 的协议差异和线程模型差异,实现了架构层面的分层隔离。
第三层:业务逻辑处理层,可以分为两类:
1.纯粹的业务逻辑 处理,例如订单处理。
2.应用层协议管理,例如HTTP协议、FTP协议等。
Netty框架使用的是NIO还是AIO?
Netty中使用的是NIO,实际上在Netty4引入了AIO,但发现AIO与NIO速度上差别不大,且AIO比NIO更难维护,后移除了AIO。
初识NettyDemo
基于netty3(后文基于netty5)
Netty服务端代码:
import java.net.InetSocketAddress;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelHandler;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
import org.jboss.netty.handler.codec.string.StringDecoder;
import org.jboss.netty.handler.codec.string.StringEncoder;
/**
* Netty服务器端
*
* @author fatah
*/
public class NettyServer {
public static void main(String[] args) {
// 1、创建netty框架的服务对象
ServerBootstrap serverBootstrap = new ServerBootstrap();
// 2、创建两个线程池
// 线程池1:监听端口号
ExecutorService boss = Executors.newCachedThreadPool();
// 线程池2:用于nio通讯监听
ExecutorService worker = Executors.newCachedThreadPool();
// 3、将线程池放入工厂中
serverBootstrap.setFactory(new NioServerSocketChannelFactory(boss, worker));
// 4、设置管道工厂
serverBootstrap.setPipelineFactory(new ChannelPipelineFactory() {
// 5、设置管道
public ChannelPipeline getPipeline() throws Exception {
// 注意是:org.jboss.netty.channel.Channels;
ChannelPipeline pipeline = Channels.pipeline();
// 传输数据的时候为String类型时
pipeline.addLast("decoder", new StringDecoder());
pipeline.addLast("encoder", new StringEncoder());
// 设置事件监听类
pipeline.addLast("serverHandler", new ServerHandler());
// 7、返回管道对象
return pipeline;
}
});
// 8、绑定端口号
serverBootstrap.bind(new InetSocketAddress(8080));
System.out.println("netty服务器端启动成功");
}
}
// 6、书写事件监听类,并重写几个方法
class ServerHandler extends SimpleChannelHandler {
// 用于接受客户端数据
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
super.messageReceived(ctx, e);
System.out.println("messageReceived服务器端获取到客户端的数据为:" + e.getMessage());
ctx.getChannel().write("hello");
}
// 接收出现异常
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
super.exceptionCaught(ctx, e);
System.out.println("exceptionCaught");
}
// 必须创建连接,且通道关闭后才会触发
public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
super.channelDisconnected(ctx, e);
System.out.println("channelDisconnected");
}
// 通道被关闭时候被触发
public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
super.channelClosed(ctx, e);
System.out.println("channelClosed");
}
}
Netty客户端代码:
package cn.itcats.netty;
import java.net.InetSocketAddress;
import java.util.Scanner;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelHandler;
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
import org.jboss.netty.handler.codec.string.StringDecoder;
import org.jboss.netty.handler.codec.string.StringEncoder;
/**
* Netty客户端
*
* @author fatah
*/
public class NettyClient {
public static void main(String[] args) {
/**
* 改动位置1: ClientBootstrap() 使用客户端bootstrap
*/
ClientBootstrap clientBootstrap = new ClientBootstrap();
// 2、创建两个线程池
// 线程池1:监听端口号
ExecutorService boss = Executors.newCachedThreadPool();
// 线程池2:用于nio通讯监听
ExecutorService worker = Executors.newCachedThreadPool();
// 3、将线程池放入工厂中
/**
* 改动2: NioServerSocketChannelFactory ---> NioClientSocketChannelFactory
*/
clientBootstrap.setFactory(new NioClientSocketChannelFactory(boss, worker));
// 4、设置管道工厂
clientBootstrap.setPipelineFactory(new ChannelPipelineFactory() {
// 5、设置管道
public ChannelPipeline getPipeline() throws Exception {
// 注意是:org.jboss.netty.channel.Channels;
ChannelPipeline pipeline = Channels.pipeline();
// 传输数据的时候为String类型时
pipeline.addLast("decoder", new StringDecoder());
pipeline.addLast("encoder", new StringEncoder());
// 设置事件监听类
/**
* 改动3 : new ServerHandler() --> new ClientHandler()
*/
pipeline.addLast("serverHandler", new ClientHandler());
// 7、返回管道对象
return pipeline;
}
});
// 8、绑定端口号
/**
* 改动4:设置连接ip和端口号
*/
ChannelFuture connect = clientBootstrap.connect(new InetSocketAddress("127.0.0.1", 8080));
System.out.println("netty客户端启动成功");
Channel channel = connect.getChannel();
Scanner sc = new Scanner(System.in);
while(true){
channel.write(sc.next());
}
}
}
class ClientHandler extends SimpleChannelHandler {
// 用于接受客户端数据
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
super.messageReceived(ctx, e);
System.out.println("messageReceived服务器端发送到客户端的数据为:" + e.getMessage());
}
// 接收出现异常
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
super.exceptionCaught(ctx, e);
System.out.println("exceptionCaught");
}
// 必须创建连接,且通道关闭后才会触发
public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
super.channelDisconnected(ctx, e);
System.out.println("channelDisconnected");
}
// 通道被关闭时候被触发
public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
super.channelClosed(ctx, e);
System.out.println("channelClosed");
}
}
基于netty5
Maven坐标
<!-- https://mvnrepository.com/artifact/io.netty/netty-all -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>5.0.0.Alpha2</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.jboss.marshalling/jboss-marshalling -->
<dependency>
<groupId>org.jboss.marshalling</groupId>
<artifactId>jboss-marshalling</artifactId>
<version>1.3.19.GA</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.jboss.marshalling/jboss-marshalling-serial -->
<dependency>
<groupId>org.jboss.marshalling</groupId>
<artifactId>jboss-marshalling-serial</artifactId>
<version>1.3.18.GA</version>
<scope>test</scope>
</dependency>
服务器端
package netty5;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
public class NettyServer {
public static void main(String[] args){
System.out.println("netty5.0服务器端已经启动....");
try{
// 1.创建2个线程,一个负责接收客户端连接, 一个负责进行传输数据
NioEventLoopGroup pGroup = new NioEventLoopGroup();
NioEventLoopGroup cGroup = new NioEventLoopGroup();
// 2. 创建服务器辅助类ServerBootstrap
ServerBootstrap b = new ServerBootstrap();
b.group(pGroup, cGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 1024)
// 3.设置缓冲区与发送区大小
.option(ChannelOption.SO_SNDBUF, 32 * 1024).option(ChannelOption.SO_RCVBUF, 32 * 1024)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel sc) throws Exception {
//接受参数为String类型的
sc.pipeline().addLast(new StringDecoder());
sc.pipeline().addLast(new ServerHandler());
}
});
//启动netty
ChannelFuture cf = b.bind(8080).sync();
//关闭
cf.channel().closeFuture().sync();
pGroup.shutdownGracefully();
cGroup.shutdownGracefully();
}catch (Exception e) {
}
}
}
//继承ChannelHandlerAdapter
class ServerHandler extends ChannelHandlerAdapter {
/**
* 当通道被调用,执行该方法
*/
//由于设置了//接受参数为String类型的 sc.pipeline().addLast(new StringDecoder());则msg转String类型后可以直接接收
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 接收数据
String value = (String) msg;
System.out.println("Server msg:" + value);
// 回复给客户端 "您好!"
String res = "ok...";
//writeAndFlush()写入缓冲区并发送 若ctx.write 表示写入缓冲区但不发送
ctx.writeAndFlush(Unpooled.copiedBuffer(res.getBytes()));
}
}
客户端
package netty5;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
public class NettyClient {
public static void main(String[] args) throws InterruptedException {
System.out.println("客户端已经启动....");
// 创建负责接收客户端连接
NioEventLoopGroup pGroup = new NioEventLoopGroup();
Bootstrap b = new Bootstrap();
b.group(pGroup).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel sc) throws Exception {
//同样接受结果为String类型
sc.pipeline().addLast(new StringDecoder());
sc.pipeline().addLast(new ClientHandler());
}
});
//设置连接的服务器ip和端口地址
ChannelFuture cf = b.connect("127.0.0.1", 8080).sync();
cf.channel().writeAndFlush(Unpooled.wrappedBuffer("it".getBytes()));
cf.channel().writeAndFlush(Unpooled.wrappedBuffer("cats".getBytes()));
// 等待客户端端口号关闭
cf.channel().closeFuture().sync();
pGroup.shutdownGracefully();
}
}
class ClientHandler extends ChannelHandlerAdapter {
/**
* 当通道被调用,执行该方法
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 接收数据
String value = (String) msg;
System.out.println("client msg:" + value);
}
}
粘包与拆包
什么是粘包/拆包?
一个完整的业务可能会被TCP拆分成多个包进行发送,也有可能把多个小的包封装成一个大的数据包发送,这个就是TCP的拆包和封包问题。本质上是TCP的优化处理,在UDP不会出现粘包/拆包问题。
下面可以看一张图,是客户端向服务端发送包:
1. 第一种情况,Data1和Data2都分开发送到了Server端,没有产生粘包和拆包的情况。
2. 第二种情况,Data1和Data2数据粘在了一起,打成了一个大的包发送到Server端,这个情况就是粘包。
3. 第三种情况,Data2被分离成Data2_1和Data2_2,并且Data2_1在Data1之前到达了服务端,这种情况就产生了拆包。
由于网络的复杂性,可能数据会被分离成N多个复杂的拆包/粘包的情况,所以在做TCP服务器的时候就需要首先解决拆包/粘包
解决办法
1、消息定长,报文大小固定长度,不够空格补全,发送和接收方遵循相同的约定,这样即使粘包了通过接收方编程实现获取定长报文也能区分。在服务端中添加一行代码,即:
sc.pipeline().addLast(new FixedLengthFrameDecoder(10));
演示在哪添加:
服务端中添加
protected void initChannel(SocketChannel sc) throws Exception {
//接受参数为String类型的
sc.pipeline().addLast(new StringDecoder());
sc.pipeline().addLast(new ServerHandler());
//设置消息定长,每10个字符作为一个包发送
sc.pipeline().addLast(new FixedLengthFrameDecoder(10));
}
这种定长处理,若客户端发送数据长度不足10,则通道阻塞,不发送任何内容。
2、包尾添加特殊分隔符,例如每条报文结束都添加回车换行符(例如FTP协议)或者指定特殊字符作为报文分隔符,接收方通过特殊分隔符切分报文区分。
ByteBuf buf = Unpooled.copiedBuffer("fengefu".getBytes());
sc.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, buf));
演示在哪添加:
服务端中添加
protected void initChannel(SocketChannel sc) throws Exception {
//加入两行代码,以fengefu作为分割处理
ByteBuf buf = Unpooled.copiedBuffer("fengefu".getBytes());
sc.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, buf));
//接受参数为String类型的
sc.pipeline().addLast(new StringDecoder());
sc.pipeline().addLast(new ServerHandler());
}
当在客户端发送xxxxfengefu 那么xxx内容会被发送,若没有找到fengefu则通道会被阻塞,不发送任何内容
客户端演示在哪添加:
ChannelFuture cf = b.connect("127.0.0.1", 8080).sync();
cf.channel().writeAndFlush(Unpooled.wrappedBuffer("it".getBytes()));
cf.channel().writeAndFlush(Unpooled.wrappedBuffer("cats".getBytes()));
cf.channel().writeAndFlush(Unpooled.wrappedBuffer("fengefu".getBytes()));
Thread.sleep(500);
cf.channel().writeAndFlush(Unpooled.wrappedBuffer("it".getBytes()));
cf.channel().writeAndFlush(Unpooled.wrappedBuffer("cats".getBytes()));
那么当客户端发送的数据含有"fengefu",由于服务端对"fengefu"做了处理,itcats会粘包发送,若没第四行的"fengefu",则通道阻塞不发送任何内容。该段结果只会发送一次itcats,因为"fengefu"只有一个。