欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

线程的并发工具类

程序员文章站 2022-12-24 12:13:03
Fork-Join(分而治之) 规模为N的问题,N<阈值,直接解决,N>阈值,将N分解为K个小规模子问题,子问题互相对立,与原问题形式相同,将子问题的解合并得到原问题的解 如何使用的流程图 用法 1.Fork/Join的同步用法同时演示返回结果值:统计整形数组中所有元素的和 两个main方法,Sum ......

fork-join(分而治之) 

规模为n的问题,n<阈值,直接解决,n>阈值,将n分解为k个小规模子问题,子问题互相对立,与原问题形式相同,将子问题的解合并得到原问题的解

 如何使用的流程图

线程的并发工具类

用法

1.fork/join的同步用法同时演示返回结果值:统计整形数组中所有元素的和

两个main方法,sumsinglethread类里的main是单线程求和,每次休眠一秒;sumbyforkjoin类里使用forkandjoin进行求和

下面是个生成随机整数数组的类

import java.util.random;

/**
 * 产生一个整形的数组
 * */
public class createarray {
    public static final int array_lenth=1000;
    public static int[] createarray(){
        random r = new random();
        int[] result = new int[array_lenth];
        for(int i=0;i<array_lenth;i++){
            result[i]=r.nextint(array_lenth*3);
        }
        return result;
    }
}
import com.thread.demo.sleeptools;

public class sumsinglethread {
    public static void main(string[] args) {
         int count = 0;
            int[] src =createarray.createarray();

            long start = system.currenttimemillis();
            for(int i= 0;i<src.length;i++){
                sleeptools.ms(1);
                count = count + src[i];
            }
            system.out.println("the count is "+count
                    +" spend time:"+(system.currenttimemillis()-start)+"ms");    
    }

}
public class sumbyforkjoin {
    private static class sumtask extends recursivetask<integer>{
        private static final int threshold = createarray.array_lenth/10;
        private int[] src; //表示我们要实际统计的数组
        private int fromindex;//开始统计的下标
        private int toindex;//统计到哪里结束的下标
        public sumtask(int[] src, int fromindex, int toindex) {
            this.src = src;
            this.fromindex = fromindex;
            this.toindex = toindex;
        }
        /**
         * 这个是有返回值的,在compute方法中按照需要的逻辑写forkjoin逻辑
         * */
        @override
        protected integer compute() {
            //当满足阈值范围时,进入计算
            if(toindex-fromindex<threshold){
                int count = 0;
                for(int i=fromindex;i<toindex;i++){
                    count=count+src[i];
                }
                return count;
            }else{//不满足阈值时,继续拆分
                int mid = (fromindex+toindex)/2;
                sumtask left = new sumtask(src, fromindex, mid);
                sumtask right = new sumtask(src, mid+1, toindex);
                invokeall(left, right);
                return left.join()+right.join();
                }
            }
        }
    
    public static void main(string[] args) {
        
          forkjoinpool pool = new forkjoinpool();
          int[] src = createarray.createarray();

          sumtask innerfind = new sumtask(src,0,src.length-1);

          long start = system.currenttimemillis();

          pool.invoke(innerfind);//同步调用,就是这个方法执行完才会继续执行下面的sysout,所以以这个demo是同步的用法,异步调用的方法:execute(object)
          system.out.println("task is running.....");

          system.out.println("the count is "+innerfind.join()
                  +" spend time:"+(system.currenttimemillis()-start)+"ms");
    }
   

}

q:把循环求和中的sleep注掉,并且增大数组的长度,会发现,在小于一定长度时,单线程直接求和的速度比使用fork/jion快

a:因为使用forkjoin时cpu会进行上下问切换操作,这个操作相比较于计算型操作其实更费时间

 

2.fork/join的异步用法同时演示不要求返回值:遍历指定目录(含子目录)寻找指定类型文件

import java.io.file;
import java.util.arraylist;
import java.util.list;
import java.util.concurrent.forkjoinpool;
import java.util.concurrent.recursiveaction;

public class findfile extends recursiveaction{
    private file path;
    public  findfile(file path) {
        this.path=path;
        
    }
    
    @override
    protected void compute() {
        list<findfile> subtasks = new arraylist<findfile>();
        file[] files = path.listfiles();
        if(files!=null){
            
        
        for(file file:files){//循环文件路径
            if(file.isdirectory()){//判断是不是目录
                subtasks.add(new findfile(file));
            }else{
                if(file.getabsolutepath().endswith("avi")){
                    system.out.println("找到对应文件:"+file.getabsolutepath());
                }
            }
        }
        if(!subtasks.isempty()){
            for(findfile sub:invokeall(subtasks)){//invokeall的返回值和传入的值一样
                sub.join();
            }
        }
        }
    }
    public static void main(string[] args) {
        try {
            forkjoinpool pool = new forkjoinpool();
            findfile task = new findfile(new file("d:/"));

            pool.execute(task);//异步调用

            system.out.println("task is running......");
            thread.sleep(1);
            int otherwork = 0;
            for(int i=0;i<100;i++){
                otherwork = otherwork+i;
            }
            system.err.println("main thread done sth......,otherwork="+otherwork);
            task.join();//阻塞的方法,此处是为了防止出现主线程走完,task被直接中断的情况
            system.out.println("task end");
        } catch (interruptedexception e) {
            // todo auto-generated catch block
            e.printstacktrace();
        }
    }

}

 

常用的并发工具类(直接放课程里的demo了,很详细了~)

countdownlatch

作用是一线程等待其他的线程完成工作以后在执行加强版join

await用来等待,countdown负责计数器的减一

 

import java.util.concurrent.countdownlatch;

