欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  科技

JUC笔记(3)

程序员文章站 2022-03-27 09:23:27
11. 线程池(重点)线程池:三大方法、7大参数、4种拒绝策略池化技术程序的运行,本质:占用系统的资源! 优化资源的使用!=>池化技术线程池、连接池、内存池、对象池///..... 创建、销毁。十分浪费资源池化技术:事先准备好一些资源,有人要用,就来我这里拿,用完之后还给我线程池的好处:降低资源的消耗提高响应速度方便管理线程复用、可以控制最大并发数、管理线程(1) 线程池:三大方法//创建一个固定大小的线程池public static ExecutorS....

11. 线程池(重点)

线程池:三大方法、7大参数、4种拒绝策略

池化技术

程序的运行,本质:占用系统的资源! 优化资源的使用!=>池化技术
线程池、连接池、内存池、对象池///.....  创建、销毁。十分浪费资源
池化技术:事先准备好一些资源,有人要用,就来我这里拿,用完之后还给我

线程池的好处:

  • 降低资源的消耗
  • 提高响应速度
  • 方便管理

线程复用、可以控制最大并发数、管理线程

(1) 线程池:三大方法

//创建一个固定大小的线程池
public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
//可伸缩的,遇强则强,遇弱则弱
    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }
//单个线程
    public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>(),
                                    threadFactory));
    }

JUC笔记(3)

三大方法的本质是ThreadPoolExecutor

(2) 线程池:7大参数

public ThreadPoolExecutor(int corePoolSize,//核心线程池大小
                              int maximumPoolSize,//最大线程池大小 几核写几
                              long keepAliveTime,//超时没人调用就会释放
                              TimeUnit unit,//超时单位
                              BlockingQueue<Runnable> workQueue,//阻塞队列
                              ThreadFactory threadFactory,//线程工厂,创建线程的,一般不动
                              RejectedExecutionHandler handler//拒绝策略) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

JUC笔记(3)

// Executors 工具类、3大方法

/**
四种拒绝策略:
 * new ThreadPoolExecutor.AbortPolicy() // (默认)银行满了,还有人进来,不处理这个人的,抛出异常
 * new ThreadPoolExecutor.CallerRunsPolicy() // 哪来的去哪里!main线程执行
 * new ThreadPoolExecutor.DiscardPolicy() //队列满了,丢掉任务,不会抛出异常!
 * new ThreadPoolExecutor.DiscardOldestPolicy() //队列满了,尝试去和最早的竞争,也不会抛出异常!
 */
public class Demo01 {
    public static void main(String[] args) {
        // 自定义线程池!工作 ThreadPoolExecutor

        // 最大线程到底该如何定义
        // 1、CPU 密集型,几核,就是几,可以保持CPu的效率最高!
        // 2、IO  密集型   > 判断你程序中十分耗IO的线程,
        // 程序   15个大型任务  io十分占用资源!

        // 获取CPU的核数
        System.out.println(Runtime.getRuntime().availableProcessors());

        List  list = new ArrayList();
        ExecutorService threadPool = new ThreadPoolExecutor(
                2,//核心大小,2个柜台
                Runtime.getRuntime().availableProcessors(),//最多5个柜台
                3,//三秒钟没人使用新开辟的,自动关闭
                TimeUnit.SECONDS,
                new LinkedBlockingDeque<>(3),//候客区
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.DiscardOldestPolicy());  //队列满了,尝试去和最早的竞争,也不会抛出异常!
        try {
            // 最大承载:Deque + max
            // 超过 RejectedExecutionException
            for (int i = 1; i <= 9; i++) {
                // 使用了线程池之后,使用线程池来创建线程
                threadPool.execute(()->{
                    System.out.println(Thread.currentThread().getName()+" ok");
                });
            }

        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // 线程池用完,程序结束,关闭线程池
            threadPool.shutdown();
        }
    }
}

(3) 4种拒绝策略

  • new ThreadPoolExecutor.AbortPolicy() // 银行满了,还有人进来,不处理这个人的,抛出异 常
  • new ThreadPoolExecutor.CallerRunsPolicy() // 哪来的去哪里!
  • new ThreadPoolExecutor.DiscardPolicy() //队列满了,丢掉任务,不会抛出异常!
  • new ThreadPoolExecutor.DiscardOldestPolicy() //队列满了,尝试去和早的竞争,也不会 抛出异常! 

小结和拓展

池的最大的大小如何去设置!
了解:IO密集型,CPU密集型:(调优)

最大线程到底该如何定义       

  • CPU 密集型,几核,就是几,可以保持CPu的效率高!int maximumPoolSize = Runtime.getRuntime().availableProcessors()
  • IO  密集型   > 判断你程序中十分耗IO的线程,2倍也可        

12. 四大函数式接口(必须掌握)

新时代的程序员:lambda表达式、链式编程、函数式接口、Stream流式计算

函数式接口:只有一个方法的接口

@FunctionalInterface
public interface Runnable {
    public abstract void run();
}

有很多Function Interface, 简化编程模型。

例如foreach(消费者类的函数式接口)

JUC笔记(3)JUC笔记(3)

/**
 * Function 函数型接口, 有一个输入参数,有一个输出
 * 只要是 函数型接口 可以 用 lambda表达式简化
 */
public class Demo01 {
    public static void main(String[] args) {
        //
//        Function<String,String> function = new Function<String,String>() {
//            @Override
//            public String apply(String str) {
//                return str;
//            }
//        };

        Function<String,String> function = str->{return str;};

        System.out.println(function.apply("asd"));
    }
}

断定型接口:

有一个输入参数,返回值只能是布尔值!

JUC笔记(3)

/**
 * 断定型接口:有一个输入参数,返回值只能是 布尔值!
 */
public class Demo02 {
    public static void main(String[] args) {
        // 判断字符串是否为空
//        Predicate<String> predicate = new Predicate<String>(){
            @Override
            public boolean test(String str) {
                return str.isEmpty();
            }
        };

        Predicate<String> predicate = (str)->{return str.isEmpty(); };
        System.out.println(predicate.test(""));

    }
}

