欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

NIO学习

程序员文章站 2022-07-04 08:44:48
...

NIO非阻塞I/O学习:

学习资料:http://developer.51cto.com/art/201112/307172.htm

理解好Buffer缓冲、channel通道、Selector选择器、selectionkey

以下为为三个示例,重点看示例三(附件中有工程示例):

示例1、

package wen.nio.demo1;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
/**
 * 服务端
 * @author Dwen
 * @version v 0.1 2013-6-28 上午10:32:24
 */
public class Server {

	private Selector selector;
	private ByteBuffer readBuffer = ByteBuffer.allocate(256);//调整缓存大小
	
	private Map<SocketChannel, byte[]> clientMessage = new ConcurrentHashMap<SocketChannel, byte[]>();
	
	/** 启动服务*/
	public void start() throws IOException{
		ServerSocketChannel ssc = ServerSocketChannel.open();
		ssc.configureBlocking(false);//设置为非阻塞
		ServerSocket socket = ssc.socket();
		socket.bind(new InetSocketAddress("127.0.0.1", 8001));
		selector = Selector.open();
		ssc.register(selector, SelectionKey.OP_ACCEPT);//将服务器Channel注册到Selector对象,并打开接收请求
		while (!Thread.currentThread().isInterrupted()) {
			selector.select();
			//获取活动网络连接选择键的集合
			Set<SelectionKey> keys = selector.selectedKeys();
			Iterator<SelectionKey> keyIterator = keys.iterator();
			while (keyIterator.hasNext()) {
				SelectionKey key = keyIterator.next();
				if (!key.isValid()) {
					continue;
				}
				if (key.isAcceptable()) {
					accept(key);
				}else if (key.isReadable()) {
					read(key);
				}
				keyIterator.remove();
			}
		}
	}
	
	private void read(SelectionKey key) throws IOException {
		SocketChannel socketChannel = (SocketChannel) key.channel();
		//清除缓存,准备放新数据
		this.readBuffer.clear();
		int numRead;
		try {
			numRead = socketChannel.read(this.readBuffer);
		} catch (Exception e) {
			// TODO: handle exception
			key.cancel();
			socketChannel.close();
			clientMessage.remove(socketChannel);
			return;
		}
		byte[] bytes = clientMessage.get(socketChannel);
		if (bytes==null) {
			bytes = new byte[0];
		}
		if (numRead>0) {
			byte[] newBytes = new byte[bytes.length+numRead];
			System.arraycopy(bytes, 0, newBytes, 0, bytes.length);
			System.arraycopy(readBuffer.array(), 0, newBytes, bytes.length, numRead);
			clientMessage.put(socketChannel, newBytes);
			System.out.println(new String(newBytes));
		}else{
			String message = new String(bytes);
			System.out.println(message);
		}
	}
	
	/** 接收客户端连接*/
	private void accept(SelectionKey key) throws IOException{
		ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
		SocketChannel clientChannel = ssc.accept();
		clientChannel.configureBlocking(false);//设置为非阻塞
		clientChannel.register(selector, SelectionKey.OP_READ);
		System.out.println("a new client connected");
	}
	
	public static void main(String[] args) throws IOException{
		System.out.println("server started...");
		new Server().start();
	}
}

 

package wen.nio.demo1;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Scanner;
import java.util.Set;
/**
 * 客户端
 * @author Dwen
 * @version v 0.1 2013-6-28 上午10:32:38
 */
public class Client {
	
	public void start() throws IOException{
		SocketChannel sc = SocketChannel.open();
		sc.configureBlocking(false);
		sc.connect(new InetSocketAddress("127.0.0.1", 8001));
		Selector selector = Selector.open();
		sc.register(selector, SelectionKey.OP_CONNECT);
		Scanner scanner = new Scanner(System.in);
		while (true) {
			selector.select();
			Set<SelectionKey> keys = selector.selectedKeys();
			System.out.println("keys="+keys.size());
			Iterator<SelectionKey> keyIterator = keys.iterator();
			while (keyIterator.hasNext()) {
				SelectionKey key = keyIterator.next();
				keyIterator.remove();
				if (key.isConnectable()) {
					sc.finishConnect();
					sc.register(selector, SelectionKey.OP_WRITE);
					System.out.println("server connected...");
					break;
				}else if (key.isWritable()) {
					System.out.println("please input message");
					String message = scanner.nextLine();
					ByteBuffer writeBuffer = ByteBuffer.wrap(message.getBytes());
					sc.write(writeBuffer);
				}
			}
		}
	}

