NIO中的SelectionKey
程序员文章站
2022-04-24 11:41:55
...
转载自: 秦汉邮侠 https://www.jianshu.com/p/d33f2f6cdba0
要点
- 是一个抽象类,表示selectableChannel在Selector中注册的标识.每个Channel向Selector注册时,都将会创建一个selectionKey
- 选择键将Channel与Selector建立了关系,并维护了channel事件.
- 可以通过cancel方法取消键,取消的键不会立即从selector中移除,而是添加到cancelledKeys中,在下一次select操作时移除它.所以在调用某个key时,需要使用isValid进行校验.
操作集
- interest 集合:当前channel感兴趣的操作,此类操作将会在下一次选择器select操作时被交付,可以通过selectionKey.interestOps(int)进行修改.
- ready 集合:表示此选择键上,已经就绪的操作.每次select时,选择器都会对ready集合进行更新;外部程序无法修改此集合.
操作属性
- OP_ACCEPT:连接可接受操作,仅ServerSocketChannel支持
- OP_CONNECT:连接操作,Client端支持的一种操作
- OP_READ/OP_WRITE
0表示什么?
- 这些opts都不为0,如果向selector之中register一个为“0”的opts,表示此channel不关注任何类型的事件。(言外之意,register方法只是获取一个selectionKey,具体这个Channel对何种事件感兴趣,可以在稍后操作)
方法列表
- public abstract SelectableChannel channel():返回此选择键所关联的通道.即使此key已经被取消,仍然会返回.
- public abstract Selector selector():返回此选择键所关联的选择器,即使此键已经被取消,仍然会返回.
- public abstract boolean isValid():检测此key是否有效.当key被取消,或者通道被关闭,或者selector被关闭,都将导致此key无效.在AbstractSelector.removeKey(key)中,会导致selectionKey被置为无效.
- public abstract void cancel():请求将此键取消注册.一旦返回成功,那么该键就是无效的,被添加到selector的cancelledKeys中.cancel操作将key的valid属性置为false,并执行selector.cancel(key)(即将key加入cancelledkey集合)
- public abstract int interesOps():获得此键的interes集合.
- public abstract SelectionKey interestOps(int ops):将此键的interst设置为指定值.此操作会对ops和channel.validOps进行校验.如果此ops不会当前channel支持,将抛出异常.
- public abstract int readyOps():获取此键上ready操作集合.即在当前通道上已经就绪的事件.
- public final boolean isReadable(): 检测此键是否为"read"事件.等效于:k.,readyOps() & OP_READ != 0;还有isWritable(),isConnectable(),isAcceptable()
- public final Object attach(Object ob):将给定的对象作为附件添加到此key上.在key有效期间,附件可以在多个ops事件中传递.
- public final Object attachment():获取附件.一个channel的附件,可以再当前Channel(或者说是SelectionKey)生命周期*享,但是attachment数据不会作为socket数据在网络中传输.
参考代码
- Reactor
public class Reactor implements Runnable {
final Selector selector;
final ServerSocketChannel serverSocket;
Reactor(int port) throws IOException {
selector = Selector.open();
serverSocket = ServerSocketChannel.open();
InetSocketAddress address = new InetSocketAddress(InetAddress.getLocalHost(), port);
serverSocket.socket().bind(address);
serverSocket.configureBlocking(false);
//向selector注册该channel
SelectionKey sk = serverSocket.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("-->Start serverSocket.register!");
//利用sk的attache功能绑定Acceptor 如果有事情,触发Acceptor
sk.attach(new Acceptor());
System.out.println("-->attach(new Acceptor()!");
}
public void run() { // normally in a new Thread
try {
while (!Thread.interrupted()) {
selector.select();
Set selected = selector.selectedKeys();
Iterator it = selected.iterator();
//Selector如果发现channel有OP_ACCEPT或READ事件发生,下列遍历就会进行。
while (it.hasNext()) {
//来一个事件 第一次触发一个accepter线程
//以后触发SocketReadHandler
dispatch((SelectionKey) (it.next()));
}
selected.clear();
}
} catch (IOException ex) {
System.out.println("reactor stop!" + ex);
}
}
//运行Acceptor或SocketReadHandler
void dispatch(SelectionKey k) {
Runnable r = (Runnable) (k.attachment());
if (r != null) {
r.run();
}
}
class Acceptor implements Runnable { // inner
public void run() {
try {
System.out.println("-->ready for accept!");
SocketChannel c = serverSocket.accept();
if (c != null)
//调用Handler来处理channel
new Handler(selector, c);
} catch (IOException ex) {
}
}
}
}
- Handler
public class Handler implements Runnable {
final SocketChannel socket;
final SelectionKey sk;
ByteBuffer input = ByteBuffer.allocate(Integer.MAX_VALUE);
ByteBuffer output = ByteBuffer.allocate(Integer.MAX_VALUE);
static final int READING = 0, SENDING = 1;
int state = READING;
public Handler(Selector sel, SocketChannel c) throws IOException {
socket = c;
//设置为非阻塞模式
c.configureBlocking(false);
//此处的0,表示不关注任何时间
sk = socket.register(sel, 0);
//将SelectionKey绑定为本Handler 下一步有事件触发时,将调用本类的run方法
sk.attach(this);
//将SelectionKey标记为可读,以便读取,不可关注可写事件
sk.interestOps(SelectionKey.OP_READ);
sel.wakeup();
}
boolean inputIsComplete() {
return false;
}
boolean outputIsComplete() {
return false;
}
//这里可以通过线程池处理数据
void process() {
}
public void run() {
try {
if (state == READING) {
read();
} else if (state == SENDING) {
send();
}
} catch (IOException ex) { /* ... */ }
}
void read() throws IOException {
socket.read(input);
if (inputIsComplete()) {
process();
state = SENDING;
// Normally also do first write now
sk.interestOps(SelectionKey.OP_WRITE);
}
}
void send() throws IOException {
socket.write(output);
if (outputIsComplete()) {
//
sk.cancel();
}
}
}
参考来源
上一篇: _socket