Consumer消费型接口

JUC笔记(3)

/**
 * Consumer 消费型接口: 只有输入,没有返回值
 */
public class Demo03 {
    public static void main(String[] args) {
//        Consumer<String> consumer = new Consumer<String>() {
//            @Override
//            public void accept(String str) {
//                System.out.println(str);
//            }
//        };
        Consumer<String> consumer = (str)->{System.out.println(str);};
        consumer.accept("sdadasd");

    }
}

Supplier 供给型接口

JUC笔记(3)

/**
 * Supplier 供给型接口 没有参数,只有返回值
 */
public class Demo04 {
    public static void main(String[] args) {
//        Supplier supplier = new Supplier<Integer>() {
//            @Override
//            public Integer get() {
//                System.out.println("get()");
//                return 1024;
//            }
//        };

        Supplier supplier = ()->{ return 1024; };
        System.out.println(supplier.get());
    }
}

13. Stream流式计算

什么是Stream流式计算:

大数据:存储 + 计算

集合、MySQL 本质就是存储东西的;计算都应该交给流来操作!

JUC笔记(3)

/**
 * 题目要求:一分钟内完成此题,只能用一行代码实现!
 * 现在有5个用户!筛选:
 * 1、ID 必须是偶数
 * 2、年龄必须大于23岁
 * 3、用户名转为大写字母
 * 4、用户名字母倒着排序
 * 5、只输出一个用户!
 */
public class Test {
    public static void main(String[] args) {
        User u1 = new User(1,"a",21);
        User u2 = new User(2,"b",22);
        User u3 = new User(3,"c",23);
        User u4 = new User(4,"d",24);
        User u5 = new User(6,"e",25);
        // 集合就是存储
        List<User> list = Arrays.asList(u1, u2, u3, u4, u5);

        // 计算交给Stream流
        // lambda表达式、链式编程、函数式接口、Stream流式计算
        list.stream()//list转换成流,stream的泛型就是user
                .filter(u->{return u.getId()%2==0;})//id全是偶数;predicate,断定型接口
                .filter(u->{return u.getAge()>23;})
                .map(u->{return u.getName().toUpperCase();})//u是形式参数,代表用户名字
                .sorted((uu1,uu2)->{return uu2.compareTo(uu1);})//uu1表示用户名,因为map哪里已经转换成name了
                .limit(1)
                .forEach(System.out::println);
    }
}

