欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

java核心技术-NIO

程序员文章站 2023-11-23 22:18:46
1、reactor(反应器)模式 使用单线程模拟多线程,提高资源利用率和程序的效率,增加系统吞吐量。下面例子比较形象的说明了什么是反应器模式: 一个老板经营一个饭店, 传统模式 来一个客人安排一个服务员招呼,客人很满意;(相当于一个连接一个线程) 后来客人越来越多,需要的服务员越来越多,资源条件不足 ......

1、reactor(反应器)模式

  使用单线程模拟多线程,提高资源利用率和程序的效率,增加系统吞吐量。下面例子比较形象的说明了什么是反应器模式:

  一个老板经营一个饭店,

  传统模式 - 来一个客人安排一个服务员招呼,客人很满意;(相当于一个连接一个线程)

  后来客人越来越多,需要的服务员越来越多,资源条件不足以再请更多的服务员了,传统模式已经不能满足需求。老板之所以为老板自然有过人之处,老板发现,服务员在为客人服务时,当客人点菜的时候,服务员基本处于等待状态,(阻塞线程,不做事)。

  于是乎就让服务员在客人点菜的时候,去为其他客人服务,当客人菜点好后再招呼服务员即可。 --反应器(reactor)模式诞生了

  饭店的生意红红火火,几个服务员就足以支撑大量的客流量,老板用有限的资源赚了更多的money~~~~_

 通道:类似于流,但是可以异步读写数据(流只能同步读写),通道是双向的,(流是单向的),通道的数据总是要先读到一个buffer 或者 从一个buffer写入,即通道与buffer进行数据交互。

  

通道类型:  

  • filechannel:从文件中读写数据。  
  • datagramchannel:能通过udp读写网络中的数据。  
  • socketchannel:能通过tcp读写网络中的数据。  
  • serversocketchannel:可以监听新进来的tcp连接,像web服务器那样。对每一个新进来的连接都会创建一个socketchannel。  

 - filechannel比较特殊,它可以与通道进行数据交互, 不能切换到非阻塞模式,套接字通道可以切换到非阻塞模式;

缓冲区 - 本质上是一块可以存储数据的内存,被封装成了buffer对象而已!

  

缓冲区类型:

  • bytebuffer  
  • mappedbytebuffer  
  • charbuffer  
  • doublebuffer  
  • floatbuffer  
  • intbuffer  
  • longbuffer  
  • shortbuffer  

常用方法:

  • allocate() - 分配一块缓冲区  
  • put() - 向缓冲区写数据
  • get() - 向缓冲区读数据  
  • filp() - 将缓冲区从写模式切换到读模式  
  • clear() - 从读模式切换到写模式,不会清空数据,但后续写数据会覆盖原来的数据,即使有部分数据没有读,也会被遗忘;  
  • compact() - 从读数据切换到写模式,数据不会被清空,会将所有未读的数据copy到缓冲区头部,后续写数据不会覆盖,而是在这些数据之后写数据
  • mark() - 对position做出标记,配合reset使用
  • reset() - 将position置为标记值    

缓冲区的一些属性:

  • capacity - 缓冲区大小,无论是读模式还是写模式,此属性值不会变;

  • position - 写数据时,position表示当前写的位置,每写一个数据,会向下移动一个数据单元,初始为0;最大为capacity - 1,切换到读模式时,position会被置为0,表示当前读的位置

  • limit - 写模式下,limit 相当于capacity 表示最多可以写多少数据,切换到读模式时,limit 等于原先的position,表示最多可以读多少数据。

非直接缓冲区:通过allocate() 方法 分配缓冲区,将缓冲区建立在jvm内存中

直接缓冲区:通过allocatedirect() 方法直接缓冲区 将缓冲区建立在物理内存中

2.1 关于缓冲区各个属性的测试

	    string str = "abcde";
		
		//1. 分配一个指定大小的缓冲区
		bytebuffer buf = bytebuffer.allocate(1024);
		
		system.out.println("--------------allocate()----------------");
		system.out.println(buf.position());//0
		system.out.println(buf.limit());//1024
		system.out.println(buf.capacity());//1024
		
		//2. 利用put存入数据到缓冲区中去
		buf.put(str.getbytes());
		
		system.out.println("----------------put()-------------------");
		system.out.println(buf.position());//5
		system.out.println(buf.limit());//1024
		system.out.println(buf.capacity());//1024

		
		//3. 切换到读取模式
		buf.flip();
		
		system.out.println("----------------flip()------------------");
		system.out.println(buf.position());//0
		system.out.println(buf.limit());//5
		system.out.println(buf.capacity());//1024

		
		//4. 利用get() 读取缓冲区中的数据
		byte[] dst = new byte[buf.limit()];
		buf.get(dst);
		system.out.println(new string(dst,0,dst.length));
		
		system.out.println("----------------get()------------------");
		system.out.println(buf.position());//5
		system.out.println(buf.limit());//5
		system.out.println(buf.capacity());//1024

		
		//5.可重复读
		buf.rewind();
		
		system.out.println("----------------rewind()------------------");
		system.out.println(buf.position());//0
		system.out.println(buf.limit());//5
		system.out.println(buf.capacity());//1024

		
		//6.clear(): 清空缓冲区, 但是缓冲区的数据依然存在, 但是处于被遗忘的状态
		buf.clear();
		
		system.out.println("----------------clear()-------------------");
		system.out.println(buf.position());//0
		system.out.println(buf.limit());//1024
		system.out.println(buf.capacity());//1024

		byte[] newbyte = new byte[buf.limit()];
		buf.get(newbyte);
		system.out.println(new string(newbyte,0,newbyte.length));

