NIO学习
程序员文章站
2022-07-04 08:44:48
...
NIO非阻塞I/O学习:
学习资料:http://developer.51cto.com/art/201112/307172.htm
需要理解好 Buffer(缓冲区)、Channel(通道)、Selector(选择器)和 SelectionKey(选择键)这几个核心概念。
以下为三个示例,重点看示例三(附件中有工程示例):
示例1、
package wen.nio.demo1;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
/**
 * Demo 1 server: a single-threaded, selector-driven NIO server.
 *
 * <p>It listens on 127.0.0.1:8001, accumulates every byte each client sends
 * and prints the accumulated message to standard output after each read.
 *
 * @author Dwen
 * @version v 0.1 2013-6-28 10:32:24 AM
 */
public class Server {
    /** Selector multiplexing the listening channel and all client channels. */
    private Selector selector;
    /** Scratch buffer reused for every read call (256 bytes per read). */
    private ByteBuffer readBuffer = ByteBuffer.allocate(256);
    /** Bytes received so far, keyed by the client channel that sent them. */
    private Map<SocketChannel, byte[]> clientMessage = new ConcurrentHashMap<SocketChannel, byte[]>();

    /**
     * Binds the server socket and runs the selector loop until the current
     * thread is interrupted. All channels and the selector are closed before
     * this method returns, whether it ends normally or with an exception.
     *
     * @throws IOException if the socket cannot be opened or bound, or if the
     *                     selector fails
     */
    public void start() throws IOException {
        ServerSocketChannel ssc = ServerSocketChannel.open();
        try {
            ssc.configureBlocking(false);
            ServerSocket socket = ssc.socket();
            socket.bind(new InetSocketAddress("127.0.0.1", 8001));
            selector = Selector.open();
            // Register the listening channel so the selector reports incoming connections.
            ssc.register(selector, SelectionKey.OP_ACCEPT);
            while (!Thread.currentThread().isInterrupted()) {
                selector.select();
                Set<SelectionKey> keys = selector.selectedKeys();
                Iterator<SelectionKey> keyIterator = keys.iterator();
                while (keyIterator.hasNext()) {
                    SelectionKey key = keyIterator.next();
                    // Remove first so that no code path can leave a handled key in the selected set.
                    keyIterator.remove();
                    if (!key.isValid()) {
                        continue;
                    }
                    if (key.isAcceptable()) {
                        accept(key);
                    } else if (key.isReadable()) {
                        read(key);
                    }
                }
            }
        } finally {
            shutdown(ssc);
        }
    }

    /**
     * Reads whatever is available from a client, appends it to that client's
     * accumulated message and prints the result. The client is dropped when
     * the read fails or the peer has closed the connection.
     *
     * @param key selection key of the readable client channel
     * @throws IOException declared for interface compatibility
     */
    private void read(SelectionKey key) throws IOException {
        SocketChannel socketChannel = (SocketChannel) key.channel();
        // Reset position/limit so the buffer can take new data.
        this.readBuffer.clear();
        int numRead;
        try {
            numRead = socketChannel.read(this.readBuffer);
        } catch (IOException e) {
            // The connection is broken (e.g. reset by peer): forget this client.
            closeClient(key);
            return;
        }
        if (numRead < 0) {
            // -1 means end-of-stream. Without closing here the channel stays
            // permanently readable and the selector loop would spin forever.
            closeClient(key);
            return;
        }
        byte[] bytes = clientMessage.get(socketChannel);
        if (bytes == null) {
            bytes = new byte[0];
        }
        if (numRead > 0) {
            byte[] newBytes = new byte[bytes.length + numRead];
            System.arraycopy(bytes, 0, newBytes, 0, bytes.length);
            System.arraycopy(readBuffer.array(), 0, newBytes, bytes.length, numRead);
            clientMessage.put(socketChannel, newBytes);
            System.out.println(new String(newBytes));
        } else {
            String message = new String(bytes);
            System.out.println(message);
        }
    }

    /**
     * Accepts a pending client connection and registers it for reading.
     *
     * @param key selection key of the listening channel
     * @throws IOException if accepting or configuring the client channel fails
     */
    private void accept(SelectionKey key) throws IOException {
        ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
        SocketChannel clientChannel = ssc.accept();
        if (clientChannel == null) {
            // In non-blocking mode accept() returns null when no connection is pending.
            return;
        }
        clientChannel.configureBlocking(false);
        clientChannel.register(selector, SelectionKey.OP_READ);
        System.out.println("a new client connected");
    }