	public static void main(String[] args) throws IOException {
//		new Client().start();
		for (;;) {
			System.out.println("kk");
		}
	}
}

 

示例2、

package wen.nio.demo2;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

public class NIOServer {

	private int flag = 0;
	//缓存区大小
	private int BLOCK = 4096;
	//发送数据缓存区
	private ByteBuffer sendBuffer = ByteBuffer.allocate(BLOCK);
	//接收数据缓存区
	private ByteBuffer reveiveBuffer = ByteBuffer.allocate(BLOCK);
	private Selector selector;
	
	public NIOServer(int port) throws IOException{
		//打开服务器套接字通道
		ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
		//服务器配置为非阻塞
		serverSocketChannel.configureBlocking(false);
		//检索与此通道关联的服务器套接字
		ServerSocket serverSocket = serverSocketChannel.socket();
		//进行服务的绑定
		serverSocket.bind(new InetSocketAddress("127.0.0.1", port));
		//通过open方法找到Selector
		selector = Selector.open();
		//注册到selector,等待连接
		serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
		System.out.println("Server Start-------------"+port);
	}
	
	//监听
	private void listen() throws IOException{
		//轮询方式
		while (true) {
			//选择一组键,并且相应的通道已经打开
			selector.select();
			//返回此选择器的已选择键集
			Set<SelectionKey> selectionKeys = selector.selectedKeys();
			Iterator<SelectionKey> iterator = selectionKeys.iterator();
			while(iterator.hasNext()){
				SelectionKey selectionKey = iterator.next();
				iterator.remove();
				//处理请求
				handleKey(selectionKey);
			}
		}
	}
	
	//处理请求
	private void handleKey(SelectionKey selectionKey) throws IOException{
		//接受请求
		ServerSocketChannel server = null;
		SocketChannel client = null;
		String receiveText;
		String sendText;
		int count = 0;
		//测试此键的通道是否已准备好接受新的套接字连接
		if (selectionKey.isAcceptable()) {
			//返回此键的通道
			server = (ServerSocketChannel) selectionKey.channel();
			//接收到此通道套接字的连接
			client = server.accept();
			//配置为非阻塞
			client.configureBlocking(false);
			//注册到selector,等待连接
			client.register(selector, SelectionKey.OP_READ);
		}else if (selectionKey.isReadable()) {
			//返回此键通道
			client = (SocketChannel) selectionKey.channel();
			//清空缓冲区以备下次读取
			reveiveBuffer.clear();
			//读取服务器发送来的数据到缓冲区中
			count = client.read(reveiveBuffer);
			if (count > 0) {
				receiveText = new String(reveiveBuffer.array(),0,count);
				System.out.println("服务器端接收客户端数据:"+receiveText);
				client.register(selector, SelectionKey.OP_WRITE);
			}
		}else if (selectionKey.isWritable()) {
			//将缓冲区清空以备下次写入
			sendBuffer.clear();
			//返回此键通道
			client = (SocketChannel) selectionKey.channel();
			sendText = "Message from server : "+flag++;
			//向缓冲区中输入数据
			sendBuffer.put(sendText.getBytes());
			//缓存区标志复位
			sendBuffer.flip();
			//输出到通道
			client.write(sendBuffer);
			System.out.println("服务器端向客户端发送数据:"+sendText);
			client.register(selector,SelectionKey.OP_READ);
		}
	}
	
	public static void main(String[] args) throws IOException {
		int port = 8989;
		NIOServer server = new NIOServer(port);
		server.listen();
	}
}

 

package wen.nio.demo2;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

