欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

再见线程

程序员文章站 2022-07-03 10:38:09
...

上次仅仅谈到线程的定义的几种方式,其实,在线程的使用过程中是需要十分谨慎的,在单机上,使用线程并不能很好的提高效率,当然这也是我之前一直以来的误区,一直认为使用线程就可以提高程序运行效率,实质上不然,线程的使用时需要十分谨慎的,如若一不小心就会适得其反,因此,再见线程时,就需要提到线程池管理线程的问题、线程安全的问题以及定时器的使用等等。

线程池

首先所谓线程池呢,简单而形象地来讲就是,存放很多线程的容器,也可以说是一个数据结构,那么线程池有什么作用呢?又是如何管理多个线程和正确打开多个线程的方式呢?

线程池的定义和使用

线程池有四种类型,分别是以下四种:
newCachedThreadPool创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。
newFixedThreadPool 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。
newScheduledThreadPool 创建一个定长线程池,支持定时及周期性任务执行。
newSingleThreadExecutor 创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。
但是一般情形下,使用最多的类型是第一种,那又该如何使用呢?具体如下:

//初始化线程
ExcutorService exec = Excutors.newCachedThreadPool();

//添加线程并返回该线程的Future
Future Future = exec.submit(Runnable);

//批量添加线程并返回一个Future集合
List<Future> futureList = exec.invokeAll(List);

在这里,使用线程池管理线程可以写成一个线程池管理工具类,方便对以后对多个线程进行管理,具体如下:

/**
 * 线程管理工具类
 * Created by Cecilia on 2017/8/5.
 */
public class ThreadUtil {

    private static ExecutorService exec;
    /**
     * 线程池管理工具
     * 传入一组带有返回值的线程,然后由工具类统一管理,并把最终的线程运行集合返回
     * @param list       线程集合
     * @param isBlock    是否阻塞标示位,如果为false,则线程启动后立即返回,否则等待所有线程均有结果后返回(推荐使用true)
     * @return           线程运行结果集合,仅返回已运行结束的线程集合,如都未结束,则返回尺寸为0的list
     * @throws ExecutionException
     * @throws InterruptedException
     */
    public static List<Object> runCheckCallable(List<Callable<Object>> list, boolean isBlock) throws InterruptedException, ExecutionException {
        //对参数进行检测
        if (CheckNull(list)) {
            return new ArrayList<>();
        }

        //初始化线程,打开一个线程池
        exec = Executors.newCachedThreadPool();
        //添加线程并返回Future线程
        List<Future<Object>> futureList = exec.invokeAll(list);

        if (!isBlock){
            return new ArrayList<>();
        }
        //检查并获取线程返回值
        return getAllCallableReturn(futureList);
    }

    /**
     * 对方法中传入的参数进行检测
     * @param list   传入的集合对象
     * @return       如果检测成功,返回true,反之,返回false
     */
    private static boolean CheckNull(List<Callable<Object>> list) {
        //检测list是否为空
        if (list==null||list.size()<1){
            return true;
        }
        //检测list中对象是否为空
        for (Callable callable :list){
            if (callable==null){
                return true;
            }
        }
        return false;
    }

    /**
     * 轮询获取所有Callable线程的返回值,直到所有返回值都被获取到(阻塞)
     * @param futureList     所有线程的管理类集合
     * @return
     * @throws ExecutionException
     * @throws InterruptedException
     */
    private static List<Object> getAllCallableReturn(List<Future<Object>> futureList) throws ExecutionException, InterruptedException {
        List<Object> returnValue = new ArrayList<>(futureList.size());//为了使性能比较好,因此将ArrayList的长度为线程数量
        while(true){
            Iterator<Future<Object>> iterator = futureList.iterator();
            while(iterator.hasNext()){
                Future<Object> future = iterator.next();
                if (future.isDone()){
                    Object o = future.get();
                    returnValue.add(o);
                    iterator.remove();
                }
            }
            if (futureList.size()==0){
                break;
            }
            TimeUnit.MILLISECONDS.sleep(10*1000);
        }
        return returnValue;
    }