    /** Cancels the key, closes its channel and discards the buffered message. */
    private void closeClient(SelectionKey key) {
        key.cancel();
        clientMessage.remove(key.channel());
        closeChannelOf(key);
    }

    /** Closes the channel behind a key; a failure to close is not actionable here. */
    private void closeChannelOf(SelectionKey key) {
        try {
            key.channel().close();
        } catch (IOException ignored) {
            // Best-effort cleanup: the channel is being discarded anyway.
        }
    }

    /** Releases every registered channel, the selector and the listening channel. */
    private void shutdown(ServerSocketChannel ssc) {
        try {
            if (selector != null && selector.isOpen()) {
                for (SelectionKey key : selector.keys()) {
                    closeChannelOf(key);
                }
                selector.close();
            }
        } catch (IOException ignored) {
            // Best-effort cleanup during shutdown.
        }
        try {
            ssc.close();
        } catch (IOException ignored) {
            // Best-effort cleanup during shutdown.
        }
        clientMessage.clear();
    }

    /**
     * Program entry point: starts the server on the calling thread.
     *
     * @param args unused
     * @throws IOException if the server cannot be started
     */
    public static void main(String[] args) throws IOException {
        System.out.println("server started...");
        new Server().start();
    }
}
package wen.nio.demo1;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Scanner;
import java.util.Set;
/**
 * Demo 1 client: connects to 127.0.0.1:8001 with a non-blocking channel and
 * sends every line typed on standard input to the server.
 *
 * @author Dwen
 * @version v 0.1 2013-6-28 10:32:38 AM
 */
public class Client {
    /**
     * Connects to the server and forwards console lines until standard input
     * reaches end-of-file. The channel and selector are closed on exit.
     *
     * @throws IOException if connecting to or writing to the server fails
     */
    public void start() throws IOException {
        SocketChannel sc = SocketChannel.open();
        Selector selector = null;
        try {
            sc.configureBlocking(false);
            // A non-blocking connect may still complete immediately (typical on loopback).
            boolean connected = sc.connect(new InetSocketAddress("127.0.0.1", 8001));
            selector = Selector.open();
            if (connected) {
                sc.register(selector, SelectionKey.OP_WRITE);
                System.out.println("server connected...");
            } else {
                sc.register(selector, SelectionKey.OP_CONNECT);
            }
            Scanner scanner = new Scanner(System.in);
            boolean running = true;
            while (running) {
                selector.select();
                Set<SelectionKey> keys = selector.selectedKeys();
                System.out.println("keys=" + keys.size());
                Iterator<SelectionKey> keyIterator = keys.iterator();
                while (running && keyIterator.hasNext()) {
                    SelectionKey key = keyIterator.next();
                    keyIterator.remove();
                    if (!key.isValid()) {
                        continue;
                    }
                    if (key.isConnectable()) {
                        if (sc.finishConnect()) {
                            // Switch the existing registration from "connect" to "write".
                            key.interestOps(SelectionKey.OP_WRITE);
                            System.out.println("server connected...");
                        }
                    } else if (key.isWritable()) {
                        System.out.println("please input message");
                        if (!scanner.hasNextLine()) {
                            // Standard input was closed: nothing more to send.
                            running = false;
                            break;
                        }
                        String message = scanner.nextLine();
                        ByteBuffer writeBuffer = ByteBuffer.wrap(message.getBytes());
                        // A non-blocking write may accept only part of the buffer.
                        while (writeBuffer.hasRemaining()) {
                            sc.write(writeBuffer);
                        }
                    }
                }
            }
        } finally {
            if (selector != null) {
                selector.close();
            }
            sc.close();
        }
    }

    /**
     * Program entry point: runs the client on the calling thread.
     *
     * @param args unused
     * @throws IOException if the client fails to connect or send
     */
    public static void main(String[] args) throws IOException {
        new Client().start();
    }
}
示例2、
package wen.nio.demo2;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
/**
 * Demo 2 server: a selector-based request/response server. For every message
 * received from a client it replies with "Message from server : N", where N
 * is a counter shared by all clients.
 */
