Java中锁的分类及其一些有限的认知
锁分类:
1.公平锁和非公平锁
公平锁是指多个线程按照申请锁的顺序来获取锁。比如ReentrantLock的内部同时实现了公平锁和非公平锁。提到ReentrantLock就不得不提到AQS,它同时也是CountDownLatch、Semaphore、FutureTask的基础。
非公平锁就是指后来的线程也有机会获得锁。其实Synchronized也属于非公平锁。
Provides a framework for implementing blocking locks and related synchronizers (semaphores, events, etc) that rely on first-in-first-out (FIFO) wait queues. (提供一个框架,用于实现依赖于先见先出队列的阻塞锁与同步器(信号量 事件 等等));This class is designed to be a useful basis for most kinds of synchronizers that rely on a single atomic value to represent state. (对应:private volatile int state) and which define what that state means in terms of this object being acquired or released(共享与独占状态)。Given these, the other methods in this class carry out all queuing and blocking mechanics.(给定(有人说翻译成考虑到)这些,这个类的其余方法进行/执行所有的排列和阻塞机制)。but only the atomically updated {@code int} value manipulated using methods {@link #getState}, {@link #setState} and {@link #compareAndSetState} is tracked(跟踪 ) with respect to(关于) synchronization.(但为了获得同步而只追踪使用三个方式来操作以原子方式更新的int值 state)。
protected final void setState(int newState) {
state = newState;
}
---扩展知识:关于final的
final基本类型的话基本类型不可变 state不可变 引用 不能指向新引用 但引用里面的内容可变 final String编译器会优化 因为会被视为常量 常量是直接存储字面值而不是引用的。
String s0 = "ab";
final String s1 = getS1();
String s2 = "a" + s1;
System.out.println((s0 == s2)); //result = false
private static String getS1() { return "b"; }
分析:JVM对于字符串引用s1,它的值在编译期无法确定,只有在程序运行期调用方法后,将方法的返回值和"a"来动态连接并分配地址为s2,故上面 程序的结果为false。
string+="hello"的操作事实上会自动被JVM优化成:
StringBuilder str = new StringBuilder(string);
str.append("hello");
str.toString();
因为String的不可变性,所以引入了StringBuffer,又因为线程安全性引入了StringBuilder.
练习题:
String a1 = "helloworld";
String a2 = "hello" + "world";
System.err.println(a1 == a2);//result = true
String s0 = "ab";
final String s1 = getS1();
String s2 = "a" + s1;
System.out.println((s0 == s2)); //result = false
String ss = "a" + "b" + "c";
String ss1 = "a";
String ss2 = "b";
String ss3 = "c";
String ss4 = ss1 + ss2 + ss3;
System.err.println(ss == ss4);//result = false
String sss = "a" + "b" + "c";
final String sss1 = "a";
final String sss2 = "b";
final String sss3 = "c";
final String sss4 = sss1 + sss2 + sss3;
System.err.println(sss == sss4);//result = true
-----扩展小知识:关于intern的;
JDK1.7后,intern方法还是会先去查询常量池中是否有已经存在,如果存在,则返回常量池中的引用,这一点与之前没有区别,区别在于,如果在常量池找不到对应的字符串,则不会再将字符串拷贝到常量池,而只是在常量池中生成一个对原字符串的引用。简单的说,就是往常量池放的东西变了:原来在常量池中找不到时,复制一个副本放到常量池,1.7后则是将在堆上的地址引用复制到常量池。
举例说明:
1 String str2 = new String("str")+new String("01");
2 str2.intern();
3 String str1 = "str01";
4 System.out.println(str2==str1);
在JDK 1.7下,当执行str2.intern();时,因为常量池中没有“str01”这个字符串,所以会在常量池中生成一个对堆中的“str01”的引用(注意这里是引用 ,就是这个区别于JDK 1.6的地方。在JDK1.6下是生成原字符串的拷贝),而在进行String str1 = “str01”;字面量赋值的时候,常量池中已经存在一个引用,所以直接返回了该引用,因此str1和str2都指向堆中的同一个字符串,返回true。
将中间23两行调换位置以后,因为在进行字面量赋值(String str1 = “str01″)的时候,常量池中不存在,所以str1指向的常量池中的位置,而str2指向的是堆中的对象,再进行intern方法时,对str1和str2已经没有影响了,所以返回false。
This class supports either or both a default <em>exclusive</em> mode and a <em>shared</em> mode.(支持排他模式(默认)或者共享模式)。 When acquired in exclusive mode, attempted acquires by other threads cannot succeed. Shared mode acquires by multiple(多个) threads may (but need not) succeed. This class does not understand these differences except in the mechanical sense(机械的意识) that when a shared mode acquire succeeds, the next waiting thread (if one exists) must also determine(确定) whether it can acquire as well. Threads waiting in the different modes share the same FIFO queue. Usually, implementation subclasses support only one of these modes, but both can come into play(积极的,起作用的) for example in a {@link ReadWriteLock}.Subclasses that support only exclusive or only shared modes need not define the methods supporting the unused mode.
isHeldExclusively()
方法将报告同步对于当前线程是否是独占的;使用当前 getState()
值调用 release(int)
方法则可以完全释放此对象;如果给定保存的状态值,那么 acquire(int)
方法可以将此对象最终恢复为它以前获取的状态。
You may also find the inherited(继承) methods from {@link AbstractOwnableSynchronizer} useful to keep track of(记录/跟踪) the thread owning an exclusive synchronizer. You are encouraged to use them-- this enables monitoring(监视) and diagnostic(诊断) tools to assist(帮助) users in determining(确定) which threads hold locks.
Even though this class is based on an internal FIFO queue, it does not automatically enforce(强制) FIFO acquisition policies. The core of exclusive synchronization takes the form(采取如下形式---):
Acquire:
while (!tryAcquire(arg)) {
//获取不到自动入列
<em>enqueue thread if it is not already queued</em>;
<em>possibly block current thread</em>;
}
Release:
if (tryRelease(arg))
//释放 解锁
<em>unblock the first queued thread</em>;
(Shared mode is similar but may involve cascading signals.) 共享模式相似 但可能涉及到级联信号量
Because checks in acquire are invoked before enqueuing(因为在入列前需要调用检查线程状态), a newly acquiring thread may <em>barge(闯入) ahead of others that are blocked and queued. However, you can, if desired(希望 渴求),define {@code tryAcquire} and/or {@code tryAcquireShared} to disable barging by internally invoking one or more of the inspection methods, thereby(以此方式) providing a <em>fair</em> FIFO acquisition order.
In particular(特别是), most fair synchronizers can define {@code tryAcquire} to return {@code false} if {@link #hasQueuedPredecessors} (a method specifically designed to be used by fair synchronizers) returns {@code true}. Other variations(变化) are possible.
Here is a non-reentrant mutual exclusion(互相排斥) lock class that uses the value zero to represent the unlocked state(使用0来代表未锁定状态), and one to represent the locked state(1代表锁定状态). While a non-reentrant lock does not strictly(严肃的) require recording of the current owner thread, this class does so anyway to make usage easier(用法) to monitor.It also supports conditions and exposes one of the instrumentation(使用方法) methods:
class Mutex(互斥) implements Lock(类互斥实现锁)
class Mutex implements Lock, java.io.Serializable {
// Our internal helper class
private static class Sync extends AbstractQueuedSynchronizer {
// Report whether in locked state
protected boolean isHeldExclusively() {
return getState() == 1;
}
// Acquire the lock if state is zero
public boolean tryAcquire(int acquires) {
assert acquires == 1; // Otherwise unused
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
// Release the lock by setting state to zero
protected boolean tryRelease(int releases) {
assert releases == 1; // Otherwise unused
if (getState() == 0) throw new IllegalMonitorStateException();
setExclusiveOwnerThread(null);
setState(0);
return true;
}
// Provide a Condition
Condition newCondition() { return new ConditionObject(); }
// Deserialize properly
private void readObject(ObjectInputStream s)
throws IOException, ClassNotFoundException {
s.defaultReadObject();
setState(0); // reset to unlocked state
}
}
// The sync object does all the hard work. We just forward to it.
private final Sync sync = new Sync();
public void lock() { sync.acquire(1); }
public boolean tryLock() { return sync.tryAcquire(1); }
public void unlock() { sync.release(1); }
public Condition newCondition() { return sync.newCondition(); }
public boolean isLocked() { return sync.isHeldExclusively(); }
public boolean hasQueuedThreads() { return sync.hasQueuedThreads(); }
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
}
以下是一个锁存器类,它类似于 CountDownLatch
,除了只需要触发单个 signal 之外。因为锁存器是非独占的,所以它使用 shared 的获取和释放方法。
class BooleanLatch {
private static class Sync extends AbstractQueuedSynchronizer {
boolean isSignalled() { return getState() != 0; }
protected int tryAcquireShared(int ignore) {
return isSignalled()? 1 : -1;
}
protected boolean tryReleaseShared(int ignore) {
setState(1);
return true;
}
}
private final Sync sync = new Sync();
public boolean isSignalled() { return sync.isSignalled(); }
public void signal() { sync.releaseShared(1); }
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
}
--------扩展知识之CLH锁
CLH锁是一个自旋锁。能确保无饥饿性。提供先来先服务的公平性。
A "status" field in each node keeps track of whether a thread should block.(每个街道中的status字段用于跟踪一个线程是否应该阻塞)。
To enqueue into a CLH lock, you atomically splice it in as new tail. To dequeue, you just set the head field.()
CLH锁也是一种基于链表的可扩展、高性能、公平的自旋锁,申请线程只在本地变量上自旋,它不断轮询前驱的状态,如果发现前驱释放了锁就结束自旋。
当一个线程需要获取锁时:
a.创建一个的QNode,将其中的locked设置为true表示需要获取锁
b.线程对tail域调用getAndSet方法,使自己成为队列的尾部,同时获取一个指向其前趋结点的引用myPred
c.该线程就在前趋结点的locked字段上旋转,直到前趋结点释放锁
d.当一个线程需要释放锁时,将当前结点的locked域设置为false,同时回收前趋结点
public class CLHLock implements Lock {
AtomicReference<QNode> tail = new AtomicReference<QNode>(new QNode());
ThreadLocal<QNode> myPred;
ThreadLocal<QNode> myNode;
//实例化这个Lock,初始化MyNode与myPred(分别置为新结点和null);
public CLHLock() {
tail = new AtomicReference<QNode>(new QNode());
myNode = new ThreadLocal<QNode>() {
protected QNode initialValue() {
return new QNode();
}
};
myPred = new ThreadLocal<QNode>() {
protected QNode initialValue() {
return null;
}
};
}
//加锁(获取当前结点,当前结点设置为为尾节点,获取前置结点 前置结点判断状态 要不要结束循环获取锁)
@Override
public void lock() {
QNode qnode = myNode.get();
qnode.locked = true;
QNode pred = tail.getAndSet(qnode);
myPred.set(pred);
-----------------------------------------------------------------------------------附加 Begin
(ThreadLocal.set(T value)-----);
ThreadLocal的应用:
下面的原子类里面的getandset方法 会设置新值然后会返回老值。
/**
* Atomically sets to the given value and returns the old value.
*
* @param newValue the new value
* @return the previous value
*/
public final int getAndSet(int newValue) {
return unsafe.getAndSetInt(this, valueOffset, newValue);
}
-----------------------------------------------------------------------------------附加 End
while (pred.locked) {
}
}
//获取当前节点设置状态 后面的结点获取锁 同时回收前结点
@Override
public void unlock() {
QNode qnode = myNode.get();
qnode.locked = false;
myNode.set(myPred.get());
}
}
---------
所以aqs的内部实现是通过一个FIFO的队列实现的,队列的实现基础是基于一个存储了waitStatus(volatile)的双向循环Node,只有为signal的时候才能挂起(被动阻塞)。
aqs内部定义了volatile的state和head(指向链表的头节点)与tail结点(竞争失败的线程是通过尾插法插入双向循环列表的,也就是tail一直指向尾节点)。
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
用tryacquire获取锁,不成功的话进入队列(addWaiter方法)。
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
新建一个持有当前线程与模式的Node,然后把tail赋给一个新建的节点pred,这里tail为null的话说明未初始化,去初始化,设置head与tail都指向new node;而当tail不为null的时候,node的前驱节点设置成pred(tail的临时值),尝试把tail设置成尾节点,成功的话把pred的next指向node,这样就完成了链表的设置成功入列,返回当前node。不成功的话则继续自旋,直到成功为止。当自旋失败一定次数之后就进入了enq的方法里面。enq就也是自旋,除了多了一步如果尾节点为null,就进行初始化。
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
而成功入列了之后,就要执行下面的从队列里面获取锁的操作了。finally保证了如果不成功的话最终是会释放锁的。然后也是自旋,判断当前节点的前驱结点是不是head结点(这里体现了head结点的作用),是的话把如果获取到锁,就把当前结点设置成头节点 原本的前驱结点p.next 设置成null,不去执行 cancelAcquire方法,返回false。
出队操作
只要设置新的head
结点就可以了。
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}
/**
* Acquires in exclusive uninterruptible mode for thread already in
* queue. Used by condition wait methods as well as acquire.
* 不间断的获取已经在排列队伍中的线程使用等待方法来控制以及获取。
* acquireQueued返回true,说明当前线程被中断唤醒后获取到锁,
* 重置其interrupt status为true。
*
* @param node the node
* @param arg the acquire argument
* @return {@code true} if interrupted while waiting
*/
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
---------
// p != head 或者 p == head但是tryAcquire失败了,那么 // 应该阻塞当前线程等待前继唤醒。阻塞之前会再重试一次,还需要设置前继的waitStaus为SIGNAL。
shouldParkAfterFailedAcquire方法的作用是:
- 确定后继是否需要park(挂起和阻塞的意义相近);
- 跳过被取消的结点(node.prev=pred=pred.prev);
- 设置前继的waitStatus为SIGNAL(compareAndSetWaitStatus).
其中waitStatus的几种状态分别表示如下: CANCELLED(1) 取消状态 SIGNAL(-1) 等待触发状态 CONDITION(-2) 等待条件状态PROPAGATE(-3) 状态需要向后传播
等待队列是FIFO先进先出,只有前一个节点的状态为SIGNAL时,当前节点的线程才能被挂起。
检查原则在于:
- 规则1:如果前继的节点状态为SIGNAL,表明当前节点需要park(LockSupport的park和unpark的基本使用),则返回成功,此时acquireQueued方法的第12行(parkAndCheckInterrupt)将导致线程阻塞
- 规则2:如果前继节点状态为CANCELLED(ws>0),说明前置节点已经被放弃,则回溯到一个非取消的前继节点,返回false,acquireQueued方法的无限循环将递归调用该方法,直至规则1返回true,导致线程阻塞
- 规则3:如果前继节点状态为非SIGNAL、非CANCELLED,则设置前继的状态为SIGNAL,返回false后进入acquireQueued的无限循环,与规则2同
总体看来,shouldParkAfterFailedAcquire就是靠前继节点判断当前线程是否应该被阻塞,如果前继节点处于CANCELLED状态,则顺便删除这些节点重新构造队列。
/**
* Checks and updates status for a node that failed to acquire.
* Returns true if thread should block. This is the main signal
* control in all acquire loops. Requires that pred == node.prev.
*
* @param pred node's predecessor holding status
* @param node the node
* @return {@code true} if thread should block
*/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
---
park()会让当前线程进入waiting状态。在此状态下,有两种途径可以唤醒该线程:1)被unpark();2)被interrupt()。3.伪唤醒 所以需要return Thread.interrupted()来确定到底是不是被中断了,如果确实被中断了的话,需要调用
/**
* Convenience method to interrupt current thread.
*/
static void selfInterrupt() {
Thread.currentThread().interrupt();
}
来把中断补上。
(需要注意的是,Thread.interrupted()会清除当前线程的中断标记位。
/**
* Convenience method to interrupt current thread.
*/
static void selfInterrupt() {
Thread.currentThread().interrupt();
}
/**
* Convenience method to park and then check if interrupted
*
* @return {@code true} if interrupted
*/
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
Thread.interrupt()方法不会中断一个正在运行的线程。这一方法实际上完成的是,在线程受到阻塞时抛出一个中断信号,这样线程就得以退出阻塞的状态。更确切的说,如果线程被Object.wait, Thread.join和Thread.sleep三种方法之一阻塞,那么,它将接收到一个中断异常(InterruptedException),从而提早地终结被阻塞状态。
LockSupport.park()
也能响应中断信号,但是跟Thread.sleep()
不同的是它不会抛出InterruptedException
, 那怎么知道线程是被unpark还是被中断的呢,这就依赖线程的interrupted status,如果线程是被中断退出阻塞的那么该值被设置为true, 通过Thread的interrupted
和isInterrupted
方法都能获取该值, return Thread.interrupted(); (这里实现了第一时间获取当前线程是否被中断)两个方法的区别是interrupted
获取后会Clear,也就是将interrupted status重新置为false。
-------扩展小知识:
import java.util.concurrent.locks.LockSupport; /** * LockSupport很类似于二元信号量(只有1个许可证可供使用),如果这个许可还没有被占用,当前线程获取许可并继续执行;如果许可已经被占用,当前线程阻塞,等待获取许可。 * 因为许可默认是被占用的,调用park()时获取不到许可,所以进入阻塞状态。 * 调用线程的interrupt方法,并不能真正中断线程,只是给线程做了中断状态的标志 * Thread.interrupted():测试当前线程是否处于中断状态。执行后将中断状态标志为false * Thread.isInterrupte(): 测试线程Thread对象是否已经处于中断状态。但不具有清除功能 */ public class ParkTest { public static void main(String[] args) { // test1(); test2(); } static void test1() { LockSupport.park(); System.err.println("1"); } static void test2() { LockSupport.unpark(Thread.currentThread()); LockSupport.park(); Thread.interrupted(); System.err.println(Thread.currentThread().isInterrupted()); System.err.println(Thread.interrupted()); } }
上一篇: 代理模式——红酒经销
下一篇: 代理模式 Java设计模式笔记