欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

NIO中的SelectionKey

程序员文章站 2022-04-24 11:41:55
...

转载自: 秦汉邮侠 https://www.jianshu.com/p/d33f2f6cdba0

要点

  • 是一个抽象类,表示selectableChannel在Selector中注册的标识.每个Channel向Selector注册时,都将会创建一个selectionKey
  • 选择键将Channel与Selector建立了关系,并维护了channel事件.
  • 可以通过cancel方法取消键,取消的键不会立即从selector中移除,而是添加到cancelledKeys中,在下一次select操作时移除它.所以在调用某个key时,需要使用isValid进行校验.

操作集

  • interest 集合:当前channel感兴趣的操作,此类操作将会在下一次选择器select操作时被交付,可以通过selectionKey.interestOps(int)进行修改.
  • ready 集合:表示此选择键上,已经就绪的操作.每次select时,选择器都会对ready集合进行更新;外部程序无法修改此集合.

操作属性

  • OP_ACCEPT:连接可接受操作,仅ServerSocketChannel支持
  • OP_CONNECT:连接操作,Client端支持的一种操作
  • OP_READ/OP_WRITE

0表示什么?

  • 这些opts都不为0,如果向selector之中register一个为“0”的opts,表示此channel不关注任何类型的事件。(言外之意,register方法只是获取一个selectionKey,具体这个Channel对何种事件感兴趣,可以在稍后操作)

方法列表

  • public abstract SelectableChannel channel():返回此选择键所关联的通道.即使此key已经被取消,仍然会返回.
  • public abstract Selector selector():返回此选择键所关联的选择器,即使此键已经被取消,仍然会返回.
  • public abstract boolean isValid():检测此key是否有效.当key被取消,或者通道被关闭,或者selector被关闭,都将导致此key无效.在AbstractSelector.removeKey(key)中,会导致selectionKey被置为无效.
  • public abstract void cancel():请求将此键取消注册.一旦返回成功,那么该键就是无效的,被添加到selector的cancelledKeys中.cancel操作将key的valid属性置为false,并执行selector.cancel(key)(即将key加入cancelledkey集合)
  • public abstract int interesOps():获得此键的interes集合.
  • public abstract SelectionKey interestOps(int ops):将此键的interst设置为指定值.此操作会对ops和channel.validOps进行校验.如果此ops不会当前channel支持,将抛出异常.
  • public abstract int readyOps():获取此键上ready操作集合.即在当前通道上已经就绪的事件.
  • public final boolean isReadable(): 检测此键是否为"read"事件.等效于:k.,readyOps() & OP_READ != 0;还有isWritable(),isConnectable(),isAcceptable()
  • public final Object attach(Object ob):将给定的对象作为附件添加到此key上.在key有效期间,附件可以在多个ops事件中传递.
  • public final Object attachment():获取附件.一个channel的附件,可以再当前Channel(或者说是SelectionKey)生命周期*享,但是attachment数据不会作为socket数据在网络中传输.

参考代码

  • Reactor
public class Reactor implements Runnable {

    final Selector selector;
    final ServerSocketChannel serverSocket;

    Reactor(int port) throws IOException {
        selector = Selector.open();
        serverSocket = ServerSocketChannel.open();
        InetSocketAddress address = new InetSocketAddress(InetAddress.getLocalHost(), port);
        serverSocket.socket().bind(address);
        serverSocket.configureBlocking(false);
        //向selector注册该channel
        SelectionKey sk = serverSocket.register(selector, SelectionKey.OP_ACCEPT);

        System.out.println("-->Start serverSocket.register!");

        //利用sk的attache功能绑定Acceptor 如果有事情,触发Acceptor
        sk.attach(new Acceptor());
        System.out.println("-->attach(new Acceptor()!");
    }


    public void run() { // normally in a new Thread
        try {
            while (!Thread.interrupted()) {
                selector.select();
                Set selected = selector.selectedKeys();
                Iterator it = selected.iterator();
                //Selector如果发现channel有OP_ACCEPT或READ事件发生,下列遍历就会进行。
                while (it.hasNext()) {
                    //来一个事件 第一次触发一个accepter线程
                    //以后触发SocketReadHandler
                    dispatch((SelectionKey) (it.next()));
                }
                selected.clear();
            }
        } catch (IOException ex) {
            System.out.println("reactor stop!" + ex);
        }
    }

    //运行Acceptor或SocketReadHandler
    void dispatch(SelectionKey k) {
        Runnable r = (Runnable) (k.attachment());
        if (r != null) {
            r.run();
        }
    }

    class Acceptor implements Runnable { // inner
        public void run() {
            try {
                System.out.println("-->ready for accept!");
                SocketChannel c = serverSocket.accept();
                if (c != null)
                    //调用Handler来处理channel
                    new Handler(selector, c);
            } catch (IOException ex) {
            }
        }
    }
}
  • Handler
public class Handler implements Runnable {

    final SocketChannel socket;
    final SelectionKey sk;
    ByteBuffer input = ByteBuffer.allocate(Integer.MAX_VALUE);
    ByteBuffer output = ByteBuffer.allocate(Integer.MAX_VALUE);
    static final int READING = 0, SENDING = 1;
    int state = READING;


    public Handler(Selector sel, SocketChannel c) throws IOException {
        socket = c;
        //设置为非阻塞模式
        c.configureBlocking(false);
        //此处的0,表示不关注任何时间
        sk = socket.register(sel, 0);
        //将SelectionKey绑定为本Handler 下一步有事件触发时,将调用本类的run方法
        sk.attach(this);
        //将SelectionKey标记为可读,以便读取,不可关注可写事件
        sk.interestOps(SelectionKey.OP_READ);
        sel.wakeup();
    }

    boolean inputIsComplete() {
        return false;
    }

    boolean outputIsComplete() {
        return false;
    }

    //这里可以通过线程池处理数据
    void process() {

    }


    public void run() {
        try {
            if (state == READING) {
                read();
            } else if (state == SENDING) {
                send();
            }
        } catch (IOException ex) { /* ... */ }

    }


    void read() throws IOException {
        socket.read(input);
        if (inputIsComplete()) {
            process();
            state = SENDING;
            // Normally also do first write now
            sk.interestOps(SelectionKey.OP_WRITE);
        }
    }

    void send() throws IOException {
        socket.write(output);
        if (outputIsComplete()) {
            //
            sk.cancel();
        }
    }
    
}

参考来源