欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Netty总结

程序员文章站 2024-03-09 14:28:11
...

什么是Netty?

 Netty 是一个基于 JAVA NIO 类库的异步通信框架,它的架构特点是:异步非阻塞、基于事件驱动、高性能、高可靠性和高可定制性。

 

Netty应用场景

1.分布式开源框架中dubbo、Zookeeper,RocketMQ底层rpc通讯使用就是netty。

2.游戏开发中,底层使用netty通讯。

 

为什么选择Netty?

在本小节,我们总结下为什么不建议开发者直接使用JDK的NIO类库进行开发的原因:

1)      NIO的类库和API繁杂,使用麻烦,你需要熟练掌握Selector、ServerSocketChannel、SocketChannel、ByteBuffer等;

2)      需要具备其它的额外技能做铺垫,例如熟悉Java多线程编程,因为NIO编程涉及到Reactor模式,你必须对多线程和网路编程非常熟悉,才能编写出高质量的NIO程序;

3)      可靠性能力补齐,工作量和难度都非常大。例如客户端面临断连重连、网络闪断、半包读写、失败缓存、网络拥塞和异常码流的处理等等,NIO编程的特点是功能开发相对容易,但是可靠性能力补齐工作量和难度都非常大;

4)      JDK NIO的BUG,例如臭名昭著的epoll bug,它会导致Selector空轮询,最终导致CPU 100%。官方声称在JDK1.6版本的update18修复了该问题,但是直到JDK1.7版本该问题仍旧存在,只不过该bug发生概率降低了一些而已,它并没有被根本解决。

 

 

Netty架构分析

Netty 采用了比较典型的三层网络架构进行设计

第一层:Reactor  通信调度层,它由一系列辅助类完成,包括 Reactor 线程 NioEventLoop 以及其父类、NioSocketChannel/NioServerSocketChannel 以及其父 类、ByteBuffer 以及由其衍生出来的各种 Buffer、Unsafe 以及其衍生出的各种内 部类等。该层的主要职责就是监听网络的读写和连接操作,负责将网络层的数据 读取到内存缓冲区中,然后触发各种网络事件,例如连接创建、连接**、读事 件、写事件等等,将这些事件触发到 PipeLine 中,由 PipeLine 充当的职责链来 进行后续的处理。

第二层:PipeLine     职责链,它负责事件在职责链中的有序传播,同时负责动态的 编排职责链,职责链可以选择监听和处理自己关心的事件,它可以拦截处理和向 后/向前传播事件,不同的应用的 Handler 节点的功能也不同,通常情况下,往往 会开发编解码 Hanlder 用于消息的编解码,它可以将外部的协议消息转换成内部 的 POJO 对象,这样上层业务侧只需要关心处理业务逻辑即可,不需要感知底层 的协议差异和线程模型差异,实现了架构层面的分层隔离。

第三层:业务逻辑处理层,可以分为两类:

1.纯粹的业务逻辑 处理,例如订单处理。

2.应用层协议管理,例如HTTP协议、FTP协议等。

 

Netty框架使用的是NIO还是AIO?

Netty总结

Netty中使用的是NIO,实际上在Netty4引入了AIO,但发现AIO与NIO速度上差别不大,且AIO比NIO更难维护,后移除了AIO。

 

 

初识NettyDemo

基于netty3(后文基于netty5)

Netty服务端代码:

import java.net.InetSocketAddress;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelHandler;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
import org.jboss.netty.handler.codec.string.StringDecoder;
import org.jboss.netty.handler.codec.string.StringEncoder;

/**
 * Netty服务器端
 * 
 * @author fatah
 */