public class NIOClient {
	private static int flag = 0;
	//缓存区大小
	private static int BLOCK = 4096;
	//发送数据缓存区
	private static ByteBuffer sendBuffer = ByteBuffer.allocate(BLOCK);
	//接收数据缓存区
	private static ByteBuffer reveiveBuffer = ByteBuffer.allocate(BLOCK);
	//服务器端地址
	private final static InetSocketAddress SERVER_ADDRESS = new InetSocketAddress("127.0.0.1", 8989);
	
	public static void main(String[] args) throws IOException {
		//打开socket通道
		SocketChannel socketChannel = SocketChannel.open();
		//设置为非阻塞方式
		socketChannel.configureBlocking(false);
		//打开选择器
		Selector selector = Selector.open();
		//注册连接服务端socket动作
		socketChannel.register(selector, SelectionKey.OP_CONNECT);
		//连接
		socketChannel.connect(SERVER_ADDRESS);
		
		Set<SelectionKey> selectionKeys;
		Iterator<SelectionKey> iterator;
		SelectionKey selectionKey;
		SocketChannel client;
		String reveiveText;
		String sendText;
		int count = 0;
		
		while(true){
			//选择一组键,其相应的通道已为I/O操作准备就绪
			selector.select();
			//返回此选择器的已选择键集
			selectionKeys = selector.selectedKeys();
			iterator = selectionKeys.iterator();
			while(iterator.hasNext()){
				selectionKey = iterator.next();
				if (selectionKey.isConnectable()) {//连接
					System.out.println("Client connect...");
					client = (SocketChannel) selectionKey.channel();
					//判断通道上是否正在进行连接操作
					//完成套接字通道的连接过程
					if (client.isConnectionPending()) {
						client.finishConnect();
						System.out.println("完成连接~");
						sendBuffer.clear();
						sendBuffer.put("Hello,Server ".getBytes());
						sendBuffer.flip();
						client.write(sendBuffer);
					}
					client.register(selector, SelectionKey.OP_READ);
				}else if (selectionKey.isReadable()) {//可读
					client = (SocketChannel) selectionKey.channel();
					//将缓存区清空以备下次读取
					reveiveBuffer.clear();
					//读取服务器端发送来的数据到缓存区中
					count = client.read(reveiveBuffer);
					if (count>0) {
						reveiveText = new String(reveiveBuffer.array(),0,count);
						System.out.println("客户端接收服务器数据:"+reveiveText);
						client.register(selector, SelectionKey.OP_WRITE);
					}
				}else if (selectionKey.isWritable()) {//可写
					sendBuffer.clear();
					client = (SocketChannel) selectionKey.channel();
					sendText = "Message from client : "+flag++;
					sendBuffer.put(sendText.getBytes());
					//将缓存区各标志复位,因为向里面put了数据标志被改变,要想人中读取数据发向服务器,需要复位
					sendBuffer.flip();
					client.write(sendBuffer);
					System.out.println("客户端向服务器端发送数据:"+sendText);
					client.register(selector, SelectionKey.OP_READ);
				}
			}
			selectionKeys.clear();
		}
	}
}

 

示例3 聊天通信:

package wen.nio.demo3;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Date;
import java.util.Iterator;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * 聊天室-服务端
 * 
 * @author Dwen
 * @version v 0.1 2013-6-28 下午03:19:23
 */
public class MySocketServer implements Runnable {

	/** 运行状态标识*/
	private boolean running;
	/** 选择器*/
	private Selector selector;
	String writeMsg;
	/** 存消息字符串*/
	StringBuffer sb = new StringBuffer();
	/** 定义选择键*/
	SelectionKey ssckey;

	/** 构造方法*/
	public MySocketServer() {
		running = true;
	}

