欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

juc - Semaphore源码解读

程序员文章站 2022-07-12 20:10:34
...

Semaphore,翻译叫做信号灯,是用来做资源访问限制的,他维持了一个准许指令的集合,如果当前没有可以指令的话,调用一次acquire就会将当前的线程阻塞,没调用一次release就会将当前线程持有的指令还回指令集合。他的内部实现跟CountDownLatch类似,也是直接使用的aqs,在创建的时候就会设定一个state标记,用来表示可以同时被使用准许指令的最大值,在某个线程调用acquire的时候先判断这个值大于0,如果是的话表示当前可以获得一个准许,否则说明当前的指令已经全部用完,当前的线程被挂起,进入aqs的队列中。当某个已经获得指令的线程调用release的时候要把获取的指令返还,也就是将state标记增大。大体了解了意思之后,还是看看他的源码吧:

1、构造方法:

public Semaphore(int permits, boolean fair) {//第一个参数表示可以同时被获取的指令的个数,第二个参数表示在获取指令的时候是否是公平的,这个和ReentrantLock的公平的参数是一样的,如果是公平的,则任何获取操作都会先判断当前的aqs的队列中是否有等待的线程,如果有则排在最后面,如果是不公平的,则直接获取
        sync = fair ? new FairSync(permits) : new NonfairSync(permits);//无论是fair还是不fair的都是aqs的子类,permits最终会被设置为state标记的大小。这里我以非公平的aqs为例。
    }

 2、获取锁的方法,这个是会抛异常的,即如果线程被挂起,其他线程调用了这个线程的interrupt方法,就会抛InterruptedException异常。

 public void acquire() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);//调用aqs的获取方法,这个方法都是直接调用的aqs的方法,先判断是否可以获取,即tryAcquireShared方法,如果返回正数表示可以获取锁,否则排队等待。我们看一下非公平的aqs的代码
    }

 

 final int nonfairTryAcquireShared(int acquires) {//要获取的指令的数量
            for (;;) {
                int available = getState();//当前的标记
                int remaining = available - acquires;//最终返回的就是这个值,如果要获取的值小于state,则表示获得指令,否则没有获得指令。
                if (remaining < 0 || compareAndSetState(available, remaining))//如果小于0的话则直接返回,表示获取失败,第二个compareXXX是cas更新state的值。
                    return remaining;
            }
        }

 经过前面的博客的介绍,尤其是ReentrantLock和CountDownLatch,如果返回的是小于0的,就会加入到队列中,这里就不再重复贴代码了。返回的正负值的判断是根据当前的state的值和acquire的大小判断的。

 

3、获取所得方法,这个不会抛异常,

public void acquireUninterruptibly() {
    sync.acquireShared(1);//不抛异常的阻塞
}

 4、一次获取多个指令的方法

public void acquire(int permits) throws InterruptedException {
    if (permits < 0) throw new IllegalArgumentException();
    sync.acquireSharedInterruptibly(permits);//他的思路和上面还是一样的,只不过将state减小的值更大一些,不再是1.
}

 acquire的方法基本原理都是一样的,在获取不到的时候就会排队,这个类没有限定时间的排队等待。

 

5、尝试性的获取指令,有好多个,都是tryAcquire开头的,有带有超时时间的,有的带有要获取的指令的个数,他的思路和ReentrantLock中的aqs是一样的,这里只写一个

public boolean tryAcquire() {
    return sync.nonfairTryAcquireShared(1) >= 0;//判断当前的state标记的值是否大于1,如果是则获得指令并返回true,否则不获取指令也不排队,直接返回false。nonfairTryAcquireShared方法在上面介绍过了,
}

6、释放锁的方法;release方法,有的是释放一个,有的是多个指令。最终调用的是aqs的如下方法:

protected final boolean tryReleaseShared(int releases) {
            for (;;) {
                int current = getState();//当前的state
                int next = current + releases;//
                if (next < current) // overflow
                    throw new Error("Maximum permit count exceeded");
                if (compareAndSetState(current, next))//cas设置state,这个方法只会返回true,不会返回false。
                    return true;
            }
        }
从这个方法来看,是可以release的指令的数量多余获取的指令的数量,在方法里面并没有判断的地方。

 在成功的返回true只会,这个方法还会调用doReleaseShared方法,很容易就能明白,他是为了唤醒因为获取不到指令而park的线程,这个队列是shared,即一下唤醒所有的阻塞的线程,但是由于state的大小的限制,很多的线程由于没有获取指令又一次被park了,这一点和CountDownLatch的思路一样。

7、获得当前剩余的指令的数量:

 public int availablePermits() {
        return sync.getPermits();//里面就是调用的获取state的方法,查看state的值。
    }

 

8、将当前的指令全部取消

public int drainPermits() {
    return sync.drainPermits();//drain有使...干涸的意思,即将当前剩余的所有的指令全部取消。
 }

 sync的drainPermits方法:

final int drainPermits() {
   for (;;) {
       int current = getState();//当前的state
       if (current == 0 || compareAndSetState(current, 0))//如果当前的state是0,直接返回,如果不是0,则通过cas设置为0,这样就没有可以获取的指令的了
           return current;
   }
}

 通过这个方法以及上面的release方法,可以发现Semaphore并没有严格的限制获取的指令必须归还,比如release中就可以归还大量的指令,也可以通过上面的drainPermtis将所有的指令全部取消,也就是这个指令的数量是可以动态改变的。

 

就这么简单,又学了一个juc的常用类。

 

相关标签: Semaphore