public class NIOServer {
    /** Counter appended to each outgoing message. */
    private int flag = 0;
    /** Buffer size in bytes. */
    private static final int BLOCK = 4096;
    /** Buffer for outgoing data. */
    private ByteBuffer sendBuffer = ByteBuffer.allocate(BLOCK);
    /** Buffer for incoming data. */
    private ByteBuffer receiveBuffer = ByteBuffer.allocate(BLOCK);
    private Selector selector;

    /**
     * Opens a non-blocking server channel bound to 127.0.0.1 on the given
     * port and registers it for accept events.
     *
     * @param port TCP port to listen on
     * @throws IOException if the channel or selector cannot be opened or the
     *                     port cannot be bound
     */
    public NIOServer(int port) throws IOException {
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        try {
            serverSocketChannel.configureBlocking(false);
            ServerSocket serverSocket = serverSocketChannel.socket();
            serverSocket.bind(new InetSocketAddress("127.0.0.1", port));
            selector = Selector.open();
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
        } catch (IOException e) {
            // Do not leak the channel (and the port) when start-up fails.
            if (selector != null) {
                selector.close();
            }
            serverSocketChannel.close();
            throw e;
        }
        System.out.println("Server Start-------------" + port);
    }

    /**
     * Runs the selector loop forever, dispatching each ready key to
     * {@link #handleKey(SelectionKey)}.
     *
     * @throws IOException if the selector itself fails
     */
    private void listen() throws IOException {
        while (true) {
            // Blocks until at least one registered channel is ready.
            selector.select();
            Set<SelectionKey> selectionKeys = selector.selectedKeys();
            Iterator<SelectionKey> iterator = selectionKeys.iterator();
            while (iterator.hasNext()) {
                SelectionKey selectionKey = iterator.next();
                iterator.remove();
                handleKey(selectionKey);
            }
        }
    }

    /**
     * Handles one ready key: accepts a new client, reads a client message, or
     * writes the reply. A client whose connection has ended or failed is
     * closed without affecting the other clients.
     *
     * @param selectionKey the ready key
     * @throws IOException if accepting or configuring a new client fails
     */
    private void handleKey(SelectionKey selectionKey) throws IOException {
        if (!selectionKey.isValid()) {
            return;
        }
        if (selectionKey.isAcceptable()) {
            ServerSocketChannel server = (ServerSocketChannel) selectionKey.channel();
            SocketChannel client = server.accept();
            if (client == null) {
                // Non-blocking accept: the pending connection has gone away.
                return;
            }
            client.configureBlocking(false);
            client.register(selector, SelectionKey.OP_READ);
        } else if (selectionKey.isReadable()) {
            SocketChannel client = (SocketChannel) selectionKey.channel();
            receiveBuffer.clear();
            int count;
            try {
                count = client.read(receiveBuffer);
            } catch (IOException e) {
                closeClient(selectionKey);
                return;
            }
            if (count < 0) {
                // End-of-stream: the client disconnected. Leaving the channel
                // registered would make the selector report it readable forever.
                closeClient(selectionKey);
                return;
            }
            if (count > 0) {
                String receiveText = new String(receiveBuffer.array(), 0, count);
                System.out.println("服务器端接收客户端数据:" + receiveText);
                // Next step for this client is to send the reply.
                selectionKey.interestOps(SelectionKey.OP_WRITE);
            }
        } else if (selectionKey.isWritable()) {
            sendBuffer.clear();
            SocketChannel client = (SocketChannel) selectionKey.channel();
            String sendText = "Message from server : " + flag++;
            sendBuffer.put(sendText.getBytes());
            // Switch the buffer from filling to draining.
            sendBuffer.flip();
            try {
                client.write(sendBuffer);
            } catch (IOException e) {
                closeClient(selectionKey);
                return;
            }
            System.out.println("服务器端向客户端发送数据:" + sendText);
            selectionKey.interestOps(SelectionKey.OP_READ);
        }
    }

    /** Cancels the key and closes its channel, ignoring close failures. */
    private void closeClient(SelectionKey selectionKey) {
        selectionKey.cancel();
        try {
            selectionKey.channel().close();
        } catch (IOException ignored) {
            // Best-effort cleanup: the connection is being discarded anyway.
        }
    }