JUC笔记(3)JUC笔记(3)

14. ForkJoin(分支合并)

什么是ForkJoin

ForkJoin 在 JDK 1.7 , 并行执行任务!提高效率。大数据量! 大数据:Map Reduce (把大任务拆分为小任务)

JUC笔记(3)

ForkJoin特点:工作窃取

这个里面维护的都是双端队列(两端都能取)

JUC笔记(3)

forkjoin的api:

JUC笔记(3)JUC笔记(3)

在ForkJoinPool中运行的任务的抽象基类。ForkJoinTask是一个类似线程的实体,它比普通线程轻得多。大量的任务和子任务可能由ForkJoinPool中的少量实际线程托管,代价是有些使用限制。

/**
 * 求和计算的任务!
 * 3000   6000(ForkJoin)  9000(Stream并行流)
 * // 如何使用 forkjoin
 * // 1、forkjoinPool 通过它来执行
 * // 2、计算任务 forkjoinPool.execute(ForkJoinTask task)
 * // 3. 计算类要继承 ForkJoinTask
 */
public class ForkJoinDemo extends RecursiveTask<Long> {

    private Long start;  // 1
    private Long end;    // 1990900000   

    // 临界值
    private Long temp = 10000L;//大于10000用forkjoin计算

    public ForkJoinDemo(Long start, Long end) {
        this.start = start;
        this.end = end;
    }
    // 计算方法
    @Override
    protected Long compute() {
        if ((end-start)<temp){
            Long sum = 0L;
            for (Long i = start; i <= end; i++) {
                sum += i;
            }
            return sum;
        }else { // forkjoin 递归
            long middle = (start + end) / 2; // 中间值
            ForkJoinDemo task1 = new ForkJoinDemo(start, middle);
            task1.fork(); // 拆分任务,把任务压入线程队列
            ForkJoinDemo task2 = new ForkJoinDemo(middle+1, end);
            task2.fork(); // 拆分任务,把任务压入线程队列

            return task1.join() + task2.join();
        }
    }
}

测试

/**
 * 同一个任务,别人效率高你几十倍!
 */
public class Test {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // test1(); // 12224
        // test2(); // 10038
        // test3(); // 153
    }

    // 普通程序员
    public static void test1(){
        Long sum = 0L;
        long start = System.currentTimeMillis();
        for (Long i = 1L; i <= 10_0000_0000; i++) {
            sum += i;
        }
        long end = System.currentTimeMillis();
        System.out.println("sum="+sum+" 时间:"+(end-start));
    }

    // 会使用ForkJoin 中级程序员
    public static void test2() throws ExecutionException, InterruptedException {
        long start = System.currentTimeMillis();

        ForkJoinPool forkJoinPool = new ForkJoinPool();
        ForkJoinTask<Long> task = new ForkJoinDemo(0L, 10_0000_0000L);
        ForkJoinTask<Long> submit = forkJoinPool.submit(task);// 提交任务
        Long sum = submit.get();

        long end = System.currentTimeMillis();

        System.out.println("sum="+sum+" 时间:"+(end-start));
    }
    //牛逼程序员
    public static void test3(){
        long start = System.currentTimeMillis();
        // Stream并行流 ()range  (]rangeClosed
        long sum = LongStream.rangeClosed(0L, 10_0000_0000L).parallel().reduce(0, Long::sum);
        long end = System.currentTimeMillis();
        System.out.println("sum="+"时间:"+(end-start));
    }
}

15. 异步回调

Future 设计的初衷: 对将来的某个事件的结果进行建模

JUC笔记(3)

/**
 * 异步调用: CompletableFuture
 * // 异步执行
 * // 成功回调
 * // 失败回调
 */
public class Demo01 {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 没有返回值的 runAsync 异步回调
//        CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(()->{
//            try {
//                TimeUnit.SECONDS.sleep(2);
//            } catch (InterruptedException e) {
//                e.printStackTrace();
//            }
//            System.out.println(Thread.currentThread().getName()+"runAsync=>Void");
//        });
//
//        System.out.println("1111");
//
//        completableFuture.get(); // 获取阻塞执行结果

