Java线程同步方法实例总结
本文实例讲述了java线程同步方法。分享给大家供大家参考,具体如下:
1. semaphore
1.1 二进制semaphore
semaphore算是比较高级点的线程同步工具了,在许多其他语言里也有类似的实现。semaphore有一个最大的好处就是在初始化时,可以显式的控制并发数。其内部维护这一个c计数器,当计数器小于等于0时,是不允许其他线程访问并发区域的,反之则可以,因此,若将并发数设置为1,则可以确保单一线程同步。下面的例子模拟多线程打印,每个线程提交打印申请,然后执行打印,最后宣布打印结束,代码如下:
import java.util.concurrent.semaphore; public class program{ public static void main(string[] agrs){ printqueue p=new printqueue(); thread[] ths=new thread[10]; for(int i=0;i<10;i++){ ths[i]=new thread(new job(p),"thread"+i); } for(int i=0;i<10;i++){ ths[i].start(); } } } class printqueue{ private semaphore s; public printqueue(){ s=new semaphore(1);//二进制信号量 } public void printjob(object document){ try{ s.acquire(); long duration=(long)(math.random()*100); system.out.printf("线程名:%s 睡眠:%d",thread.currentthread().getname(),duration); thread.sleep(duration); } catch(interruptedexception e){ e.printstacktrace(); } finally{ s.release(); } } } class job implements runnable{ private printqueue p; public job(printqueue p){ this.p=p; } @override public void run(){ system.out.printf("%s:正在打印一个任务\n ",thread.currentthread().getname()); this.p.printjob(new object()); system.out.printf("%s:文件已打印完毕\n ",thread.currentthread().getname()); } }
执行结果如下:
thread0:正在打印一个任务
thread9:正在打印一个任务
thread8:正在打印一个任务
thread7:正在打印一个任务
thread6:正在打印一个任务
thread5:正在打印一个任务
thread4:正在打印一个任务
thread3:正在打印一个任务
thread2:正在打印一个任务
thread1:正在打印一个任务
线程名:thread0 睡眠:32 thread0:文件已打印完毕
线程名:thread9 睡眠:44 thread9:文件已打印完毕
线程名:thread8 睡眠:45 thread8:文件已打印完毕
线程名:thread7 睡眠:65 thread7:文件已打印完毕
线程名:thread6 睡眠:12 thread6:文件已打印完毕
线程名:thread5 睡眠:72 thread5:文件已打印完毕
线程名:thread4 睡眠:98 thread4:文件已打印完毕
线程名:thread3 睡眠:58 thread3:文件已打印完毕
线程名:thread2 睡眠:24 thread2:文件已打印完毕
线程名:thread1 睡眠:93 thread1:文件已打印完毕
可以看到,所有线程提交打印申请后,按照并发顺序一次执行,没有任何并发冲突,谁先获得信号量,谁就先执行,其他剩余线程均等待。这里面还有一个公平信号与非公平信号之说:基本上java所有的多线程工具都支持初始化的时候指定一个布尔变量,true时表明公平,即所有处于等待的线程被筛选的条件为“谁等的时间长就选谁进行执行”,有点first in first out的感觉,而false时则表明不公平(默认是不non-fairness),即所有处于等待的线程被筛选执行是随机的。这也就是为什么多线程往往执行顺序比较混乱的原因。
1.2 多重并发控制
若将上面的代码改为s=new semaphore(3);//即让其每次可以并发3条线程
,则输出如下:
thread0:正在打印一个任务
thread9:正在打印一个任务
thread8:正在打印一个任务
thread7:正在打印一个任务
thread6:正在打印一个任务
thread5:正在打印一个任务
thread3:正在打印一个任务
thread4:正在打印一个任务
thread2:正在打印一个任务
thread1:正在打印一个任务
线程名:thread9 睡眠:26线程名:thread8 睡眠:46线程名:thread0 睡眠:79 thread9:文件已打印完毕
线程名:thread7 睡眠:35 thread8:文件已打印完毕
线程名:thread6 睡眠:90 thread7:文件已打印完毕
线程名:thread5 睡眠:40 thread0:文件已打印完毕
线程名:thread3 睡眠:84 thread5:文件已打印完毕
线程名:thread4 睡眠:13 thread4:文件已打印完毕
线程名:thread2 睡眠:77 thread6:文件已打印完毕
线程名:thread1 睡眠:12 thread1:文件已打印完毕
thread3:文件已打印完毕
thread2:文件已打印完毕
很明显已经并发冲突了。若要实现分组(每组3个)并发吗,则每一组也要进行同步,代码修改如下:
import java.util.concurrent.semaphore; import java.util.concurrent.locks.lock; import java.util.concurrent.locks.reentrantlock; public class program{ public static void main(string[] agrs){ printqueue p=new printqueue(); thread[] ths=new thread[10]; for(int i=0;i<10;i++){ ths[i]=new thread(new job(p),"thread"+i); } for(int i=0;i<10;i++){ ths[i].start(); } } } class printqueue{ private semaphore s; private boolean[] freeprinters; private lock lock; public printqueue(){ s=new semaphore(3);//二进制信号量 freeprinters=new boolean[3]; for(int i=0;i<3;i++){ freeprinters[i]=true; } lock=new reentrantlock(); } public void printjob(object document){ try{ s.acquire(); int printerindex=getindex(); long duration=(long)(math.random()*100); system.out.printf("线程名:%s 睡眠:%d\n",thread.currentthread().getname(),duration); thread.sleep(duration); freeprinters[printerindex]=true;//恢复信号,供下次使用 } catch(interruptedexception e){ e.printstacktrace(); } finally{ s.release(); } } //返回一个内部分组后的同步索引 public int getindex(){ int index=-1; try{ lock.lock(); for(int i=0;i<freeprinters.length;i++){ if(freeprinters[i]){ freeprinters[i]=false; index=i; break; } } } catch(exception e){ e.printstacktrace(); } finally{ lock.unlock(); } return index; } } class job implements runnable{ private printqueue p; public job(printqueue p){ this.p=p; } @override public void run(){ system.out.printf("%s:正在打印一个任务\n ",thread.currentthread().getname()); this.p.printjob(new object()); system.out.printf(" %s:文件已打印完毕\n ",thread.currentthread().getname()); } }
其中getindex()
方法主要为了维护内部分组后(支持并发3个)组内数据的同步(用lock来同步)。
输出如下:
thread0:正在打印一个任务
thread9:正在打印一个任务
thread8:正在打印一个任务
thread7:正在打印一个任务
thread6:正在打印一个任务
thread5:正在打印一个任务
thread4:正在打印一个任务
thread3:正在打印一个任务
thread2:正在打印一个任务
thread1:正在打印一个任务
线程名:thread0 睡眠:82 打印机:0号
线程名:thread8 睡眠:61 打印机:2号
线程名:thread9 睡眠:19 打印机:1号
thread9:文件已打印完毕
线程名:thread7 睡眠:82 打印机:1号
thread8:文件已打印完毕
线程名:thread6 睡眠:26 打印机:2号
thread0:文件已打印完毕
线程名:thread5 睡眠:31 打印机:0号
thread6:文件已打印完毕
线程名:thread4 睡眠:44 打印机:2号
thread7:文件已打印完毕
线程名:thread3 睡眠:54 打印机:1号
thread5:文件已打印完毕
线程名:thread2 睡眠:48 打印机:0号
thread4:文件已打印完毕
线程名:thread1 睡眠:34 打印机:2号
thread3:文件已打印完毕
thread2:文件已打印完毕
thread1:文件已打印完毕
2. countdownlatch
countdownlatch同样也是支持多任务并发的一个工具。它主要用于“等待多个并发事件”,它内部也有一个计数器,当调用await()
方法时,线程处于等待状态,只有当内部计数器为0时才继续(countdown()
方法来减少计数),也就说,假若有一个需求是这样的:主线程等待所有子线程都到达某一条件时才执行,那么只需要主线程await,然后在启动每个子线程的时候进行countdown操作。下面模拟了一个开会的例子,只有当所有人员都到齐了,会议才能开始。
import java.util.concurrent.countdownlatch; public class program{ public static void main(string[] agrs){ //开启可容纳10人的会议室 videoconference v=new videoconference(10); new thread(v).start(); //参与人员陆续进场 for(int i=0;i<10;i++){ participant p=new participant(i+"号人员",v); new thread(p).start(); } } } class videoconference implements runnable{ private countdownlatch controller; public videoconference(int num){ controller=new countdownlatch(num); } public void arrive(string name){ system.out.printf("%s 已经到达!\n",name); controller.countdown(); system.out.printf("还需要等 %d 个成员!\n",controller.getcount()); } @override public void run(){ try{ system.out.printf("会议正在初始化...!\n"); controller.await(); system.out.printf("所有人都到齐了,开会吧!\n"); } catch(interruptedexception e){ e.printstacktrace(); } } } class participant implements runnable{ private videoconference conference; private string name; public participant(string name,videoconference conference){ this.name=name; this.conference=conference; } @override public void run(){ long duration=(long)(math.random()*100); try{ thread.sleep(duration); conference.arrive(this.name); } catch(interruptedexception e){ } } }
输出:
会议正在初始化...!
0号人员 已经到达!
还需要等 9 个成员!
1号人员 已经到达!
还需要等 8 个成员!
9号人员 已经到达!
还需要等 7 个成员!
4号人员 已经到达!
还需要等 6 个成员!
8号人员 已经到达!
还需要等 5 个成员!
5号人员 已经到达!
还需要等 4 个成员!
6号人员 已经到达!
还需要等 3 个成员!
3号人员 已经到达!
还需要等 2 个成员!
7号人员 已经到达!
还需要等 1 个成员!
2号人员 已经到达!
还需要等 0 个成员!
所有人都到齐了,开会吧!
3. phaser
import java.util.concurrent.phaser; import java.util.concurrent.timeunit; import java.util.list; import java.util.arraylist; import java.io.file; import java.util.date; public class program{ public static void main(string[] agrs){ phaser phaser=new phaser(3); filesearch system=new filesearch("c:\\windows", "log",phaser); filesearch apps=new filesearch("c:\\program files","log",phaser); filesearch documents=new filesearch("c:\\documents and settings","log",phaser); thread systemthread=new thread(system,"system"); systemthread.start(); thread appsthread=new thread(apps,"apps"); appsthread.start(); thread documentsthread=new thread(documents, "documents"); documentsthread.start(); try { systemthread.join(); appsthread.join(); documentsthread.join(); } catch (interruptedexception e) { e.printstacktrace(); } system.out.println("terminated: "+ phaser.isterminated()); } } class filesearch implements runnable{ private string initpath; private string end; private list<string> results; private phaser phaser; public filesearch(string initpath,string end,phaser phaser){ this.initpath=initpath; this.end=end; this.results=new arraylist<string>(); this.phaser=phaser; } private void directoryprocess(file file){ file[] files=file.listfiles(); if(files!=null){ for(int i=0;i<files.length;i++){ if(files[i].isdirectory()){ directoryprocess(files[i]); } else{ fileprocess(files[i]); } } } } private void fileprocess(file file){ if(file.getname().endswith(end)){ results.add(file.getabsolutepath()); } } private void filterresults(){ list<string> newresults=new arraylist<string>(); long actualdate=new date().gettime(); for(int i=0;i<results.size();i++){ file file=new file(results.get(i)); long filedate=file.lastmodified(); if(actualdate-filedate<timeunit.milliseconds.convert(1,timeunit.days)){ newresults.add(results.get(i)); } } results=newresults; } private boolean checkresults(){ if(results.isempty()){ system.out.printf("%s: phase %d: 0 results.\n",thread.currentthread().getname(),phaser.getphase()); system.out.printf("%s: phase %d: end.\n",thread.currentthread().getname(),phaser.getphase()); phaser.arriveandderegister(); } else{ system.out.printf("%s: phase %d: %d results.\n",thread.currentthread().getname(),phaser.getphase(),results.size()); phaser.arriveandawaitadvance(); return true; } } private void showinfo() { for (int i=0; i<results.size(); i++){ file file=new file(results.get(i)); system.out.printf("%s: %s\n",thread.currentthread().getname(),file.getabsolutepath()); } phaser.arriveandawaitadvance(); } @override public void run(){ file file=new file(initpath); if(file.isdirectory()){ directoryprocess(file); } if(!checkresults()){ return; } filterresults(); if(!checkresults()){ return; } showinfo(); phaser.arriveandderegister(); system.out.printf("%s: work completed.\n",thread.currentthread().getname()); } }
运行结果:
apps: phase 0: 4 results.
system: phase 0: 27 results.
更多java相关内容感兴趣的读者可查看本站专题:《java进程与线程操作技巧总结》、《java数据结构与算法教程》、《java操作dom节点技巧总结》、《java文件与目录操作技巧汇总》和《java缓存操作技巧汇总》
希望本文所述对大家java程序设计有所帮助。