    /**
     * 线程池管理工具
     * 传入一组不带有返回值的线程,然后由工具类统一管理,对当前的线程进行监听
     * @param RunnableList       线程集合
     * @param isBlock            是否阻塞标示位,如果为false,则线程启动后立即返回,否则等待所有线程均有结果后返回(推荐使用true)
     * @return                   线程运行结果集合,仅返回已运行结束的线程集合,如都未结束,则返回尺寸为0的list
     * @throws ExecutionException
     * @throws InterruptedException
     */
    public static void runCheckRunnable(List<Runnable> RunnableList,boolean isBlock) throws InterruptedException {
        //检测list是否为空
        if (RunnableList==null||RunnableList.size()<1){
            return;
        }
        //初始化线程池,使用一个线程池开启一个线程
        ExecutorService exec = Executors.newCachedThreadPool();
        //添加线程并返回Future线程
        List<Future> futureList = new ArrayList<>(RunnableList.size());
        //循环遍历,将每个线程依次添加进去
        for (Runnable runnable:RunnableList){
            if (runnable!=null){
                Future future = exec.submit(runnable);
                futureList.add(exec.submit(runnable));
            }
        }
        //线程不阻塞
        if (!isBlock){
            return;
        }
        while(true){
            Iterator<Future> iterator = futureList.iterator();
            while(iterator.hasNext()) {
                Future<Object> future = iterator.next();
                if (future.isDone()) {               //线程执行结束
                    iterator.remove();
                }
            }
            if (futureList.size()==0){
                break;
            }
            TimeUnit.MILLISECONDS.sleep(10);
        }
    }

    /**
     * 线程池管理工具
     * 传入一组不带有返回值的线程,然后由工具类统一管理,对当前的线程进行监听
     * @param RunnableList       线程集合
     * @return                   线程运行结果集合,仅返回已运行结束的线程集合,如都未结束,则返回尺寸为0的list
     */
    public static void runCheckRunnable(Runnable runnable){

        if (exec==null){
            exec = Executors.newCachedThreadPool();
        }
        exec.submit(runnable);
    }
}

线程池的作用

  • 在多处理器多线程环境下,可以降低资源的消耗,因为在频繁地创建和销毁线程过程中,不仅会降低速度,还会浪费线程资源,荣国线程池可以重复利用已创建的线程降低线程的创建和销毁造成的消耗;
  • 提高响应速度。当在任务抵达时,任务可以不需要等到线程创建时才开始执行;
  • 提高线程的可管理性。可以通过线程池对多个线程进行统一的分配和调度以及监控。

线程安全

在多处理器多线程下,线程的安全就显得非常重要了,万一使用不慎,便招致适得其反。那么有什么更好的解决方案吗?这当然是有的,想到我们强大的Java提供给我们一个线程安全的阻塞队列,用来提高我们的线程池管理多线程时的安全问题。那么这阻塞队列又是何物?顾名思义,大家应该能知道它的数据结构实质就是一个队列,那么他又该如何来发挥作用的呢?

BlockingQueue接口

这是一种最常用的阻塞队列的接口,它有两种子实现类,分别是ArrayBlockingQueue和LinkBlockingQueue,大家应该很快想到这似乎对应于集合中的ArrayList和LinkedList,是的,这两个子实现类也是分别一个基于数组的阻塞队列实现,一个是基于链表的队列实现,那么具体又该如何用呢?
Queue是一个指定尺寸的队列,对于ArrayBlockingQueue必须指定尺寸,但是LinkBlockingQueue却不一定需要,在这里,小编提供一个示例让大家清楚了解具体定义和使用方法,如下:

/**
 * 阻塞队列演示实例
 * Created by Cecilia on 2017/7/27.
 */
public class test2 {

