java-非阻塞异步通信-NIO初探
程序员文章站
2022-07-12 19:20:25
...
java的NIO为非阻塞式的Socket通信提供了以下类:
Selector类
SelectableChannel类
SelectionKey
以下为一个实现非阻塞式通信的简单实例:
服务器端
package noBlock;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.Channel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Iterator;
public class NServer {
//用于检测所有channel状态的selector
private Selector selector = null;
private Charset charset = Charset.forName("UTF-8");
public void init() throws IOException{
//通过 open()方法创建一个selector 实例,该方法将使用系统默认的selector 返回新的selector
selector = Selector.open();
//通过open 方法打开一个未绑定的ServerSocketChannel 实例
//打开一个监听通道
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
InetSocketAddress isa = new InetSocketAddress("127.0.0.1",30000);
//将该ServerScoketChannel 绑定到指定IP 地址
//监听IP地址为127.0.0.1,端口号为30000发来的消息
serverSocketChannel.socket().bind(isa);
//设置ServerSocket 以非阻塞方式工作
serverSocketChannel.configureBlocking(false);
//将 serverSocketChannel 注册到指定 Selector 对象
serverSocketChannel.register(selector,SelectionKey.OP_ACCEPT);
while(selector.select() > 0){
//依次处理selector 上的每个已选择的selectionKey
for(SelectionKey sk:selector.selectedKeys()){
//从selector 上的已选择key集中删除正在处理的 SelectionKey
selector.selectedKeys().remove(sk);
if(sk.isAcceptable()){
//调用accept方法接受连接,产生服务器端对应的SocketChannel
SocketChannel sc= serverSocketChannel.accept();
//设置采用非阻塞模式
sc.configureBlocking(false);
//将该SocketChannel 也注册到selector
sc.register(selector,SelectionKey.OP_READ);
//将sk 对应的Channel设置成准备接受其他请求
sk.interestOps(SelectionKey.OP_ACCEPT);
}
if(sk.isReadable()){
//获取该SelectionKey 对应的Channel ,该channel中有可读的数据
SocketChannel sc = (SocketChannel) sk.channel();
//定义准备执行数据的ByteBuffer
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
String content="";
try {
while (sc.read(byteBuffer)>0){
byteBuffer.flip();
content += charset.decode(byteBuffer);
}
//打印从该sk 对应的Channel 里读取到的数据
System.out.println("==========="+content);
//将sk 对应的Channel 设置成下一次读取
sk.interestOps(SelectionKey.OP_READ);
} catch (Exception e) {
// TODO Auto-generated catch block
sk.cancel();
if (sk.channel()!=null) {
sk.channel().close();
}
}
if (content.length()>0) {
for (SelectionKey key :selector.keys()) {
//获取该key 对应的channel
Channel targetChannel = key.channel();
if (targetChannel instanceof SocketChannel) {
//将独到的内容写入该Channel 中
SocketChannel dest = (SocketChannel) targetChannel;
dest.write(charset.encode(content));
}
}
}
}
}
}
}
public static void main(String[] args) throws IOException {
new NServer().init();
}
}
客户端:
package noBlock;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Scanner;
public class NClient {
private Selector selector =null;
private Charset charset = Charset.forName("UTF-8");
private SocketChannel sChannel = null;
public void init() throws IOException{
selector = Selector.open();
InetSocketAddress isa = new InetSocketAddress("127.0.0.1",30000);
sChannel = SocketChannel.open(isa);
sChannel.configureBlocking(false);
sChannel.register(selector, SelectionKey.OP_READ);
new ClientThread().start();
Scanner scanner = new Scanner(System.in);
while(scanner.hasNext()){
String line = scanner.nextLine();
sChannel.write(charset.encode(line));
}
}
private class ClientThread extends Thread
{
public void run(){
try {
while(selector.select()>0){
for(SelectionKey sKey :selector.selectedKeys()){
selector.selectedKeys().remove(sKey);
if(sKey.isReadable()){
SocketChannel sc =(SocketChannel) sKey.channel();
ByteBuffer buffer =ByteBuffer.allocate(1024);
String content ="";
while(sc.read(buffer)>0){
sc.read(buffer);
buffer.flip();
content+=charset.decode(buffer);
}
System.out.println("聊天信息" +content);
sKey.interestOps(SelectionKey.OP_READ);
}
}
}
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
public static void main(String[] args) throws IOException {
new NClient().init();
}
}
SelectionKey 的四种操作
OP_READ
OP_ACCEPT
服务端监听,并注册OP_ACCEPT事件后,就已准备好接受客户端的连接了
例如:serverSocketChannel.register(selector,SelectionKey.OP_ACCEPT);
以及:sk.interestOps(SelectionKey.OP_ACCEPT);
interestOps( ops)以一个位图为参数,指示了应该监听信道上的哪些操作
OP_CONNECT
当客户端调用connect()并注册OP_CONNECT事件后,连接操作就会就绪。
OP_WRITE
写就绪相对有一点特殊,一般来说,你不应该注册写事件。写操作的就绪条件为底层缓冲区有空闲空间,而写缓冲区绝大部分时间都是有空闲空间的,所以当你注册写事件后,写操作一直是就绪的,选择处理线程全占用整个CPU资源。所以,只有当你确实有数据要写时再注册写操作,并在写完以后马上取消注册。
当有数据在写时,将数据写到缓冲区中,并注册写事件。
public void write(byte[] data) throws IOException {
writeBuffer.put(data);
key.interestOps(SelectionKey.OP_WRITE);
}
注册写事件后,写操作就绪,这时将之前写入缓冲区的数据写入通道,并取消注册。
channel.write(writeBuffer);
key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
在网上搜索有关NIO信息时,发现名为natty的异步通信框架,先记录下来。