欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Selector的wakeup的分析 javasunnio

程序员文章站 2024-03-18 18:11:58
...
Selector的wakeup的分析可以看http://www.iteye.com/topic/1113639

很久之前看过这篇文章,最近重新看,又有新的理解了。

selectNow的选择过程是非阻塞的,select(timeout)和select()的选择过程是阻塞的。
sun.nio.ch.PollSelectorImpl类中
    protected int doSelect(long timeout) throws IOException {
        if (channelArray == null)
            throw new ClosedSelectorException();
        processDeregisterQueue();
        try {
            begin();
            pollWrapper.poll(totalChannels, 0, timeout);
        } finally {
            end();
        }
        processDeregisterQueue();
        int numKeysUpdated = updateSelectedKeys();
        if (pollWrapper.getReventOps(0) != 0) {
            // Clear the wakeup pipe
            pollWrapper.putReventOps(0, 0);
            synchronized (interruptLock) {
                IOUtil.drain(fd0);
                interruptTriggered = false;
            }
        }
        return numKeysUpdated;
    }

需要注意两点
1.processDeregisterQueue方法
  在SelectionKey的cancel()的API说明中:
  The key will be removed from all of the selector's key sets during the next selection operation
  即在下一次执行selection操作时,才会将key删除掉。从上面的doSelect看到,执行poll调用之前会执行processDeregisterQueue进行删除操作。
2.Select的方法最终会调用内核poll函数,在PollArrayWrapper.c中
    ipoll(struct pollfd fds[], unsigned int nfds, int timeout) {
        jlong start, now;
        int remaining = timeout;
        struct timeval t;
        int diff;
        gettimeofday(&t, NULL);
        start = t.tv_sec * 1000 + t.tv_usec / 1000;
        for (;;) {
            int res = poll(fds, nfds, remaining);
            if (res < 0 && errno == EINTR) {
                if (remaining >= 0) {
                    gettimeofday(&t, NULL);
                    now = t.tv_sec * 1000 + t.tv_usec / 1000;
                    diff = now - start;
                    remaining -= diff;
 
                    if (diff < 0 || remaining <= 0) {
                       return 0;
                    }
                    start = now;
                }
            } else {
                return res;
            }
        }
    }

是不是可以认为,内核的poll函数应该是非阻塞的,实现阻塞和超时是PollArrayWrapper.c的for循环逻辑实现的。

比如selector线程正在阻塞中,对这个线程执行中断操作。植入的中断触发器执行选择器的wakeup操作。怎么唤醒呢?正常情况下,选择器监听的channel中,如果有感兴趣的事件发生,选择器便返回,那么也可以为wakeup构造这样一个场景。比如可以这样实现:在构造Selector时,私有的为Selector加入一个管道,正常情况下,这个管道既没有输入也没有输出,没有任何事件发生。但当selector阻塞时,并且被执行了中断,需要被唤醒,因为此时用户指定的channel是没有事件发生的。这个时候,私有的管道起作用了,可以往这个私有管道的写端输入一个字节,便触发了读感兴趣的事件,selector便被唤醒。

sun.nio.ch.PollSelectorImpl类中:
    PollSelectorImpl(SelectorProvider sp) {
        super(sp, 1, 1);
        int[] fdes = new int[2];
        IOUtil.initPipe(fdes, false);
        fd0 = fdes[0];
        fd1 = fdes[1];
        pollWrapper = new PollArrayWrapper(INIT_CAP);
        pollWrapper.initInterrupt(fd0, fd1);
        channelArray = new SelectionKeyImpl[INIT_CAP];
    }

IOUtil.initPipe,采用系统调用pipe(int fd[2])来创建管道,fd[0]即为ready端,fd[1]即为write端。

sun.nio.ch.PollArrayWrapper类中,构造函数为:
    PollArrayWrapper(int newSize) {
        newSize = (newSize + 1) * SIZE_POLLFD;
        pollArray = new AllocatedNativeObject(newSize, false);
        pollArrayAddress = pollArray.address();
        totalChannels = 1;
    }

