Java 高并发五:JDK并发包1详细介绍
在[高并发java 二] 多线程基础中,我们已经初步提到了基本的线程同步操作。这次要提到的是在并发包中的同步控制工具。
1. 各种同步控制工具的使用
1.1 reentrantlock
reentrantlock感觉上是synchronized的增强版,synchronized的特点是使用简单,一切交给jvm去处理,但是功能上是比较薄弱的。在jdk1.5之前,reentrantlock的性能要好于synchronized,由于对jvm进行了优化,现在的jdk版本中,两者性能是不相上下的。如果是简单的实现,不要刻意去使用reentrantlock。
相比于synchronized,reentrantlock在功能上更加丰富,它具有可重入、可中断、可限时、公平锁等特点。
首先我们通过一个例子来说明reentrantlock最初步的用法:
package test; import java.util.concurrent.locks.reentrantlock; public class test implements runnable { public static reentrantlock lock = new reentrantlock(); public static int i = 0; @override public void run() { for (int j = 0; j < 10000000; j++) { lock.lock(); try { i++; } finally { lock.unlock(); } } } public static void main(string[] args) throws interruptedexception { test test = new test(); thread t1 = new thread(test); thread t2 = new thread(test); t1.start(); t2.start(); t1.join(); t2.join(); system.out.println(i); } }
有两个线程都对i进行++操作,为了保证线程安全,使用了 reentrantlock,从用法上可以看出,与 synchronized相比, reentrantlock就稍微复杂一点。因为必须在finally中进行解锁操作,如果不在 finally解锁,有可能代码出现异常锁没被释放,而synchronized是由jvm来释放锁。
那么reentrantlock到底有哪些优秀的特点呢?
1.1.1 可重入
单线程可以重复进入,但要重复退出
lock.lock(); lock.lock(); try { i++; } finally { lock.unlock(); lock.unlock(); }
由于reentrantlock是重入锁,所以可以反复得到相同的一把锁,它有一个与锁相关的获取计数器,如果拥有锁的某个线程再次得到锁,那么获取计数器就加1,然后锁需要被释放两次才能获得真正释放(重入锁)。这模仿了 synchronized 的语义;如果线程进入由线程已经拥有的监控器保护的 synchronized 块,就允许线程继续进行,当线程退出第二个(或者后续) synchronized 块的时候,不释放锁,只有线程退出它进入的监控器保护的第一个synchronized 块时,才释放锁。
public class child extends father implements runnable{ final static child child = new child();//为了保证锁唯一 public static void main(string[] args) { for (int i = 0; i < 50; i++) { new thread(child).start(); } } public synchronized void dosomething() { system.out.println("1child.dosomething()"); doanotherthing(); // 调用自己类中其他的synchronized方法 } private synchronized void doanotherthing() { super.dosomething(); // 调用父类的synchronized方法 system.out.println("3child.doanotherthing()"); } @override public void run() { child.dosomething(); } } class father { public synchronized void dosomething() { system.out.println("2father.dosomething()"); } }
我们可以看到一个线程进入不同的 synchronized方法,是不会释放之前得到的锁的。所以输出还是顺序输出。所以synchronized也是重入锁
输出:
1child.dosomething()
2father.dosomething()
3child.doanotherthing()
1child.dosomething()
2father.dosomething()
3child.doanotherthing()
1child.dosomething()
2father.dosomething()
3child.doanotherthing()
...
1.1.2.可中断
与synchronized不同的是,reentrantlock对中断是有响应的。中断相关知识查看[高并发java 二] 多线程基础
普通的lock.lock()是不能响应中断的,lock.lockinterruptibly()能够响应中断。
我们模拟出一个死锁现场,然后用中断来处理死锁
package test; import java.lang.management.managementfactory; import java.lang.management.threadinfo; import java.lang.management.threadmxbean; import java.util.concurrent.locks.reentrantlock; public class test implements runnable { public static reentrantlock lock1 = new reentrantlock(); public static reentrantlock lock2 = new reentrantlock(); int lock; public test(int lock) { this.lock = lock; } @override public void run() { try { if (lock == 1) { lock1.lockinterruptibly(); try { thread.sleep(500); } catch (exception e) { // todo: handle exception } lock2.lockinterruptibly(); } else { lock2.lockinterruptibly(); try { thread.sleep(500); } catch (exception e) { // todo: handle exception } lock1.lockinterruptibly(); } } catch (exception e) { // todo: handle exception } finally { if (lock1.isheldbycurrentthread()) { lock1.unlock(); } if (lock2.isheldbycurrentthread()) { lock2.unlock(); } system.out.println(thread.currentthread().getid() + ":线程退出"); } } public static void main(string[] args) throws interruptedexception { test t1 = new test(1); test t2 = new test(2); thread thread1 = new thread(t1); thread thread2 = new thread(t2); thread1.start(); thread2.start(); thread.sleep(1000); //deadlockchecker.check(); } static class deadlockchecker { private final static threadmxbean mbean = managementfactory .getthreadmxbean(); final static runnable deadlockchecker = new runnable() { @override public void run() { // todo auto-generated method stub while (true) { long[] deadlockedthreadids = mbean.finddeadlockedthreads(); if (deadlockedthreadids != null) { threadinfo[] threadinfos = mbean.getthreadinfo(deadlockedthreadids); for (thread t : thread.getallstacktraces().keyset()) { for (int i = 0; i < threadinfos.length; i++) { if(t.getid() == threadinfos[i].getthreadid()) { t.interrupt(); } } } } try { thread.sleep(5000); } catch (exception e) { // todo: handle exception } } } }; public static void check() { thread t = new thread(deadlockchecker); t.setdaemon(true); t.start(); } } }
上述代码有可能会发生死锁,线程1得到lock1,线程2得到lock2,然后彼此又想获得对方的锁。
我们用jstack查看运行上述代码后的情况
的确发现了一个死锁。
deadlockchecker.check();方法用来检测死锁,然后把死锁的线程中断。中断后,线程正常退出。
1.1.3.可限时
超时不能获得锁,就返回false,不会永久等待构成死锁
使用lock.trylock(long timeout, timeunit unit)来实现可限时锁,参数为时间和单位。
举个例子来说明下可限时:
package test; import java.util.concurrent.timeunit; import java.util.concurrent.locks.reentrantlock; public class test implements runnable { public static reentrantlock lock = new reentrantlock(); @override public void run() { try { if (lock.trylock(5, timeunit.seconds)) { thread.sleep(6000); } else { system.out.println("get lock failed"); } } catch (exception e) { } finally { if (lock.isheldbycurrentthread()) { lock.unlock(); } } } public static void main(string[] args) { test t = new test(); thread t1 = new thread(t); thread t2 = new thread(t); t1.start(); t2.start(); } }
使用两个线程来争夺一把锁,当某个线程获得锁后,sleep6秒,每个线程都只尝试5秒去获得锁。
所以必定有一个线程无法获得锁。无法获得后就直接退出了。
输出:
get lock failed
1.1.4.公平锁
使用方式:
public reentrantlock(boolean fair)
public static reentrantlock fairlock = new reentrantlock(true);
一般意义上的锁是不公平的,不一定先来的线程能先得到锁,后来的线程就后得到锁。不公平的锁可能会产生饥饿现象。
公平锁的意思就是,这个锁能保证线程是先来的先得到锁。虽然公平锁不会产生饥饿现象,但是公平锁的性能会比非公平锁差很多。
1.2 condition
condition与reentrantlock的关系就类似于synchronized与object.wait()/signal()
await()方法会使当前线程等待,同时释放当前锁,当其他线程中使用signal()时或者signalall()方法时,线 程会重新获得锁并继续执行。或者当线程被中断时,也能跳出等待。这和object.wait()方法很相似。
awaituninterruptibly()方法与await()方法基本相同,但是它并不会再等待过程中响应中断。 singal()方法用于唤醒一个在等待中的线程。相对的singalall()方法会唤醒所有在等待中的线程。这和obejct.notify()方法很类似。
这里就不再详细介绍了。举个例子来说明:
package test; import java.util.concurrent.locks.condition; import java.util.concurrent.locks.reentrantlock; public class test implements runnable { public static reentrantlock lock = new reentrantlock(); public static condition condition = lock.newcondition(); @override public void run() { try { lock.lock(); condition.await(); system.out.println("thread is going on"); } catch (exception e) { e.printstacktrace(); } finally { lock.unlock(); } } public static void main(string[] args) throws interruptedexception { test t = new test(); thread thread = new thread(t); thread.start(); thread.sleep(2000); lock.lock(); condition.signal(); lock.unlock(); } }
上述例子很简单,让一个线程await住,让主线程去唤醒它。condition.await()/signal只能在得到锁以后使用。
1.3.semaphore
对于锁来说,它是互斥的排他的。意思就是,只要我获得了锁,没人能再获得了。
而对于semaphore来说,它允许多个线程同时进入临界区。可以认为它是一个共享锁,但是共享的额度是有限制的,额度用完了,其他没有拿到额度的线程还是要阻塞在临界区外。当额度为1时,就相等于lock
下面举个例子:
package test; import java.util.concurrent.executorservice; import java.util.concurrent.executors; import java.util.concurrent.semaphore; public class test implements runnable { final semaphore semaphore = new semaphore(5); @override public void run() { try { semaphore.acquire(); thread.sleep(2000); system.out.println(thread.currentthread().getid() + " done"); } catch (exception e) { e.printstacktrace(); }finally { semaphore.release(); } } public static void main(string[] args) throws interruptedexception { executorservice executorservice = executors.newfixedthreadpool(20); final test t = new test(); for (int i = 0; i < 20; i++) { executorservice.submit(t); } } }
有一个20个线程的线程池,每个线程都去 semaphore的许可,semaphore的许可只有5个,运行后可以看到,5个一批,一批一批地输出。
当然一个线程也可以一次申请多个许可
public void acquire(int permits) throws interruptedexception
1.4 readwritelock
readwritelock是区分功能的锁。读和写是两种不同的功能,读-读不互斥,读-写互斥,写-写互斥。
这样的设计是并发量提高了,又保证了数据安全。
使用方式:
private static reentrantreadwritelock readwritelock=new reentrantreadwritelock();
private static lock readlock = readwritelock.readlock();
private static lock writelock = readwritelock.writelock();
详细例子可以查看 java实现生产者消费者问题与读者写者问题,这里就不展开了。
1.5 countdownlatch
倒数计时器
一种典型的场景就是火箭发射。在火箭发射前,为了保证万无一失,往往还要进行各项设备、仪器的检查。 只有等所有检查完毕后,引擎才能点火。这种场景就非常适合使用countdownlatch。它可以使得点火线程
,等待所有检查线程全部完工后,再执行
使用方式:
static final countdownlatch end = new countdownlatch(10);
end.countdown();
end.await();
示意图:
一个简单的例子:
package test; import java.util.concurrent.countdownlatch; import java.util.concurrent.executorservice; import java.util.concurrent.executors; public class test implements runnable { static final countdownlatch countdownlatch = new countdownlatch(10); static final test t = new test(); @override public void run() { try { thread.sleep(2000); system.out.println("complete"); countdownlatch.countdown(); } catch (exception e) { e.printstacktrace(); } } public static void main(string[] args) throws interruptedexception { executorservice executorservice = executors.newfixedthreadpool(10); for (int i = 0; i < 10; i++) { executorservice.execute(t); } countdownlatch.await(); system.out.println("end"); executorservice.shutdown(); } }
主线程必须等待10个线程全部执行完才会输出"end"。
1.6 cyclicbarrier
和countdownlatch相似,也是等待某些线程都做完以后再执行。与countdownlatch区别在于这个计数器可以反复使用。比如,假设我们将计数器设置为10。那么凑齐第一批1 0个线程后,计数器就会归零,然后接着凑齐下一批10个线程
使用方式:
public cyclicbarrier(int parties, runnable barrieraction)
barrieraction就是当计数器一次计数完成后,系统会执行的动作
await()
示意图:
下面举个例子:
package test; import java.util.concurrent.cyclicbarrier; public class test implements runnable { private string soldier; private final cyclicbarrier cyclic; public test(string soldier, cyclicbarrier cyclic) { this.soldier = soldier; this.cyclic = cyclic; } @override public void run() { try { //等待所有士兵到齐 cyclic.await(); dowork(); //等待所有士兵完成工作 cyclic.await(); } catch (exception e) { // todo auto-generated catch block e.printstacktrace(); } } private void dowork() { // todo auto-generated method stub try { thread.sleep(3000); } catch (exception e) { // todo: handle exception } system.out.println(soldier + ": done"); } public static class barrierrun implements runnable { boolean flag; int n; public barrierrun(boolean flag, int n) { super(); this.flag = flag; this.n = n; } @override public void run() { if (flag) { system.out.println(n + "个任务完成"); } else { system.out.println(n + "个集合完成"); flag = true; } } } public static void main(string[] args) { final int n = 10; thread[] threads = new thread[n]; boolean flag = false; cyclicbarrier barrier = new cyclicbarrier(n, new barrierrun(flag, n)); system.out.println("集合"); for (int i = 0; i < n; i++) { system.out.println(i + "报道"); threads[i] = new thread(new test("士兵" + i, barrier)); threads[i].start(); } } }
打印结果:
集合
0报道
1报道
2报道
3报道
4报道
5报道
6报道
7报道
8报道
9报道
10个集合完成
士兵5: done
士兵7: done
士兵8: done
士兵3: done
士兵4: done
士兵1: done
士兵6: done
士兵2: done
士兵0: done
士兵9: done
10个任务完成
1.7 locksupport
提供线程阻塞原语
和suspend类似
locksupport.park();
locksupport.unpark(t1);
与suspend相比 不容易引起线程冻结
locksupport的思想呢,和 semaphore有点相似,内部有一个许可,park的时候拿掉这个许可,unpark的时候申请这个许可。所以如果unpark在park之前,是不会发生线程冻结的。
下面的代码是[高并发java 二] 多线程基础中suspend示例代码,在使用suspend时会发生死锁。
package test; import java.util.concurrent.locks.locksupport; public class test { static object u = new object(); static testsuspendthread t1 = new testsuspendthread("t1"); static testsuspendthread t2 = new testsuspendthread("t2"); public static class testsuspendthread extends thread { public testsuspendthread(string name) { setname(name); } @override public void run() { synchronized (u) { system.out.println("in " + getname()); //thread.currentthread().suspend(); locksupport.park(); } } } public static void main(string[] args) throws interruptedexception { t1.start(); thread.sleep(100); t2.start(); // t1.resume(); // t2.resume(); locksupport.unpark(t1); locksupport.unpark(t2); t1.join(); t2.join(); } }
而使用 locksupport则不会发生死锁。
另外
park()能够响应中断,但不抛出异常。中断响应的结果是,park()函数的返回,可以从thread.interrupted()得到中断标志。
在jdk当中有大量地方使用到了park,当然locksupport的实现也是使用unsafe.park()来实现的。
public static void park() {
unsafe.park(false, 0l);
}
1.8 reentrantlock 的实现
下面来介绍下reentrantlock的实现,reentrantlock的实现主要由3部分组成:
- cas状态
- 等待队列
- park()
reentrantlock的父类中会有一个state变量来表示同步的状态
/** * the synchronization state. */ private volatile int state;
通过cas操作来设置state来获取锁,如果设置成了1,则将锁的持有者给当前线程
final void lock() { if (compareandsetstate(0, 1)) setexclusiveownerthread(thread.currentthread()); else acquire(1); }
如果拿锁不成功,则会做一个申请
public final void acquire(int arg) { if (!tryacquire(arg) && acquirequeued(addwaiter(node.exclusive), arg)) selfinterrupt(); }
首先,再去申请下试试看tryacquire,因为此时可能另一个线程已经释放了锁。
如果还是没有申请到锁,就addwaiter,意思是把自己加到等待队列中去
private node addwaiter(node mode) { node node = new node(thread.currentthread(), mode); // try the fast path of enq; backup to full enq on failure node pred = tail; if (pred != null) { node.prev = pred; if (compareandsettail(pred, node)) { pred.next = node; return node; } } enq(node); return node; }
其间还会有多次尝试去申请锁,如果还是申请不到,就会被挂起
private final boolean parkandcheckinterrupt() { locksupport.park(this); return thread.interrupted(); }
同理,如果在unlock操作中,就是释放了锁,然后unpark,这里就不具体讲了。
2. 并发容器及典型源码分析
2.1 concurrenthashmap
我们知道hashmap不是一个线程安全的容器,最简单的方式使hashmap变成线程安全就是使用
collections.synchronizedmap,它是对hashmap的一个包装
public static map m=collections.synchronizedmap(new hashmap());
同理对于list,set也提供了相似方法。
但是这种方式只适合于并发量比较小的情况。
我们来看下synchronizedmap的实现
private final map<k,v> m; // backing map final object mutex; // object on which to synchronize synchronizedmap(map<k,v> m) { if (m==null) throw new nullpointerexception(); this.m = m; mutex = this; } synchronizedmap(map<k,v> m, object mutex) { this.m = m; this.mutex = mutex; } public int size() { synchronized (mutex) {return m.size();} } public boolean isempty() { synchronized (mutex) {return m.isempty();} } public boolean containskey(object key) { synchronized (mutex) {return m.containskey(key);} } public boolean containsvalue(object value) { synchronized (mutex) {return m.containsvalue(value);} } public v get(object key) { synchronized (mutex) {return m.get(key);} } public v put(k key, v value) { synchronized (mutex) {return m.put(key, value);} } public v remove(object key) { synchronized (mutex) {return m.remove(key);} } public void putall(map<? extends k, ? extends v> map) { synchronized (mutex) {m.putall(map);} } public void clear() { synchronized (mutex) {m.clear();} }
它会将hashmap包装在里面,然后将hashmap的每个操作都加上synchronized。
由于每个方法都是获取同一把锁(mutex),这就意味着,put和remove等操作是互斥的,大大减少了并发量。
下面来看下concurrenthashmap是如何实现的
public v put(k key, v value) { segment<k,v> s; if (value == null) throw new nullpointerexception(); int hash = hash(key); int j = (hash >>> segmentshift) & segmentmask; if ((s = (segment<k,v>)unsafe.getobject // nonvolatile; recheck (segments, (j << sshift) + sbase)) == null) // in ensuresegment s = ensuresegment(j); return s.put(key, hash, value, false); }
在 concurrenthashmap内部有一个segment段,它将大的hashmap切分成若干个段(小的hashmap),然后让数据在每一段上hash,这样多个线程在不同段上的hash操作一定是线程安全的,所以只需要同步同一个段上的线程就可以了,这样实现了锁的分离,大大增加了并发量。
在使用concurrenthashmap.size时会比较麻烦,因为它要统计每个段的数据和,在这个时候,要把每一个段都加上锁,然后再做数据统计。这个就是把锁分离后的小小弊端,但是size方法应该是不会被高频率调用的方法。
在实现上,不使用synchronized和lock.lock而是尽量使用trylock,同时在hashmap的实现上,也做了一点优化。这里就不提了。
2.2 blockingqueue
blockingqueue不是一个高性能的容器。但是它是一个非常好的共享数据的容器。是典型的生产者和消费者的实现。
示意图:
具体可以查看java实现生产者消费者问题与读者写者问题
下一篇: 有意思的java程序片段