欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

《java并发编程实战》笔记(一) 结构化并发应用程序

程序员文章站 2024-02-20 10:59:16
...

下载地址

链接:https://pan.baidu.com/s/1i6FlscH 密码:m21n

1.任务执行

任务是一组逻辑执行单元,线程是使得任务异步执行的机制

不可取的所谓线程开启的方式:
1.所有任务放在单个线程中串行执行
2.每一个任务都开启一个线程,无限制,非常浪费资源


有效方法:通过有界队列防止高负荷的应用程序把内存耗尽

使用线程池 java.util.concurrent Executor框架

1.1 线程池

好处:

  • web服务器不会在高负载的情况下失败
  • 服务器不会创建上千的线程来争夺有限的cpu和内存资源
  • Executor 可实现各种调优,管理,监控,日志

接口

public interface Executor{
    void executor(Runnable command);
}
线程池类型 作用
newFixedThreadPool 固定长度的线程池,自动维持固定数量的线程
newCachedThreadPool 可缓存的线程池,自动回收空闲的线程,如不足,会新增线程,线程池规模不存在限制
newSingleThreadExecutor 单线程的Executor,保持单线程串行执行
newScheduledThreadPool 固定数量的线程池,延时或者定时执行任务

创建线程池

import java.io.IOException;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

public class ExecutorTest {
    private static final int NUM = 100;
    private static final Executor exec = Executors.newFixedThreadPool(NUM);
    private static int index=0;
    private static int count=0;

    public static void main (String[] args) throws IOException {
        while(count++ < 100){
            Runnable task = new Runnable(){
                public void run(){
                    //TODO
                    System.out.println(Thread.currentThread().getName() +"  :"+index++);
                }
            };
            exec.execute(task);
        }
        System.exit(1);
    }
}

result:

pool-1-thread-1  :0
pool-1-thread-3  :2
pool-1-thread-2  :1
pool-1-thread-4  :3
...
pool-1-thread-98  :97
pool-1-thread-99  :98
pool-1-thread-100  :99

不存在超出100的线程。

Executor的生命周期

状态

  • 运行
  • 关闭
  • 已终止

接口

public interface ExecutorService extends Executor{
    void shutdown(); //平缓关闭,不接受新的任务,完成运行中的任务再关闭
    List<Runnable> shutdownNow;   //粗暴关闭,尝试关闭所有运行中的任务,不接受新的任务
    isTerminated(); //查看线程池是否关闭状态
    //....
}
class Demo{
    private static final Executor exec = ...
    
    public void start(){
        while(!exec.isShutdown){
        try{
            Runnable task = new Runable(){
                public void run(){
                    //TODO
                }
            };
            exec.execute(task);
        }
        }catch(RejectExecutorException e){
            if(!exec.isShutdown()){
                log.info("task rejected",e);
            }
        }
    }
    
    public void stop(){
        exec.shutdown();
    }
}

延时任务与周期任务

Timer类负责管理这类型任务,但表现非常差,缺点如下:

  • 执行只创建一个线程处理所有定时任务,无法保证每个任务的定时准确性。
  • 不捕获运行中的异常,直接终止定时线程,不会恢复线程执行。“线程泄漏”

正确使用:

线程池:ScheduledThreadPoolExector
方法: 构造方法 或者 newScheduledThreadPool()

调度服务:
1.DelyQueue(实现 BlockingQueue)  为 ScheduledThreadPoolExector提供调度服务
2.每个Deplyed对象都有一个相应的延迟时间
3.在DelyQueue中,只有某个对象逾期了,才能执行take操作。

Runnable的缺点

Runnable 不能返回一个值或者抛出受检查的异常

携带结果的Callable和Future

接口

public interface Callable<V>{
    V call throws Exception;  //V == void ,则表示无返回结果
}

public interface Future<V>{
    boolean cancel(boolean mayinterruptIfRunning);
    boolean isCancelled();
    boolean isDone();
    V get() throws InterruptedException, ExecutionException, CancellationException; //任务完成立刻返回,或者等待任务完成返回,或者抛出异常
    V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, CancellationException, TimeoutException;
}

使用方式

1.使用Runnable或Callable创建一个实例
2.使用 ExecutorService 的 submit 提交实例 得到 一个 Future
3.通过返回的Future调用相应的方法获得信息

使用页面渲染图像(串行下载)

public class FutureRender{
    public static final ExecutorService exec = Executors.newFixedThreadPool(100);
    