public class NettyServer {
	public static void main(String[] args) {
		// 1、创建netty框架的服务对象
		ServerBootstrap serverBootstrap = new ServerBootstrap();
		// 2、创建两个线程池
		// 线程池1:监听端口号
		ExecutorService boss = Executors.newCachedThreadPool();
		// 线程池2:用于nio通讯监听
		ExecutorService worker = Executors.newCachedThreadPool();
		// 3、将线程池放入工厂中
		serverBootstrap.setFactory(new NioServerSocketChannelFactory(boss, worker));
		// 4、设置管道工厂
		serverBootstrap.setPipelineFactory(new ChannelPipelineFactory() {
			// 5、设置管道
			public ChannelPipeline getPipeline() throws Exception {
				// 注意是:org.jboss.netty.channel.Channels;
				ChannelPipeline pipeline = Channels.pipeline();
				// 传输数据的时候为String类型时
				pipeline.addLast("decoder", new StringDecoder());
				pipeline.addLast("encoder", new StringEncoder());
				// 设置事件监听类
				pipeline.addLast("serverHandler", new ServerHandler());
				// 7、返回管道对象
				return pipeline;
			}
		});
		// 8、绑定端口号
		serverBootstrap.bind(new InetSocketAddress(8080));
		System.out.println("netty服务器端启动成功");
	}
}

// 6、书写事件监听类,并重写几个方法
class ServerHandler extends SimpleChannelHandler {

	// 用于接受客户端数据
	public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
		super.messageReceived(ctx, e);
		System.out.println("messageReceived服务器端获取到客户端的数据为:" + e.getMessage());
		ctx.getChannel().write("hello");
	}

	// 接收出现异常
	public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
		super.exceptionCaught(ctx, e);
		System.out.println("exceptionCaught");
	}

	// 必须创建连接,且通道关闭后才会触发
	public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
		super.channelDisconnected(ctx, e);
		System.out.println("channelDisconnected");
	}

	// 通道被关闭时候被触发
	public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
		super.channelClosed(ctx, e);
		System.out.println("channelClosed");
	}
}

 

Netty客户端代码:

package cn.itcats.netty;

import java.net.InetSocketAddress;
import java.util.Scanner;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelHandler;
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
import org.jboss.netty.handler.codec.string.StringDecoder;
import org.jboss.netty.handler.codec.string.StringEncoder;

/**
 * Netty客户端
 * 
 * @author fatah
 */
public class NettyClient {
	public static void main(String[] args) {
		/**
		 * 改动位置1: ClientBootstrap() 使用客户端bootstrap
		 */
		ClientBootstrap clientBootstrap = new ClientBootstrap();
		// 2、创建两个线程池
		// 线程池1:监听端口号
		ExecutorService boss = Executors.newCachedThreadPool();
		// 线程池2:用于nio通讯监听
		ExecutorService worker = Executors.newCachedThreadPool();
		// 3、将线程池放入工厂中
		/**
		 * 改动2:   NioServerSocketChannelFactory ---> NioClientSocketChannelFactory
		 */
		clientBootstrap.setFactory(new NioClientSocketChannelFactory(boss, worker));
		// 4、设置管道工厂
		clientBootstrap.setPipelineFactory(new ChannelPipelineFactory() {
			// 5、设置管道
			public ChannelPipeline getPipeline() throws Exception {
				// 注意是:org.jboss.netty.channel.Channels;
				ChannelPipeline pipeline = Channels.pipeline();
				// 传输数据的时候为String类型时
				pipeline.addLast("decoder", new StringDecoder());
				pipeline.addLast("encoder", new StringEncoder());
				// 设置事件监听类
				/**
				 * 改动3 :  new ServerHandler() -->   new ClientHandler()
				 */
				pipeline.addLast("serverHandler", new ClientHandler());
				// 7、返回管道对象
				return pipeline;
			}
		});
		// 8、绑定端口号
		/**
		 * 改动4:设置连接ip和端口号
		 */
		ChannelFuture connect = clientBootstrap.connect(new InetSocketAddress("127.0.0.1", 8080));
		System.out.println("netty客户端启动成功");
		Channel channel = connect.getChannel();
		Scanner sc = new Scanner(System.in);
		while(true){
			channel.write(sc.next());
		}
	}
}

class ClientHandler extends SimpleChannelHandler {

