java并发中锁的应用
锁的理解
锁产生于多线程并发应用,其作用是解决共享对象的同步同时也可以控制线程的行为。我认为锁不仅仅限于synchronize,ReentrantLock,ReadWriteLock.同时也包括CountDownLack, FutureTask, Semaphore, CyclicBarrier, Exchanger这些平时接触不多的并发控制类。后者经常会用在控制线程的运行行为。
1.
CountDownLack 这种锁经常用来控制多个线程同时启动,并且能够及时感知这些线程是否全部运行结束。举例如下:
例子中m_begin用来为那10个线程发送启动指令,当m_begin.countDown()时,10个线程同时启动。同时m_end.await()这些线程结束的好消息。
/** * @filename CountDownLaunchTest.java * @date 2014-11-14 */ package lock; import java.util.concurrent.CountDownLatch; public class CountDownLatchTest { public static void main(String[] args) { new CountDownLatchTest(10).runAll(); } private int m_threadNum = 10; private CountDownLatch m_begin = null; private CountDownLatch m_end = null; CountDownLatchTest(int threadNum) { if(threadNum>0) m_threadNum = threadNum; m_begin = new CountDownLatch(1); m_end = new CountDownLatch(m_threadNum); } public void runAll() { for(int i=0; i<m_threadNum; i++) { new InnerThread(String.valueOf(i)).start(); } m_begin.countDown(); try { m_end.await(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("all thread finished"); } private class InnerThread extends Thread { private final String name; InnerThread(String threadName) { name = threadName; System.out.println(name+" inited"); } @Override public void run() { try { //!!!!!注意是await,不是wait m_begin.await(); Thread.currentThread().sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } m_end.countDown(); System.out.println(name+" finished"); } } }
2.FutureTask 这种锁可以获取一个线程任务的运行结果,也就是说我们有个任务需要启动一个线程进行处理,同时也需要得到这个线程的返回结果时,用这个锁比较好用。
/** * @filename FutureTaskTest.java * @date 2014-11-14 */ package lock; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.FutureTask; public class FutureTaskTest { public static void main(String[] args) { new FutureTaskTest().test(); } public void test() { FutureTask<Integer> futureTask = new FutureTask<Integer>(new InnerRunnable()); futureTask.run(); try { int result = futureTask.get(); System.out.println(result); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { //所有异常(除InterruptedException)均会封装成ExecutionException异常而抛出 e.printStackTrace(); } } private class InnerRunnable implements Callable<Integer> { public Integer call() throws Exception { try { Thread.sleep(3000); // throw new IllegalStateException(); } catch (InterruptedException e) { e.printStackTrace(); return -1; } return 0; } } }
3.
Semaphore,信号量,其经常用在线程池数量的控制或者队列大小的控制上,根据预先设定好的数值,然后有线程来acquire()和release(),当申请的次数大于预设值时将阻塞,直到其他线程释放资源。
/** * @filename SemaphoreTest.java * @date 2014-11-14 */ package lock; import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.Semaphore; public class SemaphoreTest { public static void main(String[] args) { SemaphoreTest test = new SemaphoreTest(); test.init(10); test.releaseNumToPool(1); test.getNumFromPool(); test.releaseNumToPool(11); } //通常用在连接池里面,用来限制申请连接的数目。 private Semaphore sem = new Semaphore(0); private final ConcurrentSkipListSet<Integer> m_numPool = new ConcurrentSkipListSet<Integer>(); public void init(int size) { for(int i=1; i<=size; i++) releaseNumToPool(i); } public Integer getNumFromPool() { try { sem.acquire(); } catch (InterruptedException e) { e.printStackTrace(); }; if(m_numPool.isEmpty()) return 0; return m_numPool.first(); } public void releaseNumToPool(Integer num) { if(!m_numPool.contains(num)) { m_numPool.add(num); sem.release(); } } }
4.
CyclicBarrier, 与CountDownLack和FutureTask不通的是,这种锁可以循环的使用,其用来规范线程任务的运行后行为,也就是当设置此锁有,多个线程运行完后,均会等在此锁上,当最后一个线程运行到此锁时,大家才继续运行。这个锁目前我能想到应用场景的地方不多,或许可以用在某些需要拼装工作同时拼装的场景里面吧。
/** * @filename CyclicBarrierTest.java * @date 2014-11-14 */ package lock; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; public class CyclicBarrierTest { public static void main(String[] args) { CyclicBarrierTest test = new CyclicBarrierTest(10); for(int i=0; i<5; i++) test.startOnce(); } private CyclicBarrier m_barrier; private final Object lock = new Object(); public CyclicBarrierTest(int barrierSize) { m_barrier = new CyclicBarrier(barrierSize, new Runnable() { public void run() { synchronized (lock) { lock.notify(); } System.out.println("all thread run completed"); } }); } public void startOnce() { for(int i=1,size=m_barrier.getParties(); i<=size; i++) { InnerThread thread = new InnerThread(i*1000, String.valueOf(i)); thread.start(); } synchronized (lock) { try { lock.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } } private class InnerThread extends Thread { private long m_waitTime = 0; private String m_name; public InnerThread(long waitTime, String threadName) { if(waitTime>0) m_waitTime = waitTime; m_name = threadName; } @Override public void run() { try { Thread.sleep(m_waitTime); System.out.println(m_name+" waited "+m_waitTime); m_barrier.await(); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } System.out.println(m_name+" completed"); } } }
5.
Exchanger,其与SynchronizeQueue有相似之处,SynchronizeQueue是单向的,Exchanger是双向的。也就是说当两个线程运行到Exchanger时,双方均会将自己的数据交换给对方。目前我也没有想到很好的应用。
/** * @filename ExchangerTest.java * @date 2014-11-14 */ package lock; import java.util.concurrent.Exchanger; public class ExchangerTest { public static void main(String[] args) { new ExchangerTest().start(); } private Exchanger<String> exchanger = new Exchanger<String>(); public void start() { new cargoThread().start(); new moneyThread().start(); } private class cargoThread extends Thread { String message = "cargo"; public void run() { System.out.println("cargo thread is producing cargo"); try { Thread.sleep(5000); } catch (InterruptedException e1) { e1.printStackTrace(); } System.out.println("cargo thread begin to wait exchange"); try { message = exchanger.exchange(message); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("cargo thread exchange message "+message); } } private class moneyThread extends Thread { String message = "money"; public void run() { System.out.println("money thread begin to wait exchange"); try { message = exchanger.exchange(message); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("money thread exchange message "+message); } } }