欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

Netty的模型演进及快速入门

程序员文章站 2022-06-28 16:14:36
BIO模型Demopublic class BIOServer {public static void main(String[] args) throws Exception {ServerSocket serverSocket = new ServerSocket(6666);ExecutorService executorService = Executors.newCachedThreadPool();while (true) {System.out.printl...

BIO模型

Netty的模型演进及快速入门

Demo

public class BIOServer {
	public static void main(String[] args) throws Exception {
		ServerSocket serverSocket = new ServerSocket(6666);
		ExecutorService executorService = Executors.newCachedThreadPool();
		while (true) {
			System.out.println("等待客户端连接。。。。 ");
			Socket socket = serverSocket.accept(); //阻塞
			executorService.execute(() -> {
				try {
					InputStream inputStream = socket.getInputStream(); //阻塞
					byte[] bytes = new byte[1024];
					while (true){
						int length = inputStream.read(bytes);
						if(length == -1){
							break;
						}
						System.out.println(new String(bytes, 0, length, "UTF-8"));
					}
				} catch (Exception e) {
					e.printStackTrace();
				}
			});
		}
	}
}

这种模式存在的问题:

  • 客户端的并发数与后端的线程数成1:1的⽐例,线程的创建、销毁是⾮常消耗系统资源的,随着并发量增⼤,服务端性能将显著下降,甚⾄会发⽣线程堆栈溢出等错误。
  • 当连接创建后,如果该线程没有操作时,会进⾏阻塞操作,这样极⼤的浪费了服务器资源。

NIO模型

NIO,称之为New IO 或是 non-block IO (⾮阻塞IO),这两种说法都可以,其实称之为⾮阻塞IO更恰当⼀些。
NIO相关的代码都放在了java.nio包下,其三⼤核⼼组件: Buffer(缓冲区) 、 Channel(通道) 、Selector(选择器/多路复⽤器)

  • Buffer
    • 在NIO中,所有的读写操作都是基于缓冲区完成的,底层是通过数组实现的,常⽤的缓冲区是
      ByteBuffer,每⼀种java基本类型都有对应的缓冲区对象(除了Boolean类型),如:
      CharBuffer、 IntBuffer、 LongBuffer等。
  • Channel
    • 在BIO中是基于Stream实现,⽽在NIO中是基于通道实现,与流不同的是,通道是双向的,
      既可以读也可以写。
  • Selector
    • Selector是多路复⽤器,它会不断的轮询注册在其上的Channel,如果某个Channel上发⽣读
      或写事件,这个Channel就处于就绪状态,会被Selector轮询出来,然后通过SelectionKey获
      取就绪Channel的集合,进⾏IO的读写操作。
      Netty的模型演进及快速入门
      可以看出, NIO模型要优于BIO模型,主要是:
  • 通过多路复⽤器就可以实现⼀个线程处理多个通道,避免了多线程之间的上下⽂切换导致系统开销过⼤。
  • NIO⽆需为每⼀个连接开⼀个线程处理,并且只有通道真正有有事件时,才进⾏读写操作,这样⼤⼤的减少了系统开销。

Demo

public class SelectorDemo {
	/**
	* 注册事件
	*
	* @return
	*/
	private Selector getSelector() throws Exception {
		//获取selector对象
		Selector selector = Selector.open();
		ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
		serverSocketChannel.configureBlocking(false); //⾮阻塞
		//获取通道并且绑定端⼝
		ServerSocket socket = serverSocketChannel.socket();
		socket.bind(new InetSocketAddress(6677));
		//注册感兴趣的事件
		serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
		return selector;
	}
	
	public void listen() throws Exception {
		Selector selector = this.getSelector();
		while (true) {
			selector.select(); //该⽅法会阻塞,直到⾄少有⼀个事件的发⽣
			Set<SelectionKey> selectionKeys = selector.selectedKeys();
			Iterator<SelectionKey> iterator = selectionKeys.iterator();
			while (iterator.hasNext()) {
				SelectionKey selectionKey = iterator.next();
				process(selectionKey, selector);
				iterator.remove();
			}
		}
	}
	
	private void process(SelectionKey key, Selector selector) throws Exception{
		if(key.isAcceptable()){ //新连接请求
			ServerSocketChannel server = (ServerSocketChannel)key.channel();
			SocketChannel channel = server.accept();
			channel.configureBlocking(false); //⾮阻塞
			channel.register(selector, SelectionKey.OP_READ);
		}else if(key.isReadable()){ //读数据
			SocketChannel channel = (SocketChannel)key.channel();
			ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
			channel.read(byteBuffer);
			System.out.println("form 客户端 " + new String(byteBuffer.array(),0, byteBuffer.position()));
		}
	}
	
	public static void main(String[] args) throws Exception {
		new SelectorDemo().listen();
	}
}

AIO模型

AIO是asynchronous I/O的简称,是异步IO,该异步IO是需要依赖于操作系统底层的异步IO实现。

AIO的基本流程是:⽤户线程通过系统调⽤,告知kernel内核启动某个IO操作,⽤户线程返回。 kernel内核在整个IO操作(包括数据准备、数据复制)完成后,通知⽤户程序,⽤户执⾏后续的业务操作。
Netty的模型演进及快速入门
⽬前AIO模型存在的不⾜:

  • 需要完成事件的注册与传递,这⾥边需要底层操作系统提供⼤量的⽀持,去做⼤量的⼯作。
  • Windows 系统下通过 IOCP 实现了真正的异步 I/O。但是,就⽬前的业界形式来说, Windows 系统,很少作为百万级以上或者说⾼并发应⽤的服务器操作系统来使⽤。
  • ⽽在 Linux 系统下,异步IO模型在2.6版本才引⼊,⽬前并不完善。所以,这也是在 Linux 下,实现⾼并发⽹络编程时都是以 NIO 多路复⽤模型模式为主。

Reactor线程模型

Reactor线程模型不是Java专属,也不是Netty专属,它其实是⼀种并发编程模型,是⼀种思想,具有指导意义。⽐如, Netty就是结合了NIO的特点,应⽤了Reactor线程模型所实现的。
Reactor模型中定义的三种⻆⾊:

  • Reactor:负责监听和分配事件,将I/O事件分派给对应的Handler。新的事件包含连接建⽴就绪、读就绪、写就绪等。
  • Acceptor:处理客户端新连接,并分派请求到处理器链中。
  • Handler:将⾃身与事件绑定,执⾏⾮阻塞读/写任务,完成channel的读⼊,完成处理业务逻辑后,负责将结果写出channel。

Reactor单线程模型

Netty的模型演进及快速入门
说明:

  • Reactor充当多路复⽤器⻆⾊,监听多路连接的请求,由单线程完成
  • Reactor收到客户端发来的请求时,如果是新建连接通过Acceptor完成,其他的请求由Handler完成。
  • Handler完成业务逻辑的处理,基本的流程是: Read --> 业务处理 --> Send 。

这种模型的优缺点:

  • 优点
    • 结构简单,由单线程完成,没有多线程、进程通信等问题。
    • 适合⽤在⼀些业务逻辑⽐较简单、对于性能要求不⾼的应⽤场景。
  • 缺点
    • 由于是单线程操作,不能充分发挥多核CPU的性能。
    • 当Reactor线程负载过重之后,处理速度将变慢,这会导致⼤量客户端连接超时,超时之后往往会进⾏重发,这更加重Reactor线程的负载,最终会导致⼤量消息积压和处理超时,成为系统的性能瓶颈。
    • 可靠性差,如果该线程进⼊死循环或意外终⽌,就会导致整个通信系统不可⽤,容易造成单
      点故障。

单Reactor多线程模型

Netty的模型演进及快速入门
说明:

  • 在Reactor多线程模型相⽐较单线程模型⽽⾔,不同点在于, Handler不会处理业务逻辑,只是负责响应⽤户请求,真正的业务逻辑,在另外的线程中完成。
  • 这样可以降低Reactor的性能开销,充分利⽤CPU资源,从⽽更专注的做事件分发⼯作了,提升整个应⽤的吞吐。

但是这个模型存在的问题:

  • 多线程数据共享和访问⽐较复杂。如果⼦线程完成业务处理后,把结果传递给主线程Reactor进⾏发送,就会涉及共享数据的互斥和保护机制。
  • Reactor承担所有事件的监听和响应,只在主线程中运⾏,可能会存在性能问题。例如并发百万客户端连接,或者服务端需要对客户端握⼿进⾏安全认证,但是认证本身⾮常损耗性能。

为了解决性能问题,产⽣了第三种主从Reactor多线程模型。

主从Reactor多线程模型

Netty的模型演进及快速入门
在主从模型中,将Reactor分成2部分:

  • MainReactor负责监听server socket,⽤来处理⽹络IO连接建⽴操作,将建⽴的socketChannel指定注册给SubReactor。
  • SubReactor主要完成和建⽴起来的socket的数据交互和事件业务处理操作。

该模型的优点:

  • 响应快,不必为单个同步事件所阻塞,虽然Reactor本身依然是同步的。
  • 可扩展性强,可以⽅便地通过增加SubReactor实例个数来充分利⽤CPU资源。
  • 可复⽤性⾼, Reactor模型本身与具体事件处理逻辑⽆关,具有很⾼的复⽤性。

Netty模型

Netty模型是基于Reactor模型实现的,对于以上三种模型都有⾮常好的⽀持,也⾮常的灵活,⼀般情况,在服务端会采⽤主从架构模型
Netty的模型演进及快速入门
说明:

  • 在Netty模型中,负责处理新连接事件的是BossGroup,负责处理其他事件的是WorkGroup。Group就是线程池的概念。
  • NioEventLoop表示⼀个不断循环的执⾏处理任务的线程,⽤于监听绑定在其上的读/写事件。
  • 通过Pipeline(管道)执⾏业务逻辑的处理, Pipeline中会有多个ChannelHandler,真正的业务逻辑是在ChannelHandler中完成的。

Demo

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">

	<modelVersion>4.0.0</modelVersion>
	<groupId>cn.wangj.myrpc</groupId>
	<artifactId>MyRPC</artifactId>
	<version>1.0-SNAPSHOT</version>
	
	<dependencies>
		<dependency>
			<groupId>io.netty</groupId>
			<artifactId>netty-all</artifactId>
			<version>4.1.50.Final</version>
		</dependency>
		<dependency>
			<groupId>junit</groupId>
			<artifactId>junit</artifactId>
			<version>4.12</version>
		</dependency>
	</dependencies>
	
	<build>
		<plugins>
		<!-- java编译插件 -->
			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-compiler-plugin</artifactId>
				<version>3.2</version>
				<configuration>
					<source>1.8</source>
					<target>1.8</target>
					<encoding>UTF-8</encoding>
				</configuration>
			</plugin>
		</plugins>
	</build>
</project>

服务端MyRPCServer

package cn.wangj.myrpc.server;

import cn.wangj.myrpc.server.handler.MyChannelInitializer;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class MyRPCServer {
	public void start(int port) throws Exception {
		// 主线程,不处理任何业务逻辑,只是接收客户的连接请求
		EventLoopGroup boss = new NioEventLoopGroup(1);
		// ⼯作线程,线程数默认是: cpu*2
		EventLoopGroup worker = new NioEventLoopGroup();
		try {
			// 服务器启动类
			ServerBootstrap serverBootstrap = new ServerBootstrap();
			serverBootstrap.group(boss, worker) //设置线程组
			.channel(NioServerSocketChannel.class) //配置server通道
			.childHandler(new MyChannelInitializer()); //worker线程的处理器
			ChannelFuture future = serverBootstrap.bind(port).sync();
			System.out.println("服务器启动完成,端⼝为: " + port);
			//等待服务端监听端⼝关闭
			future.channel().closeFuture().sync();
		} finally {
			//优雅关闭
			boss.shutdownGracefully();
			worker.shutdownGracefully();
		}
	}
}

MyChannelInitializer

package cn.wangj.myrpc.server.handler;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
/**
* ChannelHandler的初始化
*/
public class MyChannelInitializer extends ChannelInitializer<SocketChannel> {
	@Override
	protected void initChannel(SocketChannel ch) throws Exception {
		//将业务处理器加⼊到列表中
		ch.pipeline().addLast(new MyChannelHandler());
	}
}

MyChannelHandler

package cn.wangj.myrpc.server.handler;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;

public class MyChannelHandler extends ChannelInboundHandlerAdapter {
	/**
	* 获取客户端发来的数据
	*
	* @param ctx
	* @param msg
	* @throws Exception
	*/
	@Override
	public void channelRead(ChannelHandlerContext ctx, Object msg) throws
		Exception {
		ByteBuf byteBuf = (ByteBuf) msg;
		String msgStr = byteBuf.toString(CharsetUtil.UTF_8);
		System.out.println("客户端发来数据: " + msgStr);
		//向客户端发送数据
		ctx.writeAndFlush(Unpooled.copiedBuffer("ok", CharsetUtil.UTF_8));
	}
	/**
	* 异常处理
	*
	* @param ctx
	* @param cause
	* @throws Exception
	*/
	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
		cause.printStackTrace();
		ctx.close();
	}
}

测试⽤例

package cn.wangj.myrpc;

import cn.wangj.myrpc.server.MyRPCServer;
import org.junit.Test;

public class TestServer {
	@Test
	public void testServer() throws Exception{
		MyRPCServer myRPCServer = new MyRPCServer();
		myRPCServer.start(5566);
	}
}

客户端

package cn.wangj.myrpc.client;

import cn.wangj.myrpc.client.handler.MyClientHandler;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;

public class MyRPCClient {
	public void start(String host, int port) throws Exception {
		//定义⼯作线程组
		EventLoopGroup worker = new NioEventLoopGroup();
		try {
			//注意: client使⽤的是Bootstrap
			Bootstrap bootstrap = new Bootstrap();
			bootstrap.group(worker)
				.channel(NioSocketChannel.class) //注意: client使⽤的是NioSocketChannel
				.handler(new MyClientHandler());
			//连接到远程服务
			ChannelFuture future = bootstrap.connect(host, port).sync();
			future.channel().closeFuture().sync();
		} finally {
			worker.shutdownGracefully();
		}
	}
}

MyClientHandler

package cn.wangj.myrpc.client.handler;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.CharsetUtil;

public class MyClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
	@Override
	protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
		System.out.println("接收到服务端的消息: " + msg.toString(CharsetUtil.UTF_8));
	}
	@Override
	public void channelActive(ChannelHandlerContext ctx) throws Exception {
		// 向服务端发送数据
		String msg = "hello";
		ctx.writeAndFlush(Unpooled.copiedBuffer(msg, CharsetUtil.UTF_8));
	}
	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
		cause.printStackTrace();
		ctx.close();
	}
}

测试用例

package cn.wangj.myrpc;

import cn.wangj.myrpc.client.MyRPCClient;
import org.junit.Test;

public class TestClient {
	@Test
	public void testClient() throws Exception{
		new MyRPCClient().start("127.0.0.1", 5566);
	}
}

核心组件

EventLoop、 EventLoopGroup

Netty的模型演进及快速入门

ChannelHandler

Netty的模型演进及快速入门

ChannelPipeline

Netty的模型演进及快速入门

Bootstrap

Netty的模型演进及快速入门

⼩结

Netty的模型演进及快速入门
可结合https://www.processon.com/view/link/5f9827e45653bb30178b9d90中的Netty调优学习理解

本文地址:https://blog.csdn.net/wangmourena/article/details/112242624

相关标签: 架构及性能调优