	// 用于接受客户端数据
	public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
		super.messageReceived(ctx, e);
		System.out.println("messageReceived服务器端发送到客户端的数据为:" + e.getMessage());
	}

	// 接收出现异常
	public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
		super.exceptionCaught(ctx, e);
		System.out.println("exceptionCaught");
	}

	// 必须创建连接,且通道关闭后才会触发
	public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
		super.channelDisconnected(ctx, e);
		System.out.println("channelDisconnected");
	}

	// 通道被关闭时候被触发
	public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
		super.channelClosed(ctx, e);
		System.out.println("channelClosed");
	}
}

 

基于netty5

Maven坐标

<!-- https://mvnrepository.com/artifact/io.netty/netty-all -->
		<dependency>
			<groupId>io.netty</groupId>
			<artifactId>netty-all</artifactId>
			<version>5.0.0.Alpha2</version>
		</dependency>

		<!-- https://mvnrepository.com/artifact/org.jboss.marshalling/jboss-marshalling -->
		<dependency>
			<groupId>org.jboss.marshalling</groupId>
			<artifactId>jboss-marshalling</artifactId>
			<version>1.3.19.GA</version>
		</dependency>
		<!-- https://mvnrepository.com/artifact/org.jboss.marshalling/jboss-marshalling-serial -->
		<dependency>
			<groupId>org.jboss.marshalling</groupId>
			<artifactId>jboss-marshalling-serial</artifactId>
			<version>1.3.18.GA</version>
			<scope>test</scope>
		</dependency>

 

服务器端

package netty5;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;

public class NettyServer {

	public static void main(String[] args){
		System.out.println("netty5.0服务器端已经启动....");
		try{
		// 1.创建2个线程,一个负责接收客户端连接, 一个负责进行传输数据
		NioEventLoopGroup pGroup = new NioEventLoopGroup();
		NioEventLoopGroup cGroup = new NioEventLoopGroup();
		// 2. 创建服务器辅助类ServerBootstrap
		ServerBootstrap b = new ServerBootstrap();
		b.group(pGroup, cGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 1024)
				// 3.设置缓冲区与发送区大小
				.option(ChannelOption.SO_SNDBUF, 32 * 1024).option(ChannelOption.SO_RCVBUF, 32 * 1024)
				.childHandler(new ChannelInitializer<SocketChannel>() {
					@Override
					protected void initChannel(SocketChannel sc) throws Exception {
						//接受参数为String类型的
						sc.pipeline().addLast(new StringDecoder());
						sc.pipeline().addLast(new ServerHandler());
					}
				});
		//启动netty
		ChannelFuture cf = b.bind(8080).sync();
		//关闭
		cf.channel().closeFuture().sync();
		pGroup.shutdownGracefully();
		cGroup.shutdownGracefully();
		}catch (Exception e) {
		}
	}

}


//继承ChannelHandlerAdapter
class ServerHandler extends ChannelHandlerAdapter {
	/**
	 * 当通道被调用,执行该方法
	 */
	//由于设置了//接受参数为String类型的 sc.pipeline().addLast(new StringDecoder());则msg转String类型后可以直接接收
	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
		// 接收数据
		String value = (String) msg;
		System.out.println("Server msg:" + value);
		// 回复给客户端 "您好!"
		String res = "ok...";
		//writeAndFlush()写入缓冲区并发送     若ctx.write 表示写入缓冲区但不发送
		ctx.writeAndFlush(Unpooled.copiedBuffer(res.getBytes()));
	}

}

 

 

客户端

package netty5;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;


public class NettyClient {

	public static void main(String[] args) throws InterruptedException {
		System.out.println("客户端已经启动....");
		// 创建负责接收客户端连接
		NioEventLoopGroup pGroup = new NioEventLoopGroup();
		Bootstrap b = new Bootstrap();
		b.group(pGroup).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {
			@Override
			protected void initChannel(SocketChannel sc) throws Exception {
				//同样接受结果为String类型
				sc.pipeline().addLast(new StringDecoder());
				sc.pipeline().addLast(new ClientHandler());
			}
		});
		//设置连接的服务器ip和端口地址
		ChannelFuture cf = b.connect("127.0.0.1", 8080).sync();
		 cf.channel().writeAndFlush(Unpooled.wrappedBuffer("it".getBytes()));
		 cf.channel().writeAndFlush(Unpooled.wrappedBuffer("cats".getBytes()));
		// 等待客户端端口号关闭
		cf.channel().closeFuture().sync();
		pGroup.shutdownGracefully();

	}
}

