欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

java并发中锁的应用

程序员文章站 2022-07-10 10:46:02
...

锁的理解

锁产生于多线程并发应用,其作用是解决共享对象的同步同时也可以控制线程的行为。我认为锁不仅仅限于synchronize,ReentrantLock,ReadWriteLock.同时也包括CountDownLack, FutureTask, Semaphore, CyclicBarrier, Exchanger这些平时接触不多的并发控制类。后者经常会用在控制线程的运行行为。

 

1.

CountDownLack 这种锁经常用来控制多个线程同时启动,并且能够及时感知这些线程是否全部运行结束。举例如下:

例子中m_begin用来为那10个线程发送启动指令,当m_begin.countDown()时,10个线程同时启动。同时m_end.await()这些线程结束的好消息。

/**
 * @filename CountDownLaunchTest.java
 * @date     2014-11-14 
 */
package lock;

import java.util.concurrent.CountDownLatch;

public class CountDownLatchTest
{
	public static void main(String[] args)
	{
		new CountDownLatchTest(10).runAll();
	}
	
	private int m_threadNum = 10;
	private CountDownLatch m_begin = null;
	private CountDownLatch m_end = null;
	
	CountDownLatchTest(int threadNum)
	{
		if(threadNum>0)
			m_threadNum = threadNum;
		m_begin = new CountDownLatch(1);
		m_end = new CountDownLatch(m_threadNum);
	}
	
	public void runAll()
	{
		for(int i=0; i<m_threadNum; i++)
		{
			new InnerThread(String.valueOf(i)).start();
		}
		
		m_begin.countDown();
		try
		{
			m_end.await();
		} catch (InterruptedException e)
		{
			e.printStackTrace();
		}
		
		System.out.println("all thread finished");
	}
	
	private class InnerThread extends Thread
	{
		private final String name;
		InnerThread(String threadName)
		{
			name = threadName;
			System.out.println(name+" inited");
		}
		@Override
		public void run()
		{
			try
			{
				//!!!!!注意是await,不是wait
				m_begin.await();
				Thread.currentThread().sleep(3000);
			} catch (InterruptedException e)
			{
				e.printStackTrace();
			}
			m_end.countDown();
			System.out.println(name+" finished");
		}
	}
}

  

2.FutureTask 这种锁可以获取一个线程任务的运行结果,也就是说我们有个任务需要启动一个线程进行处理,同时也需要得到这个线程的返回结果时,用这个锁比较好用。

/**
 * @filename FutureTaskTest.java
 * @date     2014-11-14 
 */
package lock;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

public class FutureTaskTest
{
	public static void main(String[] args)
	{
		new FutureTaskTest().test();
	}

	public void test()
	{
		FutureTask<Integer> futureTask = new FutureTask<Integer>(new InnerRunnable());
		futureTask.run();
		
		try
		{
			int result = futureTask.get();
			System.out.println(result);
		} catch (InterruptedException e)
		{
			e.printStackTrace();
		} catch (ExecutionException e)
		{
			//所有异常(除InterruptedException)均会封装成ExecutionException异常而抛出
			e.printStackTrace();
		}
	}
	
	private class InnerRunnable implements Callable<Integer>
	{
		public Integer call() throws Exception
		{
			try
			{
				Thread.sleep(3000);
//				throw new IllegalStateException();
			} catch (InterruptedException e)
			{
				e.printStackTrace();
				return -1;
			}
			return 0;
		}
	}
}

 

3.

Semaphore,信号量,其经常用在线程池数量的控制或者队列大小的控制上,根据预先设定好的数值,然后有线程来acquire()和release(),当申请的次数大于预设值时将阻塞,直到其他线程释放资源。

/**
 * @filename SemaphoreTest.java
 * @date     2014-11-14 
 */
package lock;

import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.Semaphore;

public class SemaphoreTest
{
	
	public static void main(String[] args)
	{
		SemaphoreTest test = new SemaphoreTest();
		test.init(10);
		test.releaseNumToPool(1);
		test.getNumFromPool();
		test.releaseNumToPool(11);
	}
	
	//通常用在连接池里面,用来限制申请连接的数目。
	private Semaphore sem = new Semaphore(0);
	
	private final ConcurrentSkipListSet<Integer> m_numPool = new ConcurrentSkipListSet<Integer>();
	
