Java中的锁(五)
本章主要介绍Java中semaphore的使用和底层实现原理。
一、semaphore的基本概念
semaphore又叫信号量,在Java1.5中引入,是用来控制同时访问共享资源的线程数量,通过协调各个线程,以保证合理的使用资源。semaphore底层是通过AQS实现线程管理的,提供两个构造函数,实现公平和非公平共享信号量。
//非公平信号量,同时可以允许多少线程(许可)进行访问,和线程等待时间长久没有关系
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
//公平共享信号量,即等待时间越久的越先获取许可
public Semaphore(int permits, boolean fair) {
sync = (fair)? new FairSync(permits) : new NonfairSync(permits);
}
使用示例:
public class SemaphoreTest {
public static void main(String[] args) {
ExecutorService exec = Executors.newCachedThreadPool();
//设置信号量同时执行的线程数是5
final Semaphore semp = new Semaphore(5);
for(int i =0; i<10;i++){
final int finalI = i;
Runnable task = () -> {
try {
semp.acquire();
System.out.println("Accessing: " + finalI);
Thread.sleep(1000);
} catch (InterruptedException e) {
} finally {
semp.release();
}
};
exec.submit(task);
}
exec.shutdown();
}
}
Semphore类中提供的基本方法:
public class Semaphore implements java.io.Serializable{
// 底层的Sync内部类,提供tryReleaseShared
abstract static class Sync extends AbstractQueuedSynchronizer...
static final class NonfairSync extends Sync...
static final class FairSync extends Sync...
// 获取一个信号量,可以被中断
public void acquire() throws InterruptedException;
// 获取一个中断信号,不能被中断
public void acquireUninterruptibly();
public boolean tryAcquire();
public boolean tryAcquire(long timeout, TimeUnit unit)throws InterruptedException
// 获取指定数量的信号量
public void acquire(int permits) throws InterruptedException
public void acquireUninterruptibly(int permits)
public boolean tryAcquire(int permits)
public boolean tryAcquire(int permits, long timeout, TimeUnit unit)throws InterruptedException
// 释放一个信号量
public void release()
public void release(int permits)
// 动态减小许可,当资源用完不能再用时,这时就可以减小许可证
protected void reducePermits(int reduction)
}
二、Semaphore的底层实现原理
2.1 非公平锁中的共享锁
static final class NonfairSync extends Sync {
NonfairSync(int permits) {
super(permits);
}
//调用父类Sync的nonfairTryAcquireShared
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
}
其中,permits传入值后,设置为AQS的state。
Sync(int permits) {
setState(permits);
}
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer {
// AQS 的state
private volatile int state;
protected final int getState() {return state;}
protected final void setState(int newState) {state = newState;}
//对state变量进行CAS 操作
protected final boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
}
调用acquire()方法时,先判断state是否足够使用,如果state值代表的许可数足够使用,请求线程将会获得同步状态即对共享资源的访问权,并CAS更新state的值(一般是对state值减1);如果state值已为0,请求线程将无法获取同步状态,将被加入到同步队列并阻塞,直到其他线程释放同步状态才可能获取对共享资源的访问权。
// 中断性获取锁
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
// AQS中中断性获取锁
public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
//未获取成功加入同步队列等待
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
非常重要的一点,对于tryAcquire()方法,内部的实现直接在Semaphore中,不会加入等待队列
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 || compareAndSetState(available, remaining))
return remaining;
}
}
对于acquireUninterruptibly(int permits)方法,内部依托于AQS的同步获取方法:
public void acquireUninterruptibly(int permits) {
if (permits < 0) throw new IllegalArgumentException();
sync.acquireShared(permits);
}
// 依赖AQS的同步获取
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
// 先查看一下Semaphore的非公平获取方法
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
private void doAcquireShared(int arg) {
// 首先加入等待队列
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
// 进入自旋操作
for (;;) {
final Node p = node.predecessor();
// 如果前置节点时head
if (p == head) {
//如果前置节点是head,本节点尝试获取锁,并设置本节点为head,返回获取到锁
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
//调整同步队列中node结点的状态并判断是否应该被挂起
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
在获取同步状态时,加入同步队列后,如果前置节点不是head,会执行shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
// 如果前置节点等待被唤醒,需要执行后面的阻塞方法
if (ws == Node.SIGNAL)
return true;
// 如果前置节点已经执行完毕,遍历同步队列,知道找到一个需要被唤醒的节点作为前置节点
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 如果前置节点状态小于0并且也不是SIGNAL,设置为SIGNAL状态
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
释放锁的方法release(),具体的实现有:
public void release(int permits) {
if (permits < 0) throw new IllegalArgumentException();
sync.releaseShared(permits);
}
// AQS中释放的方法
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
// Semaphore中释放方法
protected final boolean tryReleaseShared(int releases) {
// 自旋的方式设置state
for (;;) {
int current = getState();
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}
// 释放成功后,要唤醒后继节点
private void doReleaseShared() {
// 如果头节点的后继节点需要唤醒,那么执行唤醒动作;否则将头结点的等待状态设置为PROPAGATE保证
// 唤醒传递。
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
// 如果头节点对应的线程是SIGNAL状态,头节点获取到同步状态,
if (ws == Node.SIGNAL) {
// 修改头结点对应的线程状态设置为0。失败的话,则继续循环。
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue;
// 唤醒头结点后继结点所对应的线程
unparkSuccessor(h);
}
else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue;
}
// 如果头结点发生变化,则继续循环。否则,退出循环。
if (h == head)
break;
}
}
//唤醒传入结点的后继结点对应的线程
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
//拿到后继结点
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
2.2 公平锁中的共享锁
static final class FairSync extends Sync {
FairSync(int permits) {super(permits);}
protected int tryAcquireShared(int acquires) {
for (;;) {
// 这里于非公平锁的主要区别是:队列中是否有节点,如果有节点,直接加入同步等待队列
if (hasQueuedPredecessors())
return -1;
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
}
参考链接:https://blog.csdn.net/javazejian/article/details/76167357
上一篇: JS面向对象的程序设计相关知识小结
下一篇: 互相独立进程间共享内存互斥访问的解决办法