Java并发之条件阻塞Condition的应用代码示例
本文研究的主要是java并发之条件阻塞condition的应用示例代码,具体如下。
condition将object监视器方法(wait、notify 和 notifyall)分解成截然不同的对象,以便通过将这些对象与任意lock实现组合使用,为每个对象提供多个等待 set(wait-set)。其中,lock 替代了synchronized方法和语句的使用,condition替代了object监视器方法的使用。
1. condition的基本使用
由于condition可以用来替代wait、notify等方法,所以可以对比着之前写过的的代码来看,再来看一下原来那个问题:
有两个线程,子线程先执行10次,然后主线程执行5次,然后再切换到子线程执行10,再主线程执行5次……如此往返执行50次。
之前用wait和notify来实现的,现在用condition来改写一下,代码如下:
public class conditioncommunication { public static void main(string[] args) { business bussiness = new business(); new thread(new runnable() { // 开启一个子线程 @override public void run() { for (int i = 1; i <= 50; i++) { bussiness.sub(i); } } } ).start(); // main方法主线程 for (int i = 1; i <= 50; i++) { bussiness.main(i); } } } class business { lock lock = new reentrantlock(); condition condition = lock.newcondition(); //condition是在具体的lock之上的 private boolean bshouldsub = true; public void sub(int i) { lock.lock(); try { while (!bshouldsub) { try { condition.await(); //用condition来调用await方法 } catch (exception e) { // todo auto-generated catch block e.printstacktrace(); } } for (int j = 1; j <= 10; j++) { system.out.println("sub thread sequence of " + j + ", loop of " + i); } bshouldsub = false; condition.signal(); //用condition来发出唤醒信号,唤醒某一个 } finally { lock.unlock(); } } public void main(int i) { lock.lock(); try { while (bshouldsub) { try { condition.await(); //用condition来调用await方法 } catch (exception e) { // todo auto-generated catch block e.printstacktrace(); } } for (int j = 1; j <= 10; j++) { system.out.println("main thread sequence of " + j + ", loop of " + i); } bshouldsub = true; condition.signal(); //用condition来发出唤醒信号么,唤醒某一个 } finally { lock.unlock(); } } }
从代码来看,condition的使用时和lock一起的,没有lock就没法使用condition,因为condition是通过lock来new出来的,这种用法很简单,只要掌握了synchronized和wait、notify的使用,完全可以掌握lock和condition的使用。
2. condition的拔高
2.1 缓冲区的阻塞队列
上面使用lock和condition来代替synchronized和object监视器方法实现了两个线程之间的通信,现在再来写个稍微高级点应用:模拟缓冲区的阻塞队列。
什么叫缓冲区呢?举个例子,现在有很多人要发消息,我是中转站,我要帮别人把消息发出去,那么现在我 就需要做两件事,一件事是接收用户发过来的消息,并按顺序放到缓冲区,另一件事是从缓冲区中按顺序取出用户发过来的消息,并发送出去。
现在把这个实际的问题抽象一下:缓冲区即一个数组,我们可以向数组中写入数据,也可以从数组中把数据取走,我要做的两件事就是开启两个线程,一个存数据,一个取数据。但是问题来了,如果缓冲区满了,说明接收的消息太多了,即发送过来的消息太快了,我另一个线程还来不及发完,导致现在缓冲区没地方放了,那么此时就得阻塞存数据这个线程,让其等待;相反,如果我转发的太快,现在缓冲区所有内容都被我发完了,还没有用户发新的消息来,那么此时就得阻塞取数据这个线程。
好了,分析完了这个缓冲区的阻塞队列,下面就用condition技术来实现一下:
class buffer { final lock lock = new reentrantlock(); //定义一个锁 final condition notfull = lock.newcondition(); //定义阻塞队列满了的condition final condition notempty = lock.newcondition(); //定义阻塞队列空了的condition final object[] items = new object[10]; //为了下面模拟,设置阻塞队列的大小为10,不要设太大 int putptr, takeptr, count; //数组下标,用来标定位置的 //往队列中存数据 public void put(object x) throws interruptedexception { lock.lock(); //上锁 try { while (count == items.length) { system.out.println(thread.currentthread().getname() + " 被阻塞了,暂时无法存数据!"); notfull.await(); //如果队列满了,那么阻塞存数据这个线程,等待被唤醒 } //如果没满,按顺序往数组中存 items[putptr] = x; if (++putptr == items.length) //这是到达数组末端的判断,如果到了,再回到始端 putptr = 0; ++count; //消息数量 system.out.println(thread.currentthread().getname() + " 存好了值: " + x); notempty.signal(); //好了,现在队列中有数据了,唤醒队列空的那个线程,可以取数据啦 } finally { lock.unlock(); //放锁 } } //从队列中取数据 public object take() throws interruptedexception { lock.lock(); //上锁 try { while (count == 0) { system.out.println(thread.currentthread().getname() + " 被阻塞了,暂时无法取数据!"); notempty.await(); //如果队列是空,那么阻塞取数据这个线程,等待被唤醒 } //如果没空,按顺序从数组中取 object x = items[takeptr]; if (++takeptr == items.length) //判断是否到达末端,如果到了,再回到始端 takeptr = 0; --count; //消息数量 system.out.println(thread.currentthread().getname() + " 取出了值: " + x); notfull.signal(); //好了,现在队列中有位置了,唤醒队列满的那个线程,可以存数据啦 return x; } finally { lock.unlock(); //放锁 } } }
这个程序很经典,我从官方jdk文档中拿出来的,然后加了注释。程序中定义了两个condition,分别针对两个线程,等待和唤醒分别用不同的condition来执行,思路很清晰,程序也很健壮。可以考虑一个问题,为啥要用两个codition呢?之所以这么设计肯定是有原因的,如果用一个condition,现在假设队列满了,但是有2个线程a和b同时存数据,那么都进入了睡眠,好,现在另一个线程取走一个了,然后唤醒了其中一个线程a,那么a可以存了,存完后,a又唤醒一个线程,如果b被唤醒了,那就出问题了,因为此时队列是满的,b不能存的,b存的话就会覆盖原来还没被取走的值,就因为使用了一个condition,存和取都用这个condition来睡眠和唤醒,就乱了套。到这里,就能体会到这个condition的用武之地了,现在来测试一下上面的阻塞队列的效果:
public class boundedbuffer { public static void main(string[] args) { buffer buffer = new buffer(); for (int i = 0; i < 5; i ++) { //开启5个线程往缓冲区存数据 new thread(new runnable() { @override public void run() { try { buffer.put(new random().nextint(1000)); //随机存数据 } catch (interruptedexception e) { e.printstacktrace(); } } } ).start(); } for (int i = 0; i < 10; i ++) { //开启10个线程从缓冲区中取数据 new thread(new runnable() { @override public void run() { try { buffer.take(); //从缓冲区取数据 } catch (interruptedexception e) { e.printstacktrace(); } } } ).start(); } } }
我故意只开启5个线程存数据,10个线程取数据,就是想让它出现取数据被阻塞的情况发生,看运行的结果:
thread-5 被阻塞了,暂时无法取数据!
thread-10 被阻塞了,暂时无法取数据!
thread-1 存好了值: 755
thread-0 存好了值: 206
thread-2 存好了值: 741
thread-3 存好了值: 381
thread-14 取出了值: 755
thread-4 存好了值: 783
thread-6 取出了值: 206
thread-7 取出了值: 741
thread-8 取出了值: 381
thread-9 取出了值: 783
thread-5 被阻塞了,暂时无法取数据!
thread-11 被阻塞了,暂时无法取数据!
thread-12 被阻塞了,暂时无法取数据!
thread-10 被阻塞了,暂时无法取数据!
thread-13 被阻塞了,暂时无法取数据!
从结果中可以看出,线程5和10抢先执行,发现队列中没有,于是就被阻塞了,睡在那了,直到队列中有新的值存入才可以取,但是它们两运气不好,存的数据又被其他线程给抢先取走了,哈哈……可以多运行几次。如果想要看到存数据被阻塞,可以将取数据的线程设置少一点,这里我就不设了。
2.2 两个以上线程之间的唤醒
还是原来那个题目,现在让三个线程来执行,看一下题目:
有三个线程,子线程1先执行10次,然后子线程2执行10次,然后主线程执行5次,然后再切换到子线程1执行10次,子线程2执行10次,主线程执行5次……如此往返执行50次。
如过不用condition,还真不好弄,但是用condition来做的话,就非常方便了,原理很简单,定义三个condition,子线程1执行完唤醒子线程2,子线程2执行完唤醒主线程,主线程执行完唤醒子线程1。唤醒机制和上面那个缓冲区道理差不多,下面看看代码吧,很容易理解。
public class threeconditioncommunication { public static void main(string[] args) { business bussiness = new business(); new thread(new runnable() { // 开启一个子线程 @override public void run() { for (int i = 1; i <= 50; i++) { bussiness.sub1(i); } } } ).start(); new thread(new runnable() { // 开启另一个子线程 @override public void run() { for (int i = 1; i <= 50; i++) { bussiness.sub2(i); } } } ).start(); // main方法主线程 for (int i = 1; i <= 50; i++) { bussiness.main(i); } } static class business { lock lock = new reentrantlock(); condition condition1 = lock.newcondition(); //condition是在具体的lock之上的 condition condition2 = lock.newcondition(); condition conditionmain = lock.newcondition(); private int bshouldsub = 0; public void sub1(int i) { lock.lock(); try { while (bshouldsub != 0) { try { condition1.await(); //用condition来调用await方法 } catch (exception e) { // todo auto-generated catch block e.printstacktrace(); } } for (int j = 1; j <= 10; j++) { system.out.println("sub1 thread sequence of " + j + ", loop of " + i); } bshouldsub = 1; condition2.signal(); //让线程2执行 } finally { lock.unlock(); } } public void sub2(int i) { lock.lock(); try { while (bshouldsub != 1) { try { condition2.await(); //用condition来调用await方法 } catch (exception e) { // todo auto-generated catch block e.printstacktrace(); } } for (int j = 1; j <= 10; j++) { system.out.println("sub2 thread sequence of " + j + ", loop of " + i); } bshouldsub = 2; conditionmain.signal(); //让主线程执行 } finally { lock.unlock(); } } public void main(int i) { lock.lock(); try { while (bshouldsub != 2) { try { conditionmain.await(); //用condition来调用await方法 } catch (exception e) { // todo auto-generated catch block e.printstacktrace(); } } for (int j = 1; j <= 5; j++) { system.out.println("main thread sequence of " + j + ", loop of " + i); } bshouldsub = 0; condition1.signal(); //让线程1执行 } finally { lock.unlock(); } } } }
代码看似有点长,但是是假象,逻辑非常简单。关于线程中的condition技术就总结这么多吧。
总结
以上就是本文关于java并发之条件阻塞condition的应用代码示例的全部内容,希望对大家有所帮助。感兴趣的朋友可以继续参阅本站其他相关专题,如有不足之处,欢迎留言指出。感谢朋友们对本站的支持!