    /**
     * Program entry point: starts the server on port 8989.
     *
     * @param args unused
     * @throws IOException if the server cannot start or the selector fails
     */
    public static void main(String[] args) throws IOException {
        int port = 8989;
        NIOServer server = new NIOServer(port);
        server.listen();
    }
}
package wen.nio.demo2;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
/**
 * Demo 2 client: connects to 127.0.0.1:8989, sends a greeting and then
 * answers every server message with "Message from client : N".
 */
public class NIOClient {
    /** Counter appended to each outgoing message. */
    private static int flag = 0;
    /** Buffer size in bytes. */
    private static final int BLOCK = 4096;
    /** Buffer for outgoing data. */
    private static ByteBuffer sendBuffer = ByteBuffer.allocate(BLOCK);
    /** Buffer for incoming data. */
    private static ByteBuffer receiveBuffer = ByteBuffer.allocate(BLOCK);
    /** Address of the server. */
    private final static InetSocketAddress SERVER_ADDRESS = new InetSocketAddress("127.0.0.1", 8989);

    /**
     * Program entry point: runs the client until the server closes the
     * connection. The channel and selector are closed on exit.
     *
     * @param args unused
     * @throws IOException if connecting, reading or writing fails
     */
    public static void main(String[] args) throws IOException {
        SocketChannel socketChannel = SocketChannel.open();
        Selector selector = null;
        try {
            socketChannel.configureBlocking(false);
            selector = Selector.open();
            SelectionKey registration = socketChannel.register(selector, SelectionKey.OP_CONNECT);
            if (socketChannel.connect(SERVER_ADDRESS)) {
                // The connection completed immediately; greet right away so
                // that client and server do not both end up waiting to read.
                System.out.println("Client connect...");
                greet(socketChannel);
                registration.interestOps(SelectionKey.OP_READ);
            }
            boolean running = true;
            while (running) {
                // Blocks until the channel is ready for a registered operation.
                selector.select();
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                Iterator<SelectionKey> iterator = selectionKeys.iterator();
                while (running && iterator.hasNext()) {
                    SelectionKey selectionKey = iterator.next();
                    iterator.remove();
                    if (!selectionKey.isValid()) {
                        continue;
                    }
                    SocketChannel client = (SocketChannel) selectionKey.channel();
                    if (selectionKey.isConnectable()) {
                        System.out.println("Client connect...");
                        if (client.finishConnect()) {
                            greet(client);
                            selectionKey.interestOps(SelectionKey.OP_READ);
                        }
                    } else if (selectionKey.isReadable()) {
                        receiveBuffer.clear();
                        int count = client.read(receiveBuffer);
                        if (count < 0) {
                            // End-of-stream: the server closed the connection.
                            // Stop instead of spinning on a permanently readable channel.
                            System.out.println("Server closed the connection");
                            running = false;
                        } else if (count > 0) {
                            String receiveText = new String(receiveBuffer.array(), 0, count);
                            System.out.println("客户端接收服务器数据:" + receiveText);
                            selectionKey.interestOps(SelectionKey.OP_WRITE);
                        }
                    } else if (selectionKey.isWritable()) {
                        sendBuffer.clear();
                        String sendText = "Message from client : " + flag++;
                        sendBuffer.put(sendText.getBytes());
                        // flip() switches the buffer from filling to draining so
                        // the bytes just put can be written to the channel.
                        sendBuffer.flip();
                        client.write(sendBuffer);
                        System.out.println("客户端向服务器端发送数据:" + sendText);
                        selectionKey.interestOps(SelectionKey.OP_READ);
                    }
                }
            }
        } finally {
            if (selector != null) {
                selector.close();
            }
            socketChannel.close();
        }
    }

    /** Announces the completed connection and sends the initial greeting. */
    private static void greet(SocketChannel client) throws IOException {
        System.out.println("完成连接~");
        sendBuffer.clear();
        sendBuffer.put("Hello,Server ".getBytes());
        sendBuffer.flip();
        client.write(sendBuffer);
    }
}
示例3 聊天通信:
package wen.nio.demo3;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Date;
import java.util.Iterator;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
 * Chat room server. Accepts clients on port 2345 and relays every message a
 * client sends to all other connected clients. A client that sends "quit"
 * receives "BYE" and is disconnected.
 *
 * @author Dwen
 * @version v 0.1 2013-6-28 03:19:23 PM
 */