INIT_CAP默认值 = 10,newSize + 1表明其中增加的一个是为用于wakeup的pollfd结构体分配内存。所以totalChannels = 1。SIZE_POLLFD = 8个字节,实际上pollfd结构体的长度(int fd + short events + short revents)。在调用内核的poll函数时,需要传入的pollfd结构体。在注释中,说明了pollfd的结构体信息
typedef struct pollfd {
    int fd;         //file descriptor  文件描述符
     short events;   //event of interest on fd 对fd感兴趣的事件
     short revents;  //event that occurred on fd 内核返回fd的事件
} pollfd_t;
内存分配是通过unsafe.allocateMemory实现的,AllocatedNativeObject(newSize, false)中false参数为pageAligned,应该是用于内存对齐,但真心没弄懂this.address = a + ps - (a & (ps - 1))
unsafe.allocateMemory返回的是内存起始地址,后面对PollArrayWrapper的pollfd数组操作都是通过起始地址+偏移量来实现。比如:
    int getEventOps(int i) {
        int offset = SIZE_POLLFD * i + EVENT_OFFSET;
        return pollArray.getShort(offset);
    }

计算第i个pollfd的events的偏移量,然后通过起始地址+偏移量来读取。实际上sun.nio.ch.AbstractPollArrayWrapper提供了操作pollfd的get和put方法。
SelectableChannel注册到Selector时,最后也是写入到PollArrayWrapper的pollfd数组,即调用PollArrayWrapper的addEntry方法:
    void addEntry(SelChImpl sc) {
        putDescriptor(totalChannels, IOUtil.fdVal(sc.getFD()));
        putEventOps(totalChannels, 0);
        putReventOps(totalChannels, 0);
        totalChannels++;
    }

可以看到是按照pollfd结构体来写入的。明白了这个概念,就明白了PollArrayWrapper的其他方法。
sun.nio.ch.PollArrayWrapper类中
    void initInterrupt(int fd0, int fd1) {
        interruptFD = fd1;
        putDescriptor(0, fd0);
        putEventOps(0, POLLIN);
        putReventOps(0, 0);
    }

1.这个私有的管道对POLLIN事件感兴趣。当执行wakeup时,往fd1写入一个字节,即可唤醒。在PollArrayWrapper.c中
Java_sun_nio_ch_PollArrayWrapper_interrupt(JNIEnv *env, jobject this, jint fd){
    int fakebuf[1];
    fakebuf[0] = 1;
    if (write(fd, fakebuf, 1) < 0) {
         JNU_ThrowIOExceptionWithLastError(env, "Write to interrupt fd failed");
    }
}

2.put*方法最后都是通过unsafe类操作内存地址来构造;
sun.nio.ch.AbstractPollSelectorImpl
    protected int updateSelectedKeys() {
        int numKeysUpdated = 0;
        // Skip zeroth entry; it is for interrupts only
        for (int i=channelOffset; i<totalChannels; i++) {
            int rOps = pollWrapper.getReventOps(i);
            if (rOps != 0) {
                SelectionKeyImpl sk = channelArray[i];
                pollWrapper.putReventOps(i, 0);
                if (selectedKeys.contains(sk)) {
                    if (sk.channel.translateAndSetReadyOps(rOps, sk)) {
                        numKeysUpdated++;
                    }
                } else {
                    sk.channel.translateAndSetReadyOps(rOps, sk);
                    if ((sk.nioReadyOps() & sk.nioInterestOps()) != 0) {
                        selectedKeys.add(sk);
                        numKeysUpdated++;
                    }
                }
            }
        }
        return numKeysUpdated;
    }

1.Skip zeroth entry; it is for interrupts only:第0个是用于wakeup的管道,这个管道不关心内核的poll返回事件,需要跳过       
2.获取到内核返回的事件后,会将revents清为0;
3.如果sk.nioReadyOps() & sk.nioInterestOps()) != 0,即将SelectionKey加入到SelectorImpl的selectedKeys中;
4.translateAndSetReadyOps将内核返回的revents转化为SelectionKey的ready operation ops
相关标签: java sun nio