class ClientHandler extends ChannelHandlerAdapter {

	/**
	 * 当通道被调用,执行该方法
	 */
	@Override
	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
		// 接收数据
		String value = (String) msg;
		System.out.println("client msg:" + value);
	}

	
}

 

粘包与拆包

什么是粘包/拆包?

   一个完整的业务可能会被TCP拆分成多个包进行发送,也有可能把多个小的包封装成一个大的数据包发送,这个就是TCP的拆包和封包问题。本质上是TCP的优化处理,在UDP不会出现粘包/拆包问题。

下面可以看一张图,是客户端向服务端发送包:

Netty总结

1. 第一种情况,Data1和Data2都分开发送到了Server端,没有产生粘包和拆包的情况。

2. 第二种情况,Data1和Data2数据粘在了一起,打成了一个大的包发送到Server端,这个情况就是粘包。

3. 第三种情况,Data2被分离成Data2_1和Data2_2,并且Data2_1在Data1之前到达了服务端,这种情况就产生了拆包。

由于网络的复杂性,可能数据会被分离成N多个复杂的拆包/粘包的情况,所以在做TCP服务器的时候就需要首先解决拆包/粘包

 

解决办法

     1、消息定长,报文大小固定长度,不够空格补全,发送和接收方遵循相同的约定,这样即使粘包了通过接收方编程实现获取定长报文也能区分。在服务端中添加一行代码,即:

sc.pipeline().addLast(new FixedLengthFrameDecoder(10));

演示在哪添加:

服务端中添加

protected void initChannel(SocketChannel sc) throws Exception {
			//接受参数为String类型的
			sc.pipeline().addLast(new StringDecoder());
			sc.pipeline().addLast(new ServerHandler());
                        //设置消息定长,每10个字符作为一个包发送
                        sc.pipeline().addLast(new FixedLengthFrameDecoder(10));
}

这种定长处理,若客户端发送数据长度不足10,则通道阻塞,不发送任何内容。

 

2、包尾添加特殊分隔符,例如每条报文结束都添加回车换行符(例如FTP协议)或者指定特殊字符作为报文分隔符,接收方通过特殊分隔符切分报文区分。

ByteBuf buf = Unpooled.copiedBuffer("fengefu".getBytes());
sc.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, buf));

演示在哪添加:

服务端中添加

protected void initChannel(SocketChannel sc) throws Exception {
			//加入两行代码,以fengefu作为分割处理
			ByteBuf buf = Unpooled.copiedBuffer("fengefu".getBytes());
			sc.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, buf));
			//接受参数为String类型的
			sc.pipeline().addLast(new StringDecoder());
			sc.pipeline().addLast(new ServerHandler());
}

当在客户端发送xxxxfengefu    那么xxx内容会被发送,若没有找到fengefu则通道会被阻塞,不发送任何内容

 

客户端演示在哪添加:

	ChannelFuture cf = b.connect("127.0.0.1", 8080).sync();
		 cf.channel().writeAndFlush(Unpooled.wrappedBuffer("it".getBytes()));
		 cf.channel().writeAndFlush(Unpooled.wrappedBuffer("cats".getBytes()));
		 cf.channel().writeAndFlush(Unpooled.wrappedBuffer("fengefu".getBytes()));
		 Thread.sleep(500);
		 cf.channel().writeAndFlush(Unpooled.wrappedBuffer("it".getBytes()));
		 cf.channel().writeAndFlush(Unpooled.wrappedBuffer("cats".getBytes()));

那么当客户端发送的数据含有"fengefu",由于服务端对"fengefu"做了处理,itcats会粘包发送,若没第四行的"fengefu",则通道阻塞不发送任何内容。该段结果只会发送一次itcats,因为"fengefu"只有一个。

 

 

 

 

 

 

 

 

相关标签: netty