public class MySocketServer implements Runnable {
    /** Run flag checked by the selector loop. */
    private boolean running;
    /** Selector multiplexing the listening channel and all client channels. */
    private Selector selector;
    /** Most recently broadcast message. */
    String writeMsg;
    /** Legacy message buffer; kept for compatibility, not used by the read path. */
    StringBuffer sb = new StringBuffer();
    /** Selection key of the listening channel. */
    SelectionKey ssckey;

    /** Creates a server in the running state; call {@link #init()} before {@link #execute()}. */
    public MySocketServer() {
        running = true;
    }

    /**
     * Opens the selector and the listening channel on port 2345 and registers
     * the channel for accept events. A failure is logged, anything already
     * opened is released, and the server is left uninitialised.
     */
    public void init() {
        ServerSocketChannel ssc = null;
        try {
            selector = Selector.open();
            ssc = ServerSocketChannel.open();
            ssc.configureBlocking(false);
            ssc.socket().bind(new InetSocketAddress(2345));
            // Once registered, the selector reports incoming connections on this channel.
            ssckey = ssc.register(selector, SelectionKey.OP_ACCEPT);
            System.out.println("Server is starting..." + new Date());
        } catch (IOException ex) {
            Logger.getLogger(MySocketServer.class.getName()).log(Level.SEVERE,
                    null, ex);
            releaseAfterFailedInit(ssc);
        }
    }

    /**
     * Runs the selector loop. Returns immediately (after logging) when the
     * server has not been initialised. An I/O failure on a single client
     * closes only that client; a failure of the selector ends the loop.
     */
    public void execute() {
        if (selector == null || ssckey == null) {
            Logger.getLogger(MySocketServer.class.getName()).log(Level.SEVERE,
                    "Server is not initialised; init() must succeed before execute()");
            return;
        }
        try {
            while (running) {
                int num = selector.select();
                if (num <= 0) {
                    continue;
                }
                Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                while (it.hasNext()) {
                    SelectionKey key = it.next();
                    it.remove();
                    if (!key.isValid()) {
                        continue;
                    }
                    try {
                        handle(key);
                    } catch (IOException ex) {
                        Logger.getLogger(MySocketServer.class.getName()).log(
                                Level.WARNING, null, ex);
                        // Never tear down the listening channel because of one bad client.
                        if (key != ssckey) {
                            closeClient(key);
                        }
                    }
                }
            }
        } catch (IOException ex) {
            Logger.getLogger(MySocketServer.class.getName()).log(Level.SEVERE,
                    null, ex);
        }
    }

    /** Dispatches one ready key to the accept, read or write handler. */
    private void handle(SelectionKey key) throws IOException {
        if (key.isAcceptable()) {
            System.out.println("isAcceptable");
            getConnection(key);
        } else if (key.isReadable()) {
            System.out.println("isReadable");
            readMssage(key);
        } else if (key.isWritable()) {
            System.out.println("isWritable");
            writeMssage(key);
        }
    }

    /**
     * Accepts a pending connection and registers the client for reading.
     *
     * @param key selection key of the listening channel
     * @throws IOException if accepting or configuring the client fails
     */
    private void getConnection(SelectionKey key) throws IOException {
        ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
        SocketChannel sc = ssc.accept();
        if (sc == null) {
            // Non-blocking accept returns null when nothing is pending any more.
            return;
        }
        sc.configureBlocking(false);
        sc.register(selector, SelectionKey.OP_READ);
        System.out.println("Build connection :" + sc.socket().getRemoteSocketAddress());
    }

    /**
     * Reads everything currently available from a client. "quit" disconnects
     * the client after replying "BYE"; any other non-empty text is queued for
     * every other client. A client whose stream has ended is closed.
     *
     * @param key selection key of the readable client
     * @throws IOException if reading from or writing to the client fails
     */
    private void readMssage(SelectionKey key) throws IOException {
        SocketChannel sc = (SocketChannel) key.channel();
        System.out.print(sc.socket().getRemoteSocketAddress() + " ");
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        StringBuffer received = new StringBuffer();
        int len;
        while ((len = sc.read(buffer)) > 0) {
            received.append(new String(buffer.array(), 0, len));
            // clear() (not flip()) restores the full capacity for the next read.
            buffer.clear();
        }
        // read() returns -1 once the peer has closed its side of the connection.
        boolean endOfStream = len < 0;
        if (received.length() > 0) {
            System.out.println("From client Get:" + received.toString());
        }
        if ("quit".equalsIgnoreCase(received.toString().trim())) {
            sc.write(ByteBuffer.wrap("BYE".getBytes()));
            System.out.println("Client is closed "
                    + sc.socket().getRemoteSocketAddress());
            closeClient(key);
            return;
        }
        if (received.length() > 0) {
            String toMsg = sc.socket().getRemoteSocketAddress() + "said:"
                    + received.toString();
            System.out.println(toMsg);
            writeMsg = toMsg;
            broadcast(key, toMsg);
        }
        if (endOfStream) {
            // Without this the dead channel stays readable and the loop spins,
            // broadcasting empty messages forever.
            System.out.println("Client is closed "
                    + sc.socket().getRemoteSocketAddress());
            closeClient(key);
        }
    }