	/**
	 * 连接初始化工作
	 */
	public void init() {
		try {
			//打开选择器
			selector = Selector.open();
			//打开socket服务通道
			ServerSocketChannel ssc = ServerSocketChannel.open();
			//设置非阻塞
			ssc.configureBlocking(false);
			//绑定地址和端口
			ssc.socket().bind(new InetSocketAddress(2345));
			//注册选择器并设为接受状态(注册后就可以监控当前通道了)
			ssckey = ssc.register(selector, SelectionKey.OP_ACCEPT);
			System.out.println("Server is starting..." + new Date());
		} catch (IOException ex) {
			Logger.getLogger(MySocketServer.class.getName()).log(Level.SEVERE,
					null, ex);
		}
	}

	/**
	 * 通信
	 */
	public void execute() {
		try {
			while (running) {
				//选择一组键
				int num = selector.select();
				if (num > 0) {
					//获得选择器中的选择键
					Iterator<SelectionKey> it = selector.selectedKeys().iterator();
					while (it.hasNext()) {
						SelectionKey key = it.next();
						it.remove();
						if (!key.isValid()){//键有效则继续
							continue;
						}
						if (key.isAcceptable()) {//可接受连接
							System.out.println("isAcceptable");
							getConnection(key);
						} else if (key.isReadable()) {//可读
							System.out.println("isReadable");
							readMssage(key);
						} else if (key.isValid() && key.isWritable()) {//可写
							if (writeMsg != null) {
								System.out.println("isWritable");
								writeMssage(key);
							}
						} else
							break;
					}
				}
				Thread.yield();//线程让步
			}
		} catch (IOException ex) {
			Logger.getLogger(MySocketServer.class.getName()).log(Level.SEVERE,
					null, ex);
		}
	}

	/**
	 * 获得连接
	 * @param key
	 * @throws IOException
	 */
	private void getConnection(SelectionKey key) throws IOException {
		//通过键获得socket通道
		ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
		//接受此通道套接字连接
		SocketChannel sc = ssc.accept();
		//设置非阻塞
		sc.configureBlocking(false);
		//注册选择器并为可读
		sc.register(selector, SelectionKey.OP_READ);
		System.out.println("Build connection :"+ sc.socket().getRemoteSocketAddress());
	}

	/**
	 * 读消息
	 * @param key 选择键
	 * @throws IOException
	 */
	private void readMssage(SelectionKey key) throws IOException {
		sb.delete(0, sb.length());
		//通过键获得通道
		SocketChannel sc = (SocketChannel) key.channel();
		System.out.print(sc.socket().getRemoteSocketAddress() + " ");
		//分配一个1024个字节缓冲区
		ByteBuffer buffer = ByteBuffer.allocate(1024);
		//重置缓存索引值,不会对清空缓存
		buffer.clear();
		int len = 0;
		StringBuffer sb = new StringBuffer();
		while ((len = sc.read(buffer)) > 0) {
			//翻转缓存区
			buffer.flip();
			sb.append(new String(buffer.array(), 0, len));
		}
		if (sb.length() > 0){
			System.out.println("From client Get:" + sb.toString());
		}
		//当其中客户端输入quit时,则退出通道 
		if (sb.toString().trim().toLowerCase().equals("quit")) {
			sc.write(ByteBuffer.wrap("BYE".getBytes()));
			System.out.println("Client is closed "
					+ sc.socket().getRemoteSocketAddress());
			//该键的信道与它选择器被取消
			key.cancel();
			//释放通道
			sc.close();
			//释放当前socket
			sc.socket().close();
		} else {
			String toMsg = sc.socket().getRemoteSocketAddress() + "said:"
					+ sb.toString();
			System.out.println(toMsg);
			writeMsg = toMsg;
			//获得键
			Iterator<SelectionKey> it = key.selector().keys().iterator();
			while (it.hasNext()) {
				SelectionKey skey = it.next();
				if (skey != key && skey != ssckey) {
					if (skey.attachment() != null) {
						//检索当前的附加对象
						String str = (String) skey.attachment();
						//将给定的对象附加到此键
						skey.attach(str + toMsg);
					} else {
						skey.attach(toMsg);
					}
					//设当前键可写
					skey.interestOps(skey.interestOps() | SelectionKey.OP_WRITE);
				}
			}
//			selector.wakeup();
		}
	}
	