    public static void main(String[] args) {
        //Queue的两种声明方式
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(5);
        BlockingQueue<String> queue1 = new LinkedBlockingDeque<>(5);

        //添加元素
        System.out.println(queue.offer("芷若初荨"));  //立即返回          true
        System.out.println(queue.offer("芷若初荨1"));
        try {
            System.out.println(queue.offer("Cecilia", 5, TimeUnit.SECONDS));  //等待返回   true
            queue.put("cc");  //一直等待    芷若初荨
            queue.put("ccc");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        //取值
        System.out.println(queue.poll());//直接获取并返回     Cecilia
        try {
            System.out.println(queue.poll(10, TimeUnit.SECONDS)); //指定等待时间   cc
            System.out.println(queue.take());      //一直等待
            System.out.println(queue.peek());       //取走但不删除
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

    }
}
运行结果如下:

true
true
true
芷若初荨
芷若初荨1
Cecilia
cc

另外,小编,提供大家一个文件监控器来具体体现着具体的使用方式:
文件监控器的作用就是实现动态监控文件的读写操作以及文件的删除添加操作等等。

程序入口

/**
 * 程序入口
 * Created by Cecilia on 2017/7/29.
 */
public class FileMain {

    public static void main(String[] args) {
        Scanner sc = new Scanner(System.in);
        System.out.println("请输入源文件夹路径:");
        String srcPath = sc.next();
        System.out.println("请输入目的文件夹路径:");
        String toPath = sc.next();
        sc.close();
        FileControl fc = new FileControl();
        fc.FileControl(srcPath,toPath);
    }
}

文件主控类FileControl

/**
 * 文件监控主类
 * Created by Cecilia on 2017/7/29.
 */
public class FileControl {

    //缓存,每条数据都是一个文件的完整内容,key是文件名,value是文件内容
    private BlockingQueue<Map<String,List<String>>> queue;

    public FileControl(){

        queue = new LinkedBlockingQueue<>();//因为此处并不知道缓冲区的大小,因此使用LinkedBlockingQueue
    }

    /**
     * 文件监控主方法
     * @param path    源文件夹路径
     * @param toPath  目的文件夹路径
     */
    public void FileControl(String path,String toPath){
        ReadThread rh  = new ReadThread(queue,path);//读线程
        WriteThread wh = new WriteThread(queue,toPath);
        List<Runnable> list = new ArrayList<>();
        list.add(rh);
        list.add(wh);
        try {
            ThreadUtil.runCheckRunnable(list,false);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

读文件线程类

/**
 * 读线程,主要负责读取文件内容
 * Created by Cecilia on 2017/7/29.
 */
public class ReadThread implements Runnable{

    private BlockingQueue<Map<String,List<String>>> queue;//定义一个缓冲区

    private String path;

    /**
     * 构造函数,传入缓存Queue和源文件目录
     * @param queue   缓存queue
     * @param path     源文件目录
     */
    public ReadThread(BlockingQueue<Map<String,List<String>>> queue,String path){
        this.queue = queue;
        this.path = path;
    }
    @Override
    public void run() {

        //对path进行检测,如果路径非法,那么直接返回
        if (path==null||path.trim().equals("")){
            return;
        }
        //文件夹不存在或是文件,那么直接返回
        File file = new File(path);
        if (!file.exists()||file.isFile()){
            return;
        }

        //获取待转移文件集合
        List<File> fileList = checkFileList(file);
        if (fileList!=null){
            for (File tempFile:fileList){
                List<String> contextList = null;
                try {
                    contextList = FileUtilImplements.readFile(tempFile);//读取一个文件的全部内容
                } catch (IOException e) {
                    e.printStackTrace();
                }
                Map<String,List<String>> map = new HashMap<>();
                if (contextList==null||contextList.size()<1){
                    map.put(tempFile.getName(),new ArrayList<>());
                }else{
                    //清洗内容
                    List<String> newContextList = new ArrayList<>(contextList.size());
                    for (String context:contextList){
                        newContextList.add(context.replaceAll("芷若初荨","芷若初荨520765234"));
                    }
                    map.put(tempFile.getName(),newContextList);
                }
                //把要转移的内容缓存入Queue
                try {
                    queue.put(map);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    /**
     * 检查文件夹中是否有可移动的文件
     * @param file   文件对象
     * @return       如果有,则返回可移动文件集合,如果没有,则返回null
     */
    private List<File> checkFileList(File file) {

        //获取文件列表
        File[] files = file.listFiles();
        if (files.length==0){
            return null;
        }
        //文件列表存在,且不为空,那么移除集合中的文件夹。仅保留文件
        List<File> fileList = Arrays.asList(file.listFiles());
        Iterator<File> iterator = fileList.iterator();
        while(iterator.hasNext()){
            File tempFile = iterator.next();
            if (tempFile.isDirectory()){
                iterator.remove();
            }
        }
        //移除后集合尺寸为0,那么返回为空
        if (fileList.size()<1){
            return null;
        }
        return fileList;
    }
}

写文件线程类

/**
 * 写线程,主要负责写文件
 * Created by Cecilia on 2017/7/29.
 */
public class WriteThread implements Runnable {

    private BlockingQueue<Map<String,List<String>>> queue;//定义缓冲区

    private String toPath;//定义目的路径

    /**
     * 构造函数,传入缓存Queue和目标文件目录
     * @param queue  缓存Queue
     * @param toPath  目标文件目录
     */
    public WriteThread(BlockingQueue<Map<String,List<String>>> queue,String toPath){
        this.queue = queue;
        this.toPath = toPath;
    }

    @Override
    public void run() {
        File toFile = new File(toPath);
        //检测目标文件对象是否是文件夹或是否存在,如果不存在,则创建,创建失败则返回
        if (!toFile.exists()||toFile.isFile()){
            //文件不存在或是文件,则递归创建文件夹
            boolean flag = toFile.mkdirs();
            if (!flag){
                //如果创建失败,则返回
                return;
            }
        }
        //循环读取缓存中的文件内容,读取后写入新文件
        while(true){
            try {
                Map<String,List<String>> contextMap = queue.take();
                Iterator<String> iterator = contextMap.keySet().iterator();
                String key = iterator.next();  //获取map中的值,只有一个key
                List<String> contextList = contextMap.get(key);
                if (contextList!=null||contextList.size()>0){
                    //清洗内容
                    FileUtilImplements.writeFile(toPath,key,contextList,false);
                }else{  //没有内容,则直接创建文件
                    File file = new File(FileUtilImplements.getFullPath(toPath,key));
                    file.createNewFile();
                }
            } catch (InterruptedException|IOException e) {
                e.printStackTrace();
            }
        }
    }
}

定时器

定时器,顾名思义,就是类似于始终定时的功能一样,但是为什么会需要用到这个呢?还记得我么在最开始使用继承Thread类或实现Runnable接口来定义接口时,使用Thread.sleep(毫秒值)来让跑得非常快的线程暂时休眠一会,但是这种方式是最不专业和安全的,因此对此,定时器会是一个比较好的选择。那么这个又该如何定义呢?其实,这个比较简单,会定义线程的人就会定义定时器了,定时器实质类似于一个线程,步骤如下:
* 继承TimeTask类
* 覆写run方法
* 借助Time的schdule方法来使用定时器,使之跑起来,比如:

Timer timer = new Timer();
timer.schedule(new VoteTimertask(voteMain),0,1000*60);

易错点

在这里,一直有个问题困惑我————线程会阻塞吗?相信大家会很奇怪,线程的五种状态中不就存在阻塞状态嘛,线程肯定会阻塞啊,但是大家有想过没,线程的状态是针对哪种情形讨论的,实际上,现成的状态是针对在多线程的环境下生成的,对于单线程环境下,线程是不阻塞的,如果阻塞的话,那么我们平时使用的Main函数、以及和其他函数一起使用的时候并没有出现阻塞状态,因此在这里说明的就是线程本身不存在阻塞,在单线程环境下,也不会存在阻塞,只有在多环境下,存在被阻塞的情形导致线程被阻塞,比如使用Thread.sleep()方法也是对线程阻塞的一种方式。*