        // 有返回值的 supplyAsync 异步回调
        // ajax,成功和失败的回调
        // 返回的是错误信息;
        CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(()->{
            System.out.println(Thread.currentThread().getName()+"supplyAsync=>Integer");
            int i = 10/0;
            return 1024;
        });

        System.out.println(completableFuture.whenComplete((t, u) -> {
            System.out.println("t=>" + t); // 正常的返回结果
            System.out.println("u=>" + u); // 错误信息:java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
        }).exceptionally((e) -> {
            System.out.println(e.getMessage());
            return 233; // 可以获取到错误的返回结果
        }).get());

        /**
         * succee Code 200
         * error Code 404 500
         */
    }
}

所谓异步请求,就是main在执行的时候,派出别的线程异步执行,正常多线程没有返回值,但使用异步回调,可以自定义设置返回值,然后通过get把值取回来。

https://blog.csdn.net/javazejian/article/details/50896505?utm_medium=distribute.pc_relevant.none-task-blog-BlogCommendFromBaidu-7.control&depth_1-utm_source=distribute.pc_relevant.none-task-blog-BlogCommendFromBaidu-7.control

Future<V>接口

Future 表示异步计算的结果。它提供了检查计算是否完成的方法,以等待计算的完成,并获取计算的结果。计算完成后只能使用 get 方法来获取结果,如有必要,计算完成前可以阻塞此方法。取消则由 cancel 方法来执行。还提供了其他方法,以确定任务是正常完成还是被取消了。一旦计算完成,就不能再取消计算。如果为了可取消性而使用 Future 但又不提供可用的结果,则可以声明 Future<?> 形式类型、并返回 null 作为底层任务的结果。

Future<V>接口是用来获取异步计算结果的,说白了就是对具体的Runnable或者Callable对象任务执行的结果进行获取(get()),取消(cancel()),判断是否完成等操作。我们看看Future接口的源码:

public interface Future<V> {
    boolean cancel(boolean mayInterruptIfRunning);
    boolean isCancelled();
    boolean isDone();
    V get() throws InterruptedException, ExecutionException;
    V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
}

方法解析:

V get() :获取异步执行的结果,如果没有结果可用,此方法会阻塞直到异步计算完成。

V get(Long timeout , TimeUnit unit) :获取异步执行结果,如果没有结果可用,此方法会阻塞,但是会有时间限制,如果阻塞时间超过设定的timeout时间,该方法将抛出异常。

boolean isDone() :如果任务执行结束,无论是正常结束或是中途取消还是发生异常,都返回true。

boolean isCanceller() :如果任务完成前被取消,则返回true。

boolean cancel(boolean mayInterruptRunning) :如果任务还没开始,执行cancel(...)方法将返回false;如果任务已经启动,执行cancel(true)方法将以中断执行此任务线程的方式来试图停止任务,如果停止成功,返回true;当任务已经启动,执行cancel(false)方法将不会对正在执行的任务线程产生影响(让线程正常执行到完成),此时返回false;当任务已经完成,执行cancel(...)方法将返回false。mayInterruptRunning参数表示是否中断执行中的线程。

通过方法分析我们也知道实际上Future提供了3种功能:(1)能够中断执行中的任务(2)判断任务是否执行完成(3)获取任务执行完成后额结果。

但是我们必须明白Future只是一个接口,我们无法直接创建对象,因此就需要其实现类FutureTask登场啦。

FutureTask类

我们先来看看FutureTask的实现

public class FutureTask<V> implements RunnableFuture<V> {

FutureTask类实现了RunnableFuture接口,我们看一下RunnableFuture接口的实现:

public interface RunnableFuture<V> extends Runnable, Future<V> {
    void run();
}
分析:FutureTask除了实现了Future接口外还实现了Runnable接口,因此FutureTask也可以直接提交给Executor执行。 当然也可以调用线程直接执行(FutureTask.run())。

 

 

 

 

 

 

 

 

 

 

本文地址:https://blog.csdn.net/qq_43378019/article/details/112003748

相关标签: java juc