2.2 关于通道的使用

1.利用通道进行 文件的复制 非直接缓冲区

		fileinputstream fis = null;
		fileoutputstream fos = null;
		filechannel inchannel = null;
		filechannel outchannel = null;
		try {
			fis = new fileinputstream("1.jpg");
			fos = new fileoutputstream("2.jpg");

			// ①获取通道
			inchannel = fis.getchannel();
			outchannel = fos.getchannel();

			// ②将通道中的数据存入缓冲区
			bytebuffer bytebuffer = bytebuffer.allocate(1024);

			// 将通道中的数据存入缓冲区
			while (inchannel.read(bytebuffer) != -1) {
				bytebuffer.flip(); // 切换读取数据的模式
				outchannel.write(bytebuffer);
				bytebuffer.clear();
			}

		} catch (ioexception e) {
			e.printstacktrace();
		} finally {
			if (inchannel != null) {
				try {
					inchannel.close();
				} catch (ioexception e) {
					e.printstacktrace();
				}
			}

			if (outchannel != null) {
				try {
					outchannel.close();
				} catch (ioexception e) {
					e.printstacktrace();
				}
			}

			if (fis != null) {
				try {
					fis.close();
				} catch (ioexception e) {
					e.printstacktrace();
				}
			}

			if (fos != null) {
				try {
					fos.close();
				} catch (ioexception e) {
					e.printstacktrace();
				}
			}

		}
	

2.通道之间的传输

create_new:如果文件不存在就创建,存在就报错

create:如果文件不存在就创建,存在创建(覆盖)

		filechannel inchannel = null;
		filechannel outchannel = null;
		try {
			inchannel = filechannel.open(paths.get("hello.txt"), standardopenoption.read);
			outchannel = filechannel.open(paths.get("hello2.txt"), standardopenoption.read,standardopenoption.write,standardopenoption.create_new);
			
			inchannel.transferto(0, inchannel.size(), outchannel);
		} catch (exception e) {
			e.printstacktrace();
		}  finally {
			
			if(inchannel != null){
				try {
					inchannel.close();
				} catch (ioexception e) {
					e.printstacktrace();
				}
			}
			
			if(outchannel != null){
				try {
					outchannel.close();
				} catch (ioexception e) {
					e.printstacktrace();
				}
			}
		}
	

3. 使用直接缓冲区完成内存文件的复制

		filechannel inchannel = null;
		filechannel outchannel = null;
		try {
			inchannel = filechannel.open(paths.get("1.jpg"), standardopenoption.read);
			outchannel = filechannel.open(paths.get("x.jpg"), standardopenoption.read,standardopenoption.write,standardopenoption.create_new);
			
			mappedbytebuffer inmappedbuffer = inchannel.map(mapmode.read_only, 0, inchannel.size());
			mappedbytebuffer outmappedbuffer = outchannel.map(mapmode.read_write, 0, inchannel.size());
			
			system.out.println(inmappedbuffer.limit());
			byte[] b = new byte[inmappedbuffer.limit()];;
			inmappedbuffer.get(b);
			outmappedbuffer.put(b);
			
		} catch (exception e) {
			e.printstacktrace();
		} finally {
			
			if(inchannel != null){
				try {
					inchannel.close();
				} catch (ioexception e) {
					e.printstacktrace();
				}
			}
			
			if(outchannel != null){
				try {
					outchannel.close();
				} catch (ioexception e) {
					e.printstacktrace();
				}
			}
			
		}

2.3 重点 nio-非阻塞io

个人认为 nio 最难的两点 一个是对于选择器和选择键的理解 其次是对于网络通信模型的理解

本章内容以防过长 只讲解 nio 的使用方法 上述两点参看下回分解 java核心技术-NIO