    /*
    *渲染页面
    */
    void renderPage(CharSequence source){
        final List<ImageInfo> imageInfos = scanForImageInfo(source);   //获得相关图像的信息
        
        //图片的下载构建任务 V =  List<ImageDate>
        Callable<List<ImageDate>> task = new Callable(){
            public List<ImageDate> call(){
                List<ImageDate> result = new ArrayList<ImageDate>();  //图像信息
                for(ImageInfo imageInfo : result){
                    result.add(imageInfo.downImage());
                }
                return result;
            }
        };
        
        rendTexe(source);  //渲染文字
        Future<List<ImageDate>> future = exec.submit(task);   //获得 Future对象
        
        try{
            List<ImageDate> imageData = future.get(); //future.get( 指定时间,timeUnit) 限时获得
            for(ImageData iamge : imageData){
                //渲染图像
            }
        }catch(InterruptedException e){
            //重新设置线程中断状态
            Thread.currentThread.interrupt();
            //取消任务
            future.cancel(true);
        }catch(ExecutiontException e){
            throw launcherThrowable(e.getCause());
        }
    }
}

优点:
文字和图片并行处理

缺点:
必须等待所有照片下载完

更高效的并行下载

CompletionService 融合了Executor 和 BlockingQueue
可执行Callable任务
最后通过队列的类似方式获得Future 结果

ExectorCompletionService 实现了CompletionService
1.提交的任务被包装成QueueFuture
2.计算结果放入BlockingQueue
3.方法阻塞,等待所有结果算出
public class Render{
    public static final ExecutorService exec = Executors.newFixedThreadPool(100);
    
    /*
    *渲染页面
    */
    void renderPage(CharSequence source){
        final List<ImageInfo> imageInfos = scanForImageInfo(source);   //获得相关图像的信息
        
    
        CompletionService<List<ImageDate>> completionService = new ExectorCompletionService(exec);
        
        for(ImageInfo iamgeInfo : imageInfos){
            completionService.submit(new Callable<ImageDate>(){
                public ImageDate call(){
                    return imageInfo.downImage());
                }
            });
           
        };
        
        rendTexe(source);  //渲染文字
        Future<List<ImageDate>> future = exec.submit(task);   //获得 Future对象
        
        try{
            for(int i=0; i<imageInfos.size(); i++){
              Future<ImageData> future =  completionService.take();
              ImageData imageData = future.get();
              //渲染图片
            }
        }catch(InterruptedException e){
            //重新设置线程中断状态
            Thread.currentThread.interrupt();
        }catch(ExecutiontException e){
            throw launcherThrowable(e.getCause());
        }
    }
}

invokeAll 批量获得future结果

1.支持限时返回任务结果
2.invokeAll 参数:一组任务,结果:一组Future,有序
3.某个任务未完成则取消,通过get() 或者isCancelled判断情况

案例:在预定时间内获得旅游网站报价

//报价任务
//TravelQuote 相关公司的报价信息
public class QuoteTask implements Callable<TravelQuote>{
    private final TravelCompany company;  //旅游公司
    private final TravelInfo travelInfo; //旅游信息条件
    
    public TravelQuote  call(){
        return company.quote(travelInfo);
    }
}

//获得多个公司旅游报价
public List<TravelQuote> getTravelQuotes(TravelInfo travelInfo, set<TravelCompany> companies, Comparator<TravelQuote> ranking, long outtime, TimeUnit unit) throws InterruptedExecption{
    List<QuoteTask> tasks = new ArrayList<QuoteTask>();  // 任务列表
    for(TravelCompany company : companies){
        tasks.add(new QuoteTask(company, travelInfo));
    }
    
    
    List<Future<TravelQuote>> futures = exec.invokeAll(tasks, outtime, unit);  //线程池执行报价任务返回future list
    List<TravelQuote> quotes = new ArrayList<TravelQuote>(tasks.size());
    
    Iterator<QuoteTask> taskIter = tasks.iterator();
    for(Future<TravelQuote> f : futures){
        QuoteTask task = taskIter.next();
        try{
            quotes.add(f.get());
        }catch(ExecutionException e){
            quotes.add(task.getFailureQuote(e.getCause()));
        }catch(CanellationExecption e){
            quotes.add(task.getTimeOutQuote(e));
        }
    }
    
    Collections.sort(quotes, ranking);
    return quotes;

}
相关标签: 并发