线程的并发工具类
fork-join(分而治之)
规模为n的问题,n<阈值,直接解决,n>阈值,将n分解为k个小规模子问题,子问题互相对立,与原问题形式相同,将子问题的解合并得到原问题的解
如何使用的流程图
用法
1.fork/join的同步用法同时演示返回结果值:统计整形数组中所有元素的和
两个main方法,sumsinglethread类里的main是单线程求和,每次休眠一秒;sumbyforkjoin类里使用forkandjoin进行求和
下面是个生成随机整数数组的类
import java.util.random; /** * 产生一个整形的数组 * */ public class createarray { public static final int array_lenth=1000; public static int[] createarray(){ random r = new random(); int[] result = new int[array_lenth]; for(int i=0;i<array_lenth;i++){ result[i]=r.nextint(array_lenth*3); } return result; } }
import com.thread.demo.sleeptools; public class sumsinglethread { public static void main(string[] args) { int count = 0; int[] src =createarray.createarray(); long start = system.currenttimemillis(); for(int i= 0;i<src.length;i++){ sleeptools.ms(1); count = count + src[i]; } system.out.println("the count is "+count +" spend time:"+(system.currenttimemillis()-start)+"ms"); } }
public class sumbyforkjoin { private static class sumtask extends recursivetask<integer>{ private static final int threshold = createarray.array_lenth/10; private int[] src; //表示我们要实际统计的数组 private int fromindex;//开始统计的下标 private int toindex;//统计到哪里结束的下标 public sumtask(int[] src, int fromindex, int toindex) { this.src = src; this.fromindex = fromindex; this.toindex = toindex; } /** * 这个是有返回值的,在compute方法中按照需要的逻辑写forkjoin逻辑 * */ @override protected integer compute() { //当满足阈值范围时,进入计算 if(toindex-fromindex<threshold){ int count = 0; for(int i=fromindex;i<toindex;i++){ count=count+src[i]; } return count; }else{//不满足阈值时,继续拆分 int mid = (fromindex+toindex)/2; sumtask left = new sumtask(src, fromindex, mid); sumtask right = new sumtask(src, mid+1, toindex); invokeall(left, right); return left.join()+right.join(); } } } public static void main(string[] args) { forkjoinpool pool = new forkjoinpool(); int[] src = createarray.createarray(); sumtask innerfind = new sumtask(src,0,src.length-1); long start = system.currenttimemillis(); pool.invoke(innerfind);//同步调用,就是这个方法执行完才会继续执行下面的sysout,所以以这个demo是同步的用法,异步调用的方法:execute(object) system.out.println("task is running....."); system.out.println("the count is "+innerfind.join() +" spend time:"+(system.currenttimemillis()-start)+"ms"); } }
q:把循环求和中的sleep注掉,并且增大数组的长度,会发现,在小于一定长度时,单线程直接求和的速度比使用fork/jion快
a:因为使用forkjoin时cpu会进行上下问切换操作,这个操作相比较于计算型操作其实更费时间
2.fork/join的异步用法同时演示不要求返回值:遍历指定目录(含子目录)寻找指定类型文件
import java.io.file; import java.util.arraylist; import java.util.list; import java.util.concurrent.forkjoinpool; import java.util.concurrent.recursiveaction; public class findfile extends recursiveaction{ private file path; public findfile(file path) { this.path=path; } @override protected void compute() { list<findfile> subtasks = new arraylist<findfile>(); file[] files = path.listfiles(); if(files!=null){ for(file file:files){//循环文件路径 if(file.isdirectory()){//判断是不是目录 subtasks.add(new findfile(file)); }else{ if(file.getabsolutepath().endswith("avi")){ system.out.println("找到对应文件:"+file.getabsolutepath()); } } } if(!subtasks.isempty()){ for(findfile sub:invokeall(subtasks)){//invokeall的返回值和传入的值一样 sub.join(); } } } } public static void main(string[] args) { try { forkjoinpool pool = new forkjoinpool(); findfile task = new findfile(new file("d:/")); pool.execute(task);//异步调用 system.out.println("task is running......"); thread.sleep(1); int otherwork = 0; for(int i=0;i<100;i++){ otherwork = otherwork+i; } system.err.println("main thread done sth......,otherwork="+otherwork); task.join();//阻塞的方法,此处是为了防止出现主线程走完,task被直接中断的情况 system.out.println("task end"); } catch (interruptedexception e) { // todo auto-generated catch block e.printstacktrace(); } } }
常用的并发工具类(直接放课程里的demo了,很详细了~)
countdownlatch
作用:是一组线程等待其他的线程完成工作以后在执行,加强版join
await用来等待,countdown负责计数器的减一
import java.util.concurrent.countdownlatch; import com.xiangxue.tools.sleeptools; /** *@author mark老师 享学课堂 https://enjoy.ke.qq.com * *类说明:演示countdownlatch,有5个初始化的线程,6个扣除点, *扣除完毕以后,主线程和业务线程才能继续自己的工作 */ public class usecountdownlatch { static countdownlatch latch = new countdownlatch(6); //初始化线程(只有一步,有4个) private static class initthread implements runnable{ @override public void run() { system.out.println("thread_"+thread.currentthread().getid() +" ready init work......"); latch.countdown();//初始化线程完成工作了,countdown方法只扣减一次; for(int i =0;i<2;i++) { system.out.println("thread_"+thread.currentthread().getid() +" ........continue do its work"); } } } //业务线程 private static class busithread implements runnable{ @override public void run() { try { latch.await(); } catch (interruptedexception e) { e.printstacktrace(); } for(int i =0;i<3;i++) { system.out.println("busithread_"+thread.currentthread().getid() +" do business-----"); } } } public static void main(string[] args) throws interruptedexception { //单独的初始化线程,初始化分为2步,需要扣减两次 new thread(new runnable() { @override public void run() { sleeptools.ms(1); system.out.println("thread_"+thread.currentthread().getid() +" ready init work step 1st......"); latch.countdown();//每完成一步初始化工作,扣减一次 system.out.println("begin step 2nd......."); sleeptools.ms(1); system.out.println("thread_"+thread.currentthread().getid() +" ready init work step 2nd......"); latch.countdown();//每完成一步初始化工作,扣减一次 } }).start(); new thread(new busithread()).start(); for(int i=0;i<=3;i++){ thread thread = new thread(new initthread()); thread.start(); } latch.await(); system.out.println("main do ites work........"); } }
cyclicbarrier
让一组线程达到某个屏障,被阻塞,一直到组内最后一个线程达到屏障时,屏障开放,所有被阻塞的线程会继续运行cyclicbarrier(int parties)
cyclicbarrier(int parties, runnable barrieraction),屏障开放,barrieraction定义的任务会执行
countdownlatch和cyclicbarrier辨析
1、countdownlatch放行由第三者控制,cyclicbarrier放行由一组线程本身控制
2、countdownlatch放行条件》=线程数,cyclicbarrier放行条件=线程数
import java.util.map; import java.util.random; import java.util.concurrent.concurrenthashmap; import java.util.concurrent.cyclicbarrier; /** *@author mark老师 享学课堂 https://enjoy.ke.qq.com * *类说明:cyclicbarrier的使用 */ public class usecyclicbarrier { private static cyclicbarrier barrier = new cyclicbarrier(5,new collectthread()); private static concurrenthashmap<string,long> resultmap = new concurrenthashmap<>();//存放子线程工作结果的容器 public static void main(string[] args) { for(int i=0;i<=4;i++){ thread thread = new thread(new subthread()); thread.start(); } } //负责屏障开放以后的工作 private static class collectthread implements runnable{ @override public void run() { stringbuilder result = new stringbuilder(); for(map.entry<string,long> workresult:resultmap.entryset()){ result.append("["+workresult.getvalue()+"]"); } system.out.println(" the result = "+ result); system.out.println("do other business........"); } } //工作线程 private static class subthread implements runnable{ @override public void run() { long id = thread.currentthread().getid();//线程本身的处理结果 resultmap.put(thread.currentthread().getid()+"",id); random r = new random();//随机决定工作线程的是否睡眠 try { if(r.nextboolean()) { thread.sleep(2000+id); system.out.println("thread_"+id+" ....do something "); } system.out.println(id+"....is await"); barrier.await(); thread.sleep(1000+id); system.out.println("thread_"+id+" ....do its business "); } catch (exception e) { e.printstacktrace(); } } } }
semaphore
控制同时访问某个特定资源的线程数量,用在流量控制
exchange
两个线程间的数据交换
import java.util.arraylist; import java.util.hashset; import java.util.list; import java.util.set; import java.util.concurrent.exchanger; /** *@author mark老师 享学课堂 https://enjoy.ke.qq.com * *类说明:exchange的使用 */ public class useexchange { private static final exchanger<set<string>> exchange = new exchanger<set<string>>(); public static void main(string[] args) { //第一个线程 new thread(new runnable() { @override public void run() { set<string> seta = new hashset<string>();//存放数据的容器 try { /*添加数据 * set.add(.....) * */ seta = exchange.exchange(seta);//交换set /*处理交换后的数据*/ } catch (interruptedexception e) { } } }).start(); //第二个线程 new thread(new runnable() { @override public void run() { set<string> setb = new hashset<string>();//存放数据的容器 try { /*添加数据 * set.add(.....) * set.add(.....) * */ setb = exchange.exchange(setb);//交换set /*处理交换后的数据*/ } catch (interruptedexception e) { } } }).start(); } }
(ps:所有内容参考享学课堂视频)