阻塞io示例:

	//客户端
	@test
	public void client() throws ioexception{
		socketchannel schannel = socketchannel.open(new inetsocketaddress("127.0.0.1", 9898));
		
		filechannel inchannel = filechannel.open(paths.get("1.jpg"), standardopenoption.read);
		
		bytebuffer buf = bytebuffer.allocate(1024);
		
		while(inchannel.read(buf) != -1){
			buf.flip();
			schannel.write(buf);
			buf.clear();
		}
		
		schannel.shutdownoutput();
		
		//接收服务端的反馈
		int len = 0;
		while((len = schannel.read(buf)) != -1){
			buf.flip();
			system.out.println(new string(buf.array(), 0, len));
			buf.clear();
		}
		
		inchannel.close();
		schannel.close();
	}
	
	//服务端
	@test
	public void server() throws ioexception{
		serversocketchannel sschannel = serversocketchannel.open();
		
		filechannel outchannel = filechannel.open(paths.get("2.jpg"), standardopenoption.write, standardopenoption.create);
		
		sschannel.bind(new inetsocketaddress(9898));
		
		socketchannel schannel = sschannel.accept();
		
		bytebuffer buf = bytebuffer.allocate(1024);
		
		while(schannel.read(buf) != -1){
			buf.flip();
			outchannel.write(buf);
			buf.clear();
		}
		
		//发送反馈给客户端
		buf.put("服务端接收数据成功".getbytes());
		buf.flip();
		schannel.write(buf);
		
		schannel.close();
		outchannel.close();
		sschannel.close();
	}

非阻塞io示例-tcp:

//客户端
	@test
	public void client() throws ioexception{
		//1. 获取通道
		socketchannel schannel = socketchannel.open(new inetsocketaddress("127.0.0.1", 9898));
		
		//2. 切换非阻塞模式
		schannel.configureblocking(false);
		
		//3. 分配指定大小的缓冲区
		bytebuffer buf = bytebuffer.allocate(1024);
		
		//4. 发送数据给服务端
		scanner scan = new scanner(system.in);
		
		while(scan.hasnext()){
			string str = scan.next();
			buf.put((new date().tostring() + "\n" + str).getbytes());
			buf.flip();
			schannel.write(buf);
			buf.clear();
		}
		
		//5. 关闭通道
		schannel.close();
	}

	//服务端
	@test
	public void server() throws ioexception{
		//1. 获取通道
		serversocketchannel sschannel = serversocketchannel.open();
		
		//2. 切换非阻塞模式
		sschannel.configureblocking(false);
		
		//3. 绑定连接
		sschannel.bind(new inetsocketaddress(9898));
		
		//4. 获取选择器
		selector selector = selector.open();
		
		//5. 将通道注册到选择器上, 并且指定“监听接收事件”
		sschannel.register(selector, selectionkey.op_accept);
		
		//6. 轮询式的获取选择器上已经“准备就绪”的事件
		while(selector.select() > 0){
			
			//7. 获取当前选择器中所有注册的“选择键(已就绪的监听事件)”
			iterator<selectionkey> it = selector.selectedkeys().iterator();
			
			while(it.hasnext()){
				//8. 获取准备“就绪”的是事件
				selectionkey sk = it.next();
				
				//9. 判断具体是什么事件准备就绪
				if(sk.isacceptable()){
					//10. 若“接收就绪”,获取客户端连接
					socketchannel schannel = sschannel.accept();
					
					//11. 切换非阻塞模式
					schannel.configureblocking(false);
					
					//12. 将该通道注册到选择器上
					schannel.register(selector, selectionkey.op_read);
				}else if(sk.isreadable()){
					//13. 获取当前选择器上“读就绪”状态的通道
					socketchannel schannel = (socketchannel) sk.channel();
					
					//14. 读取数据
					bytebuffer buf = bytebuffer.allocate(1024);
					
					int len = 0;
					while((len = schannel.read(buf)) > 0 ){
						buf.flip();
						system.out.println(new string(buf.array(), 0, len));
						buf.clear();
					}
				}
				
				//15. 取消选择键 selectionkey
				it.remove();
			}
		}
	}

非阻塞io示例-udp:

	@test
	public void send() throws ioexception{
		datagramchannel dc = datagramchannel.open();
		
		dc.configureblocking(false);
		
		bytebuffer buf = bytebuffer.allocate(1024);
		
		scanner scan = new scanner(system.in);
		
		while(scan.hasnext()){
			string str = scan.next();
			buf.put((new date().tostring() + ":\n" + str).getbytes());
			buf.flip();
			dc.send(buf, new inetsocketaddress("127.0.0.1", 9898));
			buf.clear();
		}
		
		dc.close();
	}
	
	@test
	public void receive() throws ioexception{
		datagramchannel dc = datagramchannel.open();
		
		dc.configureblocking(false);
		
		dc.bind(new inetsocketaddress(9898));
		
		selector selector = selector.open();
		
		dc.register(selector, selectionkey.op_read);
		
		while(selector.select() > 0){
			iterator<selectionkey> it = selector.selectedkeys().iterator();
			
			while(it.hasnext()){
				selectionkey sk = it.next();
				
				if(sk.isreadable()){
					bytebuffer buf = bytebuffer.allocate(1024);
					
					dc.receive(buf);
					buf.flip();
					system.out.println(new string(buf.array(), 0, buf.limit()));
					buf.clear();
				}
			}
			
			it.remove();
		}
	}