    /** Queues a message on every valid client key except the sender and enables write interest. */
    private void broadcast(SelectionKey sender, String toMsg) {
        for (SelectionKey skey : selector.keys()) {
            // Cancelled keys remain in keys() until the next select; touching
            // their interest set would throw CancelledKeyException.
            if (skey == sender || skey == ssckey || !skey.isValid()) {
                continue;
            }
            Object pending = skey.attachment();
            if (pending != null) {
                skey.attach((String) pending + toMsg);
            } else {
                skey.attach(toMsg);
            }
            skey.interestOps(skey.interestOps() | SelectionKey.OP_WRITE);
        }
    }

    /**
     * Sends the queued text to a client, clears the queue and switches the
     * key back to read interest.
     *
     * @param key selection key of the writable client
     * @throws IOException if writing to the client fails
     */
    private void writeMssage(SelectionKey key) throws IOException {
        SocketChannel sc = (SocketChannel) key.channel();
        String str = (String) key.attachment();
        if (str != null) {
            sc.write(ByteBuffer.wrap(str.getBytes()));
        }
        // Clear the queue so the same text is not re-sent with the next broadcast.
        key.attach(null);
        key.interestOps(SelectionKey.OP_READ);
    }

    /** Cancels a client key and closes its channel; close failures are logged. */
    private void closeClient(SelectionKey key) {
        key.cancel();
        try {
            key.channel().close();
        } catch (IOException ex) {
            Logger.getLogger(MySocketServer.class.getName()).log(Level.WARNING,
                    null, ex);
        }
    }

    /** Releases whatever init() managed to open and resets the server to uninitialised. */
    private void releaseAfterFailedInit(ServerSocketChannel ssc) {
        try {
            if (ssc != null) {
                ssc.close();
            }
            if (selector != null) {
                selector.close();
            }
        } catch (IOException ex) {
            Logger.getLogger(MySocketServer.class.getName()).log(Level.WARNING,
                    null, ex);
        }
        selector = null;
        ssckey = null;
    }

    /**
     * Initialises the server and runs the selector loop.
     *
     * @see java.lang.Runnable#run()
     */
    public void run() {
        init();
        execute();
    }

    /**
     * Program entry point: runs the server on a new thread.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        MySocketServer server = new MySocketServer();
        new Thread(server).start();
    }
}
package wen.nio.demo3;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
 * Chat room client. One thread polls the channel for messages from the server
 * and prints them; a second thread forwards console input to the server.
 *
 * @author Dwen
 * @version v 0.1 2013-6-28 03:20:03 PM
 */
public class MySocketClient implements Runnable {
    /** Run flag of the receive loop; volatile because it is shared between threads. */
    volatile boolean running;
    /** Channel to the chat server; null until {@link #init()} succeeds. */
    SocketChannel sc;

    /** Creates a client in the running state; call {@link #init()} before {@link #execute()}. */
    public MySocketClient() {
        running = true;
    }

    /**
     * Opens a non-blocking channel and starts connecting to localhost:2345.
     * On failure the error is logged and {@link #sc} is left null.
     */
    public void init() {
        try {
            sc = SocketChannel.open();
            sc.configureBlocking(false);
            sc.connect(new InetSocketAddress("localhost", 2345));
        } catch (IOException ex) {
            Logger.getLogger(MySocketClient.class.getName()).log(Level.SEVERE,
                    null, ex);
            closeChannel();
            sc = null;
        }
    }