import com.xiangxue.tools.sleeptools;

/**
 *@author mark老师   享学课堂 https://enjoy.ke.qq.com 
 *
 *类说明:演示countdownlatch,有5个初始化的线程,6个扣除点,
 *扣除完毕以后,主线程和业务线程才能继续自己的工作
 */
public class usecountdownlatch {
    
    static countdownlatch latch = new countdownlatch(6);

    //初始化线程(只有一步,有4个)
    private static class initthread implements runnable{

        @override
        public void run() {
            system.out.println("thread_"+thread.currentthread().getid()
                    +" ready init work......");
            latch.countdown();//初始化线程完成工作了,countdown方法只扣减一次;
            for(int i =0;i<2;i++) {
                system.out.println("thread_"+thread.currentthread().getid()
                        +" ........continue do its work");
            }
        }
    }
    
    //业务线程
    private static class busithread implements runnable{

        @override
        public void run() {
            try {
                latch.await();
            } catch (interruptedexception e) {
                e.printstacktrace();
            }
            for(int i =0;i<3;i++) {
                system.out.println("busithread_"+thread.currentthread().getid()
                        +" do business-----");
            }
        }
    }

    public static void main(string[] args) throws interruptedexception {
        //单独的初始化线程,初始化分为2步,需要扣减两次
        new thread(new runnable() {
            @override
            public void run() {
                sleeptools.ms(1);
                system.out.println("thread_"+thread.currentthread().getid()
                        +" ready init work step 1st......");
                latch.countdown();//每完成一步初始化工作,扣减一次
                system.out.println("begin step 2nd.......");
                sleeptools.ms(1);
                system.out.println("thread_"+thread.currentthread().getid()
                        +" ready init work step 2nd......");
                latch.countdown();//每完成一步初始化工作,扣减一次
            }
        }).start();
        new thread(new busithread()).start();
        for(int i=0;i<=3;i++){
            thread thread = new thread(new initthread());
            thread.start();
        }

        latch.await();
        system.out.println("main do ites work........");
    }
}

 

 

 

cyclicbarrier

让一组线程达到某个屏障,被阻塞,一直到组内最后一个线程达到屏障时,屏障开放,所有被阻塞的线程会继续运行cyclicbarrier(int parties)

cyclicbarrier(int parties, runnable barrieraction),屏障开放,barrieraction定义的任务会执行

countdownlatch和cyclicbarrier辨析

1countdownlatch放行由第三者控制,cyclicbarrier放行由一组线程本身控制
2countdownlatch放行条件》=线程数,cyclicbarrier放行条件=线程数

 

import java.util.map;
import java.util.random;
import java.util.concurrent.concurrenthashmap;
import java.util.concurrent.cyclicbarrier;

/**
 *@author mark老师   享学课堂 https://enjoy.ke.qq.com 
 *
 *类说明:cyclicbarrier的使用
 */
public class usecyclicbarrier {
    
    private static cyclicbarrier barrier 
        = new cyclicbarrier(5,new collectthread());
    
    private static concurrenthashmap<string,long> resultmap
            = new concurrenthashmap<>();//存放子线程工作结果的容器

    public static void main(string[] args) {
        for(int i=0;i<=4;i++){
            thread thread = new thread(new subthread());
            thread.start();
        }

    }

    //负责屏障开放以后的工作
    private static class collectthread implements runnable{

        @override
        public void run() {
            stringbuilder result = new stringbuilder();
            for(map.entry<string,long> workresult:resultmap.entryset()){
                result.append("["+workresult.getvalue()+"]");
            }
            system.out.println(" the result = "+ result);
            system.out.println("do other business........");
        }
    }

    //工作线程
    private static class subthread implements runnable{

        @override
        public void run() {
            long id = thread.currentthread().getid();//线程本身的处理结果
            resultmap.put(thread.currentthread().getid()+"",id);
            random r = new random();//随机决定工作线程的是否睡眠
            try {
                if(r.nextboolean()) {
                    thread.sleep(2000+id);
                    system.out.println("thread_"+id+" ....do something ");
                }
                system.out.println(id+"....is await");
                barrier.await();
                thread.sleep(1000+id);
                system.out.println("thread_"+id+" ....do its business ");
            } catch (exception e) {
                e.printstacktrace();
            }

        }
    }
}

 

semaphore

控制同时访问某个特定资源的线程数量,用在流量控制

 

exchange

两个线程间的数据交换

 

import java.util.arraylist;
import java.util.hashset;
import java.util.list;
import java.util.set;
import java.util.concurrent.exchanger;

/**
 *@author mark老师   享学课堂 https://enjoy.ke.qq.com 
 *
 *类说明:exchange的使用
 */
public class useexchange {
    private static final exchanger<set<string>> exchange 
        = new exchanger<set<string>>();

    public static void main(string[] args) {

        //第一个线程
        new thread(new runnable() {
            @override
            public void run() {
                set<string> seta = new hashset<string>();//存放数据的容器
                try {
                    /*添加数据
                     * set.add(.....)
                     * */
                    seta = exchange.exchange(seta);//交换set
                    /*处理交换后的数据*/
                } catch (interruptedexception e) {
                }
            }
        }).start();

      //第二个线程
        new thread(new runnable() {
            @override
            public void run() {
                set<string> setb = new hashset<string>();//存放数据的容器
                try {
                    /*添加数据
                     * set.add(.....)
                     * set.add(.....)
                     * */
                    setb = exchange.exchange(setb);//交换set
                    /*处理交换后的数据*/
                } catch (interruptedexception e) {
                }
            }
        }).start();

    }
}

 

 

 

 

 (ps:所有内容参考享学课堂视频)