Java并发编程Semaphore计数信号量详解
semaphore 是一个计数信号量,它的本质是一个共享锁。信号量维护了一个信号量许可集。线程可以通过调用acquire()来获取信号量的许可;当信号量中有可用的许可时,线程能获取该许可;否则线程必须等待,直到有可用的许可为止。 线程可以通过release()来释放它所持有的信号量许可(用完信号量之后必须释放,不然其他线程可能会无法获取信号量)。
简单示例:
package me.socketthread; import java.util.concurrent.executorservice; import java.util.concurrent.executors; import java.util.concurrent.semaphore; public class semaphorelearn { //信号量总数 private static final int sem_max = 12; public static void main(string[] args) { semaphore sem = new semaphore(sem_max); //创建线程池 executorservice threadpool = executors.newfixedthreadpool(3); //在线程池中执行任务 threadpool.execute(new mythread(sem, 7)); threadpool.execute(new mythread(sem, 4)); threadpool.execute(new mythread(sem, 2)); //关闭池 threadpool.shutdown(); } } class mythread extends thread { private volatile semaphore sem; // 信号量 private int count; // 申请信号量的大小 mythread(semaphore sem, int count) { this.sem = sem; this.count = count; } public void run() { try { // 从信号量中获取count个许可 sem.acquire(count); thread.sleep(2000); system.out.println(thread.currentthread().getname() + " acquire count="+count); } catch (interruptedexception e) { e.printstacktrace(); } finally { // 释放给定数目的许可,将其返回到信号量。 sem.release(count); system.out.println(thread.currentthread().getname() + " release " + count + ""); } } }
执行结果:
pool-1-thread-2 acquire count=4 pool-1-thread-1 acquire count=7 pool-1-thread-1 release 7 pool-1-thread-2 release 4 pool-1-thread-3 acquire count=2 pool-1-thread-3 release 2
线程1和线程2会并发执行,因为两者的信号量和没有超过总信号量,当前两个线程释放掉信号量之后线程3才能继续执行。
源码分析:
1、构造函数
在构造函数中会初始化信号量值,这值最终是作为锁标志位state的值
semaphore sem = new semaphore(12);//简单来说就是给锁标识位state赋值为12
2、semaphore.acquire(n);简单理解为获取锁资源,如果获取不到线程阻塞
semaphore.acquire(n);//从锁标识位state中获取n个信号量,简单来说是state = state-n 此时state大于0表示可以获取信号量,如果小于0则将线程阻塞
public void acquire(int permits) throws interruptedexception { if (permits < 0) throw new illegalargumentexception(); //获取锁 sync.acquiresharedinterruptibly(permits); }
acquiresharedinterruptibly中的操作是获取锁资源,如果可以获取则将state= state-permits,否则将线程阻塞
public final void acquiresharedinterruptibly(int arg) throws interruptedexception { if (thread.interrupted()) throw new interruptedexception(); if (tryacquireshared(arg) < 0)//tryacquireshared中尝试获取锁资源 doacquiresharedinterruptibly(arg); //将线程阻塞 }
tryacquireshared中的操作是尝试获取信号量值,简单来说就是state=state-acquires ,如果此时小于0则返回负值,否则返回大于新值,再判断是否将当线程线程阻塞
protected int tryacquireshared(int acquires) { for (;;) { if (hasqueuedpredecessors()) return -1; //获取state值 int available = getstate(); //从state中获取信号量 int remaining = available - acquires; if (remaining < 0 || compareandsetstate(available, remaining)) //如果信号量小于0则直接返回,表示无法获取信号量,否则将state值修改为新值 return remaining; } }
doacquiresharedinterruptibly中的操作简单来说是将当前线程添加到fifo队列中并将当前线程阻塞。
/会将线程添加到fifo队列中,并阻塞 private void doacquiresharedinterruptibly(int arg) throws interruptedexception { //将线程添加到fifo队列中 final node node = addwaiter(node.shared); boolean failed = true; try { for (;;) { final node p = node.predecessor(); if (p == head) { int r = tryacquireshared(arg); if (r >= 0) { setheadandpropagate(node, r); p.next = null; // help gc failed = false; return; } } //parkandcheckinterrupt完成线程的阻塞操作 if (shouldparkafterfailedacquire(p, node) && parkandcheckinterrupt()) throw new interruptedexception(); } } finally { if (failed) cancelacquire(node); } }
3、semaphore.release(int permits),这个函数的实现操作是将state = state+permits并唤起处于fifo队列中的阻塞线程。
public void release(int permits) { if (permits < 0) throw new illegalargumentexception(); //state = state+permits,并将fifo队列中的阻塞线程唤起 sync.releaseshared(permits); }
releaseshared中的操作是将state = state+permits,并将fifo队列中的阻塞线程唤起。
public final boolean releaseshared(int arg) { //tryreleaseshared将state设置为state = state+arg if (tryreleaseshared(arg)) { //唤起fifo队列中的阻塞线程 doreleaseshared(); return true; } return false; }
tryreleaseshared将state设置为state = state+arg
protected final boolean tryreleaseshared(int releases) { for (;;) { int current = getstate(); int next = current + releases; if (next < current) // overflow throw new error("maximum permit count exceeded"); //将state值设置为state=state+releases if (compareandsetstate(current, next)) return true; } }
doreleaseshared()唤起fifo队列中的阻塞线程
private void doreleaseshared() { for (;;) { node h = head; if (h != null && h != tail) { int ws = h.waitstatus; if (ws == node.signal) { if (!compareandsetwaitstatus(h, node.signal, 0)) continue; // loop to recheck cases //完成阻塞线程的唤起操作 unparksuccessor(h); } else if (ws == 0 && !compareandsetwaitstatus(h, 0, node.propagate)) continue; // loop on failed cas } if (h == head) // loop if head changed break; } }
总结:semaphore简单来说设置了一个信号量池state,当线程执行时会从state中获取值,如果可以获取则线程执行,并且在执行后将获取的资源返回到信号量池中,并唤起其他阻塞线程;如果信号量池中的资源无法满足某个线程的需求则将此线程阻塞。
semaphore源码:
public class semaphore implements java.io.serializable { private static final long serialversionuid = -3222578661600680210l; private final sync sync; abstract static class sync extends abstractqueuedsynchronizer { private static final long serialversionuid = 1192457210091910933l; //设置锁标识位state的初始值 sync(int permits) { setstate(permits); } //获取锁标识位state的值,如果state值大于其需要的值则表示锁可以获取 final int getpermits() { return getstate(); } //获取state值减去acquires后的值,如果大于等于0则表示锁可以获取 final int nonfairtryacquireshared(int acquires) { for (;;) { int available = getstate(); int remaining = available - acquires; if (remaining < 0 || compareandsetstate(available, remaining)) return remaining; } } //释放锁 protected final boolean tryreleaseshared(int releases) { for (;;) { int current = getstate(); //将state值加上release值 int next = current + releases; if (next < current) // overflow throw new error("maximum permit count exceeded"); if (compareandsetstate(current, next)) return true; } } //将state的值减去reductions final void reducepermits(int reductions) { for (;;) { int current = getstate(); int next = current - reductions; if (next > current) // underflow throw new error("permit count underflow"); if (compareandsetstate(current, next)) return; } } final int drainpermits() { for (;;) { int current = getstate(); if (current == 0 || compareandsetstate(current, 0)) return current; } } } //非公平锁 static final class nonfairsync extends sync { private static final long serialversionuid = -2694183684443567898l; nonfairsync(int permits) { super(permits); } protected int tryacquireshared(int acquires) { return nonfairtryacquireshared(acquires); } } //公平锁 static final class fairsync extends sync { private static final long serialversionuid = 2014338818796000944l; fairsync(int permits) { super(permits); } protected int tryacquireshared(int acquires) { for (;;) { if (hasqueuedpredecessors()) return -1; int available = getstate(); int remaining = available - acquires; if (remaining < 0 || compareandsetstate(available, remaining)) return remaining; } } } //设置信号量 public semaphore(int permits) { sync = new nonfairsync(permits); } public semaphore(int permits, boolean fair) { sync = fair ? new fairsync(permits) : new nonfairsync(permits); } //获取锁 public void acquire() throws interruptedexception { sync.acquiresharedinterruptibly(1); } public void acquireuninterruptibly() { sync.acquireshared(1); } public boolean tryacquire() { return sync.nonfairtryacquireshared(1) >= 0; } public boolean tryacquire(long timeout, timeunit unit) throws interruptedexception { return sync.tryacquiresharednanos(1, unit.tonanos(timeout)); } public void release() { sync.releaseshared(1); } //获取permits值锁 public void acquire(int permits) throws interruptedexception { if (permits < 0) throw new illegalargumentexception(); sync.acquiresharedinterruptibly(permits); } public void acquireuninterruptibly(int permits) { if (permits < 0) throw new illegalargumentexception(); sync.acquireshared(permits); } public boolean tryacquire(int permits) { if (permits < 0) throw new illegalargumentexception(); return sync.nonfairtryacquireshared(permits) >= 0; } public boolean tryacquire(int permits, long timeout, timeunit unit) throws interruptedexception { if (permits < 0) throw new illegalargumentexception(); return sync.tryacquiresharednanos(permits, unit.tonanos(timeout)); } //释放 public void release(int permits) { if (permits < 0) throw new illegalargumentexception(); sync.releaseshared(permits); } public int availablepermits() { return sync.getpermits(); } public int drainpermits() { return sync.drainpermits(); } protected void reducepermits(int reduction) { if (reduction < 0) throw new illegalargumentexception(); sync.reducepermits(reduction); } public boolean isfair() { return sync instanceof fairsync; } public final boolean hasqueuedthreads() { return sync.hasqueuedthreads(); } public final int getqueuelength() { return sync.getqueuelength(); } protected collection<thread> getqueuedthreads() { return sync.getqueuedthreads(); } public string tostring() { return super.tostring() + "[permits = " + sync.getpermits() + "]"; } }
总结
以上就是本文关于java并发编程semaphore计数信号量详解的全部内容,希望对大家有所帮助。感兴趣的朋友可以继续参阅本站:java并发编程之重入锁与读写锁、java系统的高并发解决方法详解、等,有什么问题,可以留言交流讨论。感谢朋友们对本站的支持!