	/**
	 * 写消息
	 * @param key
	 * @throws IOException
	 */
	private void writeMssage(SelectionKey key) throws IOException {
		//获得通道
		SocketChannel sc = (SocketChannel) key.channel();
		//检索当前附加对象
		String str = (String) key.attachment();
		//内容缓存写入通道
		sc.write(ByteBuffer.wrap(str.getBytes()));
		//将此键的interest集合设置为可读
		key.interestOps(SelectionKey.OP_READ);
	}

	/**
	 * 运行线程
	 * @see java.lang.Runnable#run()
	 */
	public void run() {
		//连接初始化
		init();
		//通信
		execute();
	}


	/**
	 * 程序入口
	 * 
	 * @param args
	 */
	public static void main(String[] args) {
		MySocketServer server = new MySocketServer();
		new Thread(server).start();
	}

}

  

package wen.nio.demo3;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * 聊天室-客户端
 * 客户端只需向通道中写信息和从通道中读信息
 * @author Dwen
 * @version v 0.1 2013-6-28 下午03:20:03
 */
public class MySocketClient implements Runnable {

	boolean running;
	SocketChannel sc;
	public MySocketClient() {
		running = true;
	}

	/**
	 * 连接初始化工作
	 */
	public void init() {
		try {
			//打开通道
			sc = SocketChannel.open();
			//设置非阻塞
			sc.configureBlocking(false);
			//连接服务端
			sc.connect(new InetSocketAddress("localhost", 2345));
		} catch (IOException ex) {
			Logger.getLogger(MySocketClient.class.getName()).log(Level.SEVERE,
					null, ex);
		}
	}

	/**
	 * 通信
	 */
	public void execute() {
		int num = 0;
		try {
			//完成连接
			while (!sc.finishConnect()) {}
		} catch (IOException ex) {
			Logger.getLogger(MySocketClient.class.getName()).log(Level.SEVERE,
					null, ex);
		}
		//键盘输入
		ReadKeyBoard rkb = new ReadKeyBoard();
		new Thread(rkb).start();
		while (running) {
			try {
				//分配一个1024个字节缓冲区
				ByteBuffer buffer = ByteBuffer.allocate(1024);
				buffer.clear();
				StringBuffer sb = new StringBuffer();
				Thread.sleep(500);
				while ((num = sc.read(buffer)) > 0) {
					sb.append(new String(buffer.array(), 0, num));
					buffer.clear();
				}
				if (sb.length() > 0)
					System.out.println(sb.toString());
				if (sb.toString().toLowerCase().trim().equals("bye")) {
					System.out.println("closed....");
					sc.close();
					sc.socket().close();
					rkb.close();
					running = false;
				}
			} catch (InterruptedException ex) {
				Logger.getLogger(MySocketClient.class.getName()).log(
						Level.SEVERE, null, ex);
			} catch (IOException ex) {
				Logger.getLogger(MySocketClient.class.getName()).log(
						Level.SEVERE, null, ex);
			}
		}
	}

	public void run() {
		//连接初始化工作
		init();
		//通信
		execute();
	}

	/**
	 * 键盘输入内容
	 * @author Dwen
	 * @version v 0.1 2013-7-19 上午10:40:01
	 */
	class ReadKeyBoard implements Runnable {
		boolean running2 = true;
		public ReadKeyBoard() {}

		public void close() {
			running2 = false;
		}

		public void run() {
			//读取输入内容
			BufferedReader reader = new BufferedReader(new InputStreamReader(
					System.in));
			while (running2) {
				try {
					System.out.println("Please enter commands :");
					String str = reader.readLine();
					//输入内容写入到通道
					sc.write(ByteBuffer.wrap(str.getBytes()));
				} catch (IOException ex) {
					Logger.getLogger(ReadKeyBoard.class.getName()).log(
							Level.SEVERE, null, ex);
				}
			}
		}
	}

	/**
	 * 程序入口
	 * @param args
	 */
	public static void main(String[] args) {
		MySocketClient client = new MySocketClient();
		new Thread(client).start();
	}

}

 

上一篇: LinQ

下一篇: NIO 深入学习