    /**
     * Completes the connection, starts the keyboard thread and polls the
     * channel every 500 ms, printing whatever the server sent. The loop ends
     * when the server says "bye", closes the connection, an I/O error occurs
     * or the thread is interrupted. Returns immediately when the client has
     * not been initialised or the connection cannot be established.
     */
    public void execute() {
        if (sc == null) {
            Logger.getLogger(MySocketClient.class.getName()).log(Level.SEVERE,
                    "Client is not initialised; init() must succeed before execute()");
            running = false;
            return;
        }
        try {
            // Wait for the non-blocking connect to finish without burning a CPU core.
            while (!sc.finishConnect()) {
                Thread.sleep(10);
            }
        } catch (IOException ex) {
            Logger.getLogger(MySocketClient.class.getName()).log(Level.SEVERE,
                    null, ex);
            closeChannel();
            running = false;
            return;
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            closeChannel();
            running = false;
            return;
        }
        ReadKeyBoard rkb = new ReadKeyBoard();
        Thread keyboardThread = new Thread(rkb);
        // Daemon: a thread blocked on console input must not keep the JVM
        // alive after the connection has been closed.
        keyboardThread.setDaemon(true);
        keyboardThread.start();
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        while (running) {
            try {
                Thread.sleep(500);
                StringBuffer sb = new StringBuffer();
                buffer.clear();
                int num;
                while ((num = sc.read(buffer)) > 0) {
                    sb.append(new String(buffer.array(), 0, num));
                    buffer.clear();
                }
                if (sb.length() > 0) {
                    System.out.println(sb.toString());
                }
                boolean saidBye = "bye".equalsIgnoreCase(sb.toString().trim());
                // num < 0 is end-of-stream: the server closed the connection.
                if (saidBye || num < 0) {
                    shutdown(rkb);
                }
            } catch (InterruptedException ex) {
                Logger.getLogger(MySocketClient.class.getName()).log(
                        Level.SEVERE, null, ex);
                // Restore the interrupt status for whoever owns this thread.
                Thread.currentThread().interrupt();
                shutdown(rkb);
            } catch (IOException ex) {
                Logger.getLogger(MySocketClient.class.getName()).log(
                        Level.SEVERE, null, ex);
                // The channel is unusable; retrying would only log the same error forever.
                shutdown(rkb);
            }
        }
    }

    /** Closes the channel, stops the keyboard reader and ends the receive loop. */
    private void shutdown(ReadKeyBoard rkb) {
        System.out.println("closed....");
        closeChannel();
        rkb.close();
        running = false;
    }

    /** Closes the channel if it is open; a close failure is logged. */
    private void closeChannel() {
        try {
            if (sc != null) {
                sc.close();
            }
        } catch (IOException ex) {
            Logger.getLogger(MySocketClient.class.getName()).log(Level.WARNING,
                    null, ex);
        }
    }

    /** Initialises the connection and runs the receive loop. */
    public void run() {
        init();
        execute();
    }

    /**
     * Reads lines from standard input and writes them to the server channel.
     *
     * @author Dwen
     * @version v 0.1 2013-7-19 10:40:01 AM
     */
    class ReadKeyBoard implements Runnable {
        /** Run flag; volatile because close() is called from the receive thread. */
        volatile boolean running2 = true;

        public ReadKeyBoard() {}

        /** Asks the reader loop to stop after the current line. */
        public void close() {
            running2 = false;
        }

        /**
         * Forwards console lines to the server until closed, until standard
         * input reaches end-of-file, or until an I/O error occurs.
         */
        public void run() {
            BufferedReader reader = new BufferedReader(new InputStreamReader(
                    System.in));
            while (running2) {
                try {
                    System.out.println("Please enter commands :");
                    String str = reader.readLine();
                    if (str == null) {
                        // End of input: there is nothing more to forward.
                        close();
                        break;
                    }
                    if (!running2) {
                        // The connection was closed while waiting for input.
                        break;
                    }
                    ByteBuffer out = ByteBuffer.wrap(str.getBytes());
                    // A non-blocking write may accept only part of the buffer.
                    while (out.hasRemaining()) {
                        sc.write(out);
                    }
                } catch (IOException ex) {
                    Logger.getLogger(ReadKeyBoard.class.getName()).log(
                            Level.SEVERE, null, ex);
                    // Neither a broken console nor a broken channel recovers by retrying.
                    close();
                }
            }
        }
    }

    /**
     * Program entry point: runs the client on a new thread.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        MySocketClient client = new MySocketClient();
        new Thread(client).start();
    }
}