	public void init(int size)
	{
		for(int i=1; i<=size; i++)
			releaseNumToPool(i);
	}
	
	public Integer getNumFromPool()
	{
		try
		{
			sem.acquire();
		} catch (InterruptedException e)
		{
			e.printStackTrace();
		};
		if(m_numPool.isEmpty())
			return 0;
		return m_numPool.first();
	}
	
	public void releaseNumToPool(Integer num)
	{
		if(!m_numPool.contains(num))
		{
			m_numPool.add(num);
			sem.release();
		}
	}
}

 

4.

CyclicBarrier, 与CountDownLack和FutureTask不通的是,这种锁可以循环的使用,其用来规范线程任务的运行后行为,也就是当设置此锁有,多个线程运行完后,均会等在此锁上,当最后一个线程运行到此锁时,大家才继续运行。这个锁目前我能想到应用场景的地方不多,或许可以用在某些需要拼装工作同时拼装的场景里面吧。

/**
 * @filename CyclicBarrierTest.java
 * @date     2014-11-14 
 */
package lock;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class CyclicBarrierTest
{
	public static void main(String[] args)
	{
		CyclicBarrierTest test = new CyclicBarrierTest(10);
		
		for(int i=0; i<5; i++)
			test.startOnce();
	}
	
	private CyclicBarrier m_barrier;
	private final Object lock = new Object();
	public CyclicBarrierTest(int barrierSize)
	{
		m_barrier = new CyclicBarrier(barrierSize, new Runnable()
		{
			public void run()
			{
				synchronized (lock)
				{
					lock.notify();
				}
				System.out.println("all thread run completed");
			}
		});
	}
	
	
	public void startOnce()
	{
		for(int i=1,size=m_barrier.getParties(); i<=size; i++)
		{
			InnerThread thread = new InnerThread(i*1000, String.valueOf(i));
			thread.start();
		}

		synchronized (lock)
		{
			try
			{
				lock.wait();
			} catch (InterruptedException e)
			{
				e.printStackTrace();
			}
		}
			
	}
	
	private class InnerThread extends Thread
	{
		private long m_waitTime = 0;
		private String m_name;
		
		public InnerThread(long waitTime, String threadName)
		{
			if(waitTime>0)
				m_waitTime = waitTime;
			m_name = threadName;
		}
		
		@Override
		public void run()
		{
			try
			{
				Thread.sleep(m_waitTime);
				System.out.println(m_name+" waited "+m_waitTime);
				m_barrier.await();
			} catch (InterruptedException e)
			{
				e.printStackTrace();
			} catch (BrokenBarrierException e)
			{
				e.printStackTrace();
			}
			System.out.println(m_name+" completed");
		}
	}
}

 5.

Exchanger,其与SynchronizeQueue有相似之处,SynchronizeQueue是单向的,Exchanger是双向的。也就是说当两个线程运行到Exchanger时,双方均会将自己的数据交换给对方。目前我也没有想到很好的应用。

/**
 * @filename ExchangerTest.java
 * @date     2014-11-14 
 */
package lock;

import java.util.concurrent.Exchanger;

public class ExchangerTest
{
	public static void main(String[] args)
	{
		new ExchangerTest().start();
	}
	
	private Exchanger<String> exchanger = new Exchanger<String>();
	
	public void start()
	{
		new cargoThread().start();
		new moneyThread().start();
	}
	
	private class cargoThread extends Thread
	{
		String message = "cargo";
		
		public void run()
		{
			
			System.out.println("cargo thread is producing cargo");
			try
			{
				Thread.sleep(5000);
			} catch (InterruptedException e1)
			{
				e1.printStackTrace();
			}
			
			System.out.println("cargo thread begin to wait exchange");
			try
			{
				message = exchanger.exchange(message);
			} catch (InterruptedException e)
			{
				e.printStackTrace();
			}
			System.out.println("cargo thread exchange message "+message);
		}
	}
	
	private class moneyThread extends Thread
	{
		String message = "money";
		
		public void run()
		{
			System.out.println("money thread begin to wait exchange");
			try
			{
				message = exchanger.exchange(message);
			} catch (InterruptedException e)
			{
				e.printStackTrace();
			}
			System.out.println("money thread exchange message "+message);
		}
	}
	
	
}