欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

十一、Android性能优化之多线程优化

程序员文章站 2022-07-02 23:23:45
...

####一、多线程产生的问题与简单优化

public class ThreadTest1 {

    public static void main(String[] args) {
        new Producer().start();
        new Consumer().start();
    }

    static class ProductObject {
        public static String value = null;
    }

    static class Consumer extends Thread {
        @Override
        public void run() {
            while (true) {
                if (ProductObject.value != null) {
                    System.out.println("消费产品" + ProductObject.value);
                    ProductObject.value = null;
                }
            }
        }
    }

    static class Producer extends Thread {
        @Override
        public void run() {
            //不断生产产品
            while (true) {
                if (ProductObject.value == null) {
                    //产品已经消费完成,生产新的产品
                    ProductObject.value = "No:" + System.currentTimeMillis();
                    System.out.println("生产产品" + ProductObject.value);
                }
            }
        }
    }
}

结果输出:
生产产品No:1505980855609
消费产品No:1505980855609
生产产品No:1505980855609
消费产品No:1505980855609
生产产品No:1505980855609
消费产品No:1505980855609
生产产品No:1505980855609
消费产品No:1505980855609
消费产品No:1505980855609
生产产品No:1505980855609
生产产品No:1505980855609
消费产品No:1505980855609
生产产品No:1505980855609
消费产品No:1505980855609
生产产品No:1505980855609
复制代码

######我们发现该示例并没有一直执行,而是执行一段时间后停止打印

#####1.原因

内存机制中的 "副本"概念 多个线程访问一个成员变量时 每个线程都会得到一个该变量的副本 在自己的线程的栈中保存、计算 以提高速度。 但是这样就会有同步的问题了。 当一个线程修改了自己栈内副本的值 还没有立即将同步到主存中, 其他线程再来获取主存中的该变量时 就会得到过期数据。

#####1.解决办法 为了解决这种问题 可以使用synchronized对该变量的操作同步 , 或使用volatile关键字声明该变量为易变对象 这样的话 每个线程就不会创建副本到自己的栈中 而是直接操作主存。

######(1)volatile 在对象/变量前加上 volatile 。 Volatile修饰的 成员变量 在每次被 线程 访问时,都强迫从 共享内存 中重读该成员变量的值。而且,当 成员变量 发生变化时,强迫线程将变化值回写到 共享内存 。这样在任何时刻,两个不同的线程总是看到某个 成员变量 的同一个值。 Java语言 规范中指出:为了获得最佳速度,允许线程保存共享 成员变量 的私有拷贝,而且只当线程进入或者离开 同步代码块 时才与共享成员变量的原始值对比。这样当多个线程同时与某个对象交互时,就必须要注意到要让线程及时的得到共享 成员变量 的变化。而volatile 关键字 就是提示JVM:对于这个 成员变量 不能保存它的私有拷贝,而应直接与共享成员变量交互。使用建议:在两个或者更多的线程访问的 成员变量 上使用volatile。当要访问的 变量 已在synchronized代码块中,或者为 常量 时,不必使用。由于使用volatile屏蔽掉了JVM中必要的 代码优化 ,所以在效率上比较低,因此一定在必要时才使用此 关键字 。

    static class ProductObject {
        public volatile static String value = null;
    }
}

结果输出:
消费产品No:1505982581204
生产产品No:1505982581204
消费产品No:1505982581204
生产产品No:1505982581204
消费产品No:1505982581204
生产产品No:1505982581204
消费产品No:1505982581204
生产产品No:1505982581204
消费产品No:1505982581204
生产产品No:1505982581204
(省略...)
复制代码

程序一直输出符合要求

######(2)synchronized ######由于是上例中 volatile while 一直执行性能开销比较大 ,则需要加上锁 synchronized避免大量性能开销

将对象/变量加上锁 synchronized 修饰。在线程中,使用同步方法或者同步块。

public class ThreadTest1 {

    public static void main(String[] args) {
        Object lock = new Object();
        new Producer(lock).start();
        new Consumer(lock).start();

    }

    static class ProductObject {
        public static String value = null;
    }

    static class Consumer extends Thread {
        Object lock;

        public Consumer(Object lock) {
            this.lock = lock;
        }

        @Override
        public void run() {

            while (true) {
                synchronized (lock) {//互斥锁

                    if (ProductObject.value != null) {
                        System.out.println("消费产品" + ProductObject.value);
                        ProductObject.value = null;
                    }
                }
            }
        }
    }

    static class Producer extends Thread {
        Object lock;

        public Producer(Object lock) {
            this.lock = lock;
        }

        @Override
        public void run() {

            //不断生产产品
            while (true) {
                synchronized (lock) {//互斥锁
                    if (ProductObject.value == null) {
                        //产品已经消费完成,生产新的产品
                        ProductObject.value = "No:" + System.currentTimeMillis();
                        System.out.println("生产产品" + ProductObject.value);
                    }
                }
            }
        }
    }
}
复制代码

程序一直输出符合要求 ######但是,为了明确对象锁的程序先后执行顺序(减少轮询次数),所有要引入wait() notify()方法

Obj.wait(),与Obj.notify()必须要与synchronized(Obj)一起使用,也就是wait,与notify是针对已经获取了Obj锁进行操作,从语法角度来说就是Obj.wait(),Obj.notify必须在synchronized(Obj){...}语句块内。从功能上来说wait就是说线程在获取对象锁后,主动释放对象锁,同时本线程休眠。直到有其它线程调用对象的notify()唤醒该线程,才能继续获取对象锁,并继续执行。相应的notify()就是对对象锁的唤醒操作。但有一点需要注意的是notify()调用后,并不是马上就释放对象锁的,而是在相应的synchronized(){}语句块执行结束,自动释放锁后,JVM会在wait()对象锁的线程中随机选取一线程,赋予其对象锁,唤醒线程,继续执行。这样就提供了在线程间同步、唤醒的操作。Thread.sleep()与Object.wait()二者都可以暂停当前线程,释放CPU控制权,主要的区别在于Object.wait()在释放CPU同时,释放了对象锁的控制。

优化后程序:

public class ThreadTest1 {
	
	//产品
	static class ProductObject{
		//线程操作变量可见
		public  static String value;
	}
	
	//生产者线程
	static class Producer extends Thread{
		Object lock;
		
		public Producer(Object lock) {
			this.lock = lock;
		}
		
		@Override
		public void run() {
			//不断生产产品
			while(true){
				synchronized (lock) { //互斥锁
					//产品还没有被消费,等待
					if(ProductObject.value != null){
						try {
							lock.wait();
						} catch (InterruptedException e) {
							e.printStackTrace();
						}
					}
					//产品已经消费完成,生产新的产品
					ProductObject.value = "NO:"+System.currentTimeMillis();
					System.out.println("生产产品:"+ProductObject.value);
					lock.notify(); //生产完成,通知消费者消费
				}
			}
			
		}
	}
	
	//消费者线程
	static class Consumer extends Thread{
		Object lock;
		public Consumer(Object lock) {
			this.lock = lock;
		}
		
		@Override
		public void run() {
			while(true){
				synchronized (lock) {
					//没有产品可以消费
					if(ProductObject.value == null){
						//等待,阻塞
						try {
							lock.wait();
						} catch (InterruptedException e) {
							e.printStackTrace();
						}
					}
					System.out.println("消费产品:"+ProductObject.value);
					ProductObject.value = null;
					lock.notify(); //消费完成,通知生产者,继续生产
				}
			}
		}
	}

	public static void main(String[] args) {
		Object lock = new Object();
		
		new Producer(lock).start();
		
		new Consumer(lock).start();
	}
	
}

复制代码

######(4)volatile与synchronized区别

1)volatile本质是在告诉jvm当前变量在寄存器中的值是不确定的,需要从主存中读取,synchronized则是锁定当前变量,只有当前线程可以访问该变量,其他线程被阻塞住. 2)volatile仅能使用在变量级别,synchronized则可以使用在变量,方法. 3)volatile仅能实现变量的修改可见性,而synchronized则可以保证变量的修改可见性和原子性.

《Java编程思想》上说,定义long或double变量时,如果使用volatile关键字,就会获得(简单的赋值与返回操作)原子性 4)volatile不会造成线程的阻塞,而synchronized可能会造成线程的阻塞.

5)当一个域的值依赖于它之前的值时,volatile就无法工作了,如n=n+1,n++等。如果某个域的值受到其他域的值的限制,那么volatile也无法工作,如Range类的lower和upper边界,必须遵循lower<=upper的限制。

6)使用volatile而不是synchronized的唯一安全的情况是类中只有一个可变的域。

异步任务的执行的结果,主线程是无法获取

####二、Java中的FutureTask FutureTask可用于异步获取执行结果或取消执行任务的场景。通过传入Runnable或者Callable的任务给FutureTask,直接调用其run方法或者放入线程池执行,之后可以在外部通过FutureTask的get方法异步获取执行结果,因此,FutureTask非常适合用于耗时的计算,主线程可以在完成自己的任务后,再去获取结果。另外,FutureTask还可以确保即使调用了多次run方法,它都只会执行一次Runnable或者Callable任务,或者通过cancel取消FutureTask的执行等。

  1. FutureTask执行多任务计算的使用场景 利用FutureTask和ExecutorService,可以用多线程的方式提交计算任务,主线程继续执行其他任务,当主线程需要子线程的计算结果时,在异步获取子线程的执行结果。
public class FutureTaskForMultiCompute {

    public static void main(String[] args) {

        FutureTaskForMultiCompute inst=new FutureTaskForMultiCompute();
        // 创建任务集合
        List<FutureTask<Integer>> taskList = new ArrayList<FutureTask<Integer>>();
        // 创建线程池
        ExecutorService executor = Executors.newFixedThreadPool(5);
        for (int i = 0; i < 10; i++) {
            // 传入Callable对象创建FutureTask对象
            FutureTask<Integer> ft = new FutureTask<Integer>(inst.new ComputeTask( ""+i));
            taskList.add(ft);
            // 提交给线程池执行任务,也可以通过exec.invokeAll(taskList)一次性提交所有任务;
            executor.submit(ft);
        }

        System.out.println("所有计算任务提交完毕, 主线程接着干其他事情!");

        // 开始统计各计算线程计算结果
        Integer totalResult = 0;
        for (FutureTask<Integer> ft : taskList) {
            try {
                System.out.println("子线程返回值:"+ ft.get());
                //FutureTask的get方法会自动阻塞,直到获取计算结果为止
                totalResult = totalResult + ft.get();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }

        // 关闭线程池
        executor.shutdown();
        System.out.println("-----------多任务计算后的总结果是:" + totalResult);

    }

    private class ComputeTask implements Callable<Integer> {

        private int result = 0;
        private String taskName = "";

        public ComputeTask( String taskName){
            this.taskName = taskName;
            System.out.println("生成子线程计算任务: "+taskName);

        }

        public String getTaskName(){
            return this.taskName;
        }

        @Override
        public Integer call() throws Exception {

            for (int i = 0; i < 5; i++) {
                result += i;
            }
            // 休眠5秒钟,观察主线程行为,预期的结果是主线程会继续执行,到要取得FutureTask的结果是等待直至完成。
            Thread.sleep(5000);
            System.out.println("该子线程名: "+Thread.currentThread().getName() );
            System.out.println("子线程计算任务: "+taskName+" 执行完成!");
            return result;
        }
    }
}

结果输出:
生成子线程计算任务: 0
生成子线程计算任务: 1
生成子线程计算任务: 2
生成子线程计算任务: 3
生成子线程计算任务: 4
生成子线程计算任务: 5
生成子线程计算任务: 6
生成子线程计算任务: 7
生成子线程计算任务: 8
生成子线程计算任务: 9
所有计算任务提交完毕, 主线程接着干其他事情!
该子线程名: pool-1-thread-3
子线程计算任务: 2 执行完成!
该子线程名: pool-1-thread-1
子线程计算任务: 0 执行完成!
该子线程名: pool-1-thread-5
子线程计算任务: 4 执行完成!
该子线程名: pool-1-thread-2
子线程计算任务: 1 执行完成!
该子线程名: pool-1-thread-4
子线程计算任务: 3 执行完成!
子线程返回值:10
子线程返回值:10
子线程返回值:10
子线程返回值:10
子线程返回值:10
该子线程名: pool-1-thread-2
子线程计算任务: 8 执行完成!
该子线程名: pool-1-thread-5
子线程计算任务: 7 执行完成!
该子线程名: pool-1-thread-4
子线程计算任务: 9 执行完成!
该子线程名: pool-1-thread-3
子线程计算任务: 5 执行完成!
该子线程名: pool-1-thread-1
子线程计算任务: 6 执行完成!
子线程返回值:10
子线程返回值:10
子线程返回值:10
子线程返回值:10
子线程返回值:10
-----------多任务计算后的总结果是:100

复制代码

#####可以看到,同一时刻能够运行的线程数为5个。也就是说当我们启动了10个任务时,只有5个任务能够立刻执行,另外的5个任务则需要等待,当有一个任务执行完毕后,第6个任务才会启动,以此类推

  1. FutureTask在高并发环境下确保任务只执行一次 在很多高并发的环境下,往往我们只需要某些任务只执行一次。这种使用情景FutureTask的特性恰能胜任。举一个例子,假设有一个带key的连接池,当key存在时,即直接返回key对应的对象;当key不存在时,则创建连接。对于这样的应用场景,通常采用的方法为使用一个Map对象来存储key和连接池对应的对应关系,典型的代码如下面所示:
    private Map<String, Connection> connectionPool = new HashMap<String, Connection>();
    private ReentrantLock lock = new ReentrantLock();
    
    public Connection getConnection(String key){
        try{
            lock.lock();
            if(connectionPool.containsKey(key)){
                return connectionPool.get(key);
            }
            else{
                //创建 Connection
                Connection conn = createConnection();
                connectionPool.put(key, conn);
                return conn;
            }
        }
        finally{
            lock.unlock();
        }
    }
    
    //创建Connection
    private Connection createConnection(){
        return null;
    }
复制代码

在上面的例子中,我们通过加锁确保高并发环境下的线程安全,也确保了connection只创建一次,然而确牺牲了性能。改用ConcurrentHash的情况下,几乎可以避免加锁的操作,性能大大提高,但是在高并发的情况下有可能出现Connection被创建多次的现象。这时最需要解决的问题就是当key不存在时,创建Connection的动作能放在connectionPool之后执行,这正是FutureTask发挥作用的时机,基于ConcurrentHashMap和FutureTask的改造代码如下:

    private ConcurrentHashMap<String,FutureTask<Connection>>connectionPool = new ConcurrentHashMap<String, FutureTask<Connection>>();
    
    public Connection getConnection(String key) throws Exception{
        FutureTask<Connection>connectionTask=connectionPool.get(key);
        if(connectionTask!=null){
            return connectionTask.get();
        }
        else{
            Callable<Connection> callable = new Callable<Connection>(){
                @Override
                public Connection call() throws Exception {
                    // TODO Auto-generated method stub
                    return createConnection();
                }
            };
            FutureTask<Connection>newTask = new FutureTask<Connection>(callable);
            connectionTask = connectionPool.putIfAbsent(key, newTask);
            if(connectionTask==null){
                connectionTask = newTask;
                connectionTask.run();
            }
            return connectionTask.get();
        }
    }
    
    //创建Connection
    private Connection createConnection(){
        return null;
    }
复制代码

Java FutureTask 异步任务操作提供了便利性 1.获取异步任务的返回值 2.监听异步任务的执行完毕 3.取消异步任务 ####三、Android中的AsyncTask

AsyncTask源码

public abstract class AsyncTask<Params, Progress, Result> {
    private static final String LOG_TAG = "AsyncTask”;
    //cpu核心数    
    private static final int CPU_COUNT = Runtime.getRuntime().availableProcessors();
    //核心线程数的区间是[2,4]
    private static final int CORE_POOL_SIZE = Math.max(2, Math.min(CPU_COUNT - 1, 4));
    //线程池最大容量
    private static final int MAXIMUM_POOL_SIZE = CPU_COUNT * 2 + 1;
    //当一个线程空闲30秒后就会被取消
    private static final int KEEP_ALIVE_SECONDS = 30;
    //线程工厂 通过工厂方法newThread来创建新的线程
    private static final ThreadFactory sThreadFactory = new ThreadFactory() {
    //原子整数 可以在高并发下正常工作
        private final AtomicInteger mCount = new AtomicInteger(1);

        public Thread newThread(Runnable r) {
            return new Thread(r, "AsyncTask #" + mCount.getAndIncrement());
        }
    };

    //静态阻塞式队列,用来存放待执行的任务,初始容量:128个  
    private static final BlockingQueue<Runnable> sPoolWorkQueue =
            new LinkedBlockingQueue<Runnable>(128);

    //静态并发线程池,可以用来并行执行任务,3.0开始,AsyncTask默认是串行执行任务,我们可以构造并行的AsyncTask
    public static final Executor THREAD_POOL_EXECUTOR;
    static {
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE_SECONDS, TimeUnit.SECONDS,
                sPoolWorkQueue, sThreadFactory);
        threadPoolExecutor.allowCoreThreadTimeOut(true);
        THREAD_POOL_EXECUTOR = threadPoolExecutor;
    }
    //静态串行的任务执行器,内部实现了线程控制,循环的一个个取出任务交给并发线程池去执行
    public static final Executor SERIAL_EXECUTOR = new SerialExecutor();
    //消息类型 结果
    private static final int MESSAGE_POST_RESULT = 0x1;
    //消息类型 进度
    private static final int MESSAGE_POST_PROGRESS = 0x2;
    //默认的任务执行器,这里使用的是串行的任务执行器,所以AsyncTask是串行的
    private static volatile Executor sDefaultExecutor = SERIAL_EXECUTOR;
    //静态的Handler,AsyncTask必须在UI线程中执行是因为Handler用的是UI线程的Looper,子线程没有Looper
    private static InternalHandler sHandler;

    private final WorkerRunnable<Params, Result> mWorker;
    private final FutureTask<Result> mFuture;

    //任务状态 默认为挂起 标识为易变的volatile
    private volatile Status mStatus = Status.PENDING;
    //原子布尔型 高并发支持 任务是否被取消
    private final AtomicBoolean mCancelled = new AtomicBoolean();
    //任务是否贝执行过
    private final AtomicBoolean mTaskInvoked = new AtomicBoolean();

   //串行的任务执行器,当asyncstask执行的时候会加入到任务队列中一个个执行
    private static class SerialExecutor implements Executor {
      //线性的双向队列 用来存储所有的AsyncTask任务
        final ArrayDeque<Runnable> mTasks = new ArrayDeque<Runnable>();
        //当前正在执行的任务
        Runnable mActive;
        //将新的任务加入到双向队列中
        public synchronized void execute(final Runnable r) {
            mTasks.offer(new Runnable() {
                public void run() {
                    try {
                        //执行任务
                        r.run();
                    } finally {
                        //如果还有任务,则上一个任务执行完毕后执行下一个任务
                        scheduleNext();
                    }
                }
            });
            //当前任务为空 则进入下一个任务
            if (mActive == null) {
                scheduleNext();
            }
        }
        
        //从任务栈的头部取出任务,交给并发线程池执行任务
        protected synchronized void scheduleNext() {
            if ((mActive = mTasks.poll()) != null) {
                THREAD_POOL_EXECUTOR.execute(mActive);
            }
        }
    }

    //任务的状态  等待执行,正在执行,执行完成
    public enum Status {
        PENDING,
        RUNNING,
        FINISHED,
    }

    //同步锁 初始化Handler
    private static Handler getHandler() {
        synchronized (AsyncTask.class) {
            if (sHandler == null) {
                sHandler = new InternalHandler();
            }
            return sHandler;
        }
    }

    /** @hide */
    //隐藏的类 设置默认线程执行器
    public static void setDefaultExecutor(Executor exec) {
        sDefaultExecutor = exec;
    }

   //AsyncTask的构造函数
    public AsyncTask() {
        mWorker = new WorkerRunnable<Params, Result>() {
            public Result call() throws Exception {
                //...
                //result = doInBackground(mParams);
                //...
                return result;
            }
        };

        mFuture = new FutureTask<Result>(mWorker) {
            @Override
            protected void done() {
                //...
            }
        };
    }

    private void postResultIfNotInvoked(Result result) {
        final boolean wasTaskInvoked = mTaskInvoked.get();
        if (!wasTaskInvoked) {
            postResult(result);
        }
    }

    //执行完毕发送消息
    private Result postResult(Result result) {
        @SuppressWarnings("unchecked")
        Message message = getHandler().obtainMessage(MESSAGE_POST_RESULT,
                new AsyncTaskResult<Result>(this, result));
        message.sendToTarget();
        return result;
    }
    //返回当前任务状态
    public final Status getStatus() {
        return mStatus;
    }
    //抽象类  在子线程中执行
    @WorkerThread
    protected abstract Result doInBackground(Params... params);

    //在Execute之前执行 
    @MainThread
    protected void onPreExecute() {
    }

    //任务完毕 返回结果 
    @MainThread
    protected void onPostExecute(Result result) {
    }
    
    //更新任务进度
    @MainThread
    protected void onProgressUpdate(Progress... values) {
    }
    //Cancel被调用并且doInBackground执行完毕,onCancelled被调用,表示任务取消,onPostExecute不会被调用
    @MainThread
    protected void onCancelled(Result result) {
        onCancelled();
    }    
    
    @MainThread
    protected void onCancelled() {
    }

    public final boolean isCancelled() {
        return mCancelled.get();
    }
  
    //取消正在执行的任务
    public final boolean cancel(boolean mayInterruptIfRunning) {
        mCancelled.set(true);
        return mFuture.cancel(mayInterruptIfRunning);
    }

    public final Result get() throws InterruptedException, ExecutionException {
        return mFuture.get();
    }

    //sDefaultExecutor默认串行执行器 如果我们要改成并发的执行方式直接使用executeOnExecutor这个方法
    @MainThread
    public final AsyncTask<Params, Progress, Result> execute(Params... params) {
        return executeOnExecutor(sDefaultExecutor, params);
    }
    //可以指定执行器
    @MainThread
    public final AsyncTask<Params, Progress, Result> executeOnExecutor(Executor exec,
            Params... params) {
        if (mStatus != Status.PENDING) {
            switch (mStatus) {
                case RUNNING:
                    throw new IllegalStateException("Cannot execute task:"
                            + " the task is already running.");
                case FINISHED:
                    throw new IllegalStateException("Cannot execute task:"
                            + " the task has already been executed "
                            + "(a task can be executed only once)");
            }
        }
        mStatus = Status.RUNNING;
        onPreExecute();
        mWorker.mParams = params;
        exec.execute(mFuture);
        return this;
    }

    //更新任务进度  onProgressUpdate会被调用
    @WorkerThread
    protected final void publishProgress(Progress... values) {
        if (!isCancelled()) {
            getHandler().obtainMessage(MESSAGE_POST_PROGRESS,
                    new AsyncTaskResult<Progress>(this, values)).sendToTarget();
        }
    }

    //任务执行完毕  如果没有被取消执行onPostExecute()方法
    private void finish(Result result) {
        if (isCancelled()) {
            onCancelled(result);
        } else {
            onPostExecute(result);
        }
        mStatus = Status.FINISHED;
    }
    //AsyncTask内部Handler
    private static class InternalHandler extends Handler {
        public InternalHandler() {
            super(Looper.getMainLooper());
        }
        @SuppressWarnings({"unchecked", "RawUseOfParameterizedType"})
        @Override
        public void handleMessage(Message msg) {
            AsyncTaskResult<?> result = (AsyncTaskResult<?>) msg.obj;
            switch (msg.what) {
                case MESSAGE_POST_RESULT:
                    // There is only one result
                    result.mTask.finish(result.mData[0]);
                    break;
                case MESSAGE_POST_PROGRESS:
                    result.mTask.onProgressUpdate(result.mData);
                    break;
            }
        }
    }

    private static abstract class WorkerRunnable<Params, Result> implements Callable<Result> {
        Params[] mParams;
    }
    @SuppressWarnings({"RawUseOfParameterizedType"})
    private static class AsyncTaskResult<Data> {
        final AsyncTask mTask;
        final Data[] mData;
        AsyncTaskResult(AsyncTask task, Data... data) {
            mTask = task;
            mData = data;
        }
    }
}


复制代码

关键源码:

   private static Handler getHandler() {
        synchronized (AsyncTask.class) {
            if (sHandler == null) {
                sHandler = new InternalHandler();
            }
            return sHandler;
        }
    }

    /** @hide */
    public static void setDefaultExecutor(Executor exec) {
        sDefaultExecutor = exec;
    }

    /**
     * Creates a new asynchronous task. This constructor must be invoked on the UI thread.
     */
    public AsyncTask() {
        mWorker = new WorkerRunnable<Params, Result>() {
            public Result call() throws Exception {
                mTaskInvoked.set(true);
                Result result = null;
                try {
                    Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND);
                    //noinspection unchecked
                    result = doInBackground(mParams);
                    Binder.flushPendingCommands();
                } catch (Throwable tr) {
                    mCancelled.set(true);
                    throw tr;
                } finally {
                    postResult(result);
                }
                return result;
            }
        };

        mFuture = new FutureTask<Result>(mWorker) {
            @Override
            protected void done() {
                try {
                    postResultIfNotInvoked(get());
                } catch (InterruptedException e) {
                    android.util.Log.w(LOG_TAG, e);
                } catch (ExecutionException e) {
                    throw new RuntimeException("An error occurred while executing doInBackground()",
                            e.getCause());
                } catch (CancellationException e) {
                    postResultIfNotInvoked(null);
                }
            }
        };
    }
复制代码

AsyncTask源码我们发现,它其实是内部封装了Thead、FutureTask和Handler。

#####问题一:线程池容量不够抛出异常

public class AsyncTaskTest {


    public static void main(String[] args) {
        int CPU_COUNT = Runtime.getRuntime().availableProcessors(); //可用的CPU个数
        int CORE_POOL_SIZE = Math.max(2, Math.min(CPU_COUNT - 1, 4));//核心线程数
        int MAXIMUM_POOL_SIZE = CPU_COUNT * 2 + 1;//9  最大线程数量
        int KEEP_ALIVE_SECONDS = 1;//闲置回收时间
        final BlockingDeque<Runnable> sPoolWorkQueue = new LinkedBlockingDeque<Runnable>(128);//异步任务队列

        // sThreadFactory:线程工厂
        final ThreadFactory sThreadFactory = new ThreadFactory() {
            private final AtomicInteger mCount = new AtomicInteger(1);

            public Thread newThread(Runnable r) {
                String name = "Thread #" + mCount.getAndIncrement();
                System.out.println(name);
                return new Thread(r, name);
            }
        };

        //线程池
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE_SECONDS, TimeUnit.SECONDS,
                sPoolWorkQueue, sThreadFactory);


        //执行异步任务
        for(int i =0;i < 200;i++){
            //相当于new AsyncTask().execute();
            threadPoolExecutor.execute(new MyTask());
        }
    }
    static class MyTask implements Runnable{
        @Override
        public void run() {
            while(true){
                try {
                    System.out.println(Thread.currentThread().getName());
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
结果输出:
Thread #1
Thread #2
Thread #3
Thread #2
Thread #1
Thread #3
Thread #4
Thread #5
Thread #6
Thread #4
Thread #5
Thread #7
Thread #6
Thread #8
Thread #7
Thread #9
Thread #8
Thread #9
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task com.haocai.app.multithread.test.AsyncTaskTest$MyTask@6d6f6e28 rejected from [email protected][Running, pool size = 9, active threads = 9, queued tasks = 128, completed tasks = 0]
	at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
	at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
	at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
	at com.haocai.app.multithread.test.AsyncTaskTest.main(AsyncTaskTest.java:45)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
Thread #4
Thread #1
......省略......
复制代码

###我们发现会出现异常java.util.concurrent.RejectedExecutionException

如果当前线程池中的数量小于corePoolSize,创建并添加的任务。 如果当前线程池中的数量等于corePoolSize,缓冲队列 workQueue未满,那么任务被放入缓冲队列、等待任务调度执行。 如果当前线程池中的数量大于corePoolSize,缓冲队列workQueue已满,并且线程池中的数量小于maximumPoolSize,新提交任务会创建新线程执行任务。 如果当前线程池中的数量大于corePoolSize,缓冲队列workQueue已满,并且线程池中的数量等于maximumPoolSize,新提交任务由Handler处理。 当线程池中的线程大于corePoolSize时,多余线程空闲时间超过keepAliveTime时,会关闭这部分线程。

#####解决:线程池扩容

 //自定义线程池
 Executor executor = Executors.newScheduledThreadPool(25);//指定核心线程池数量
复制代码

#####问题二:线程阻塞 AsyncTask里面维护着两个线程池,THREAD_POOL_EXECUTOR和SERIAL_EXECUTOR,其中SERIAL_EXECUTOR是默认的线程池

再来看api22 SerialExecutor 的源码

    private static class SerialExecutor implements Executor {
        final ArrayDeque<Runnable> mTasks = new ArrayDeque<Runnable>();
        Runnable mActive;

        public synchronized void execute(final Runnable r) {
            mTasks.offer(new Runnable() {
                public void run() {
                    try {
                        r.run();
                    } finally {
                        scheduleNext();
                    }
                }
            });
            if (mActive == null) {
                scheduleNext();
            }
        }

        protected synchronized void scheduleNext() {
            if ((mActive = mTasks.poll()) != null) {
                THREAD_POOL_EXECUTOR.execute(mActive);
            }
        }
    }
复制代码

通过上面的源码可以发现,每次执行完一个任务后,才会调用scheduleNext往线程池里面添加任务,所以即使线程池是并行的,但是我添加任务的时候是串行的,所以api22中的AsyncTask是串行的,那么线程池其实再多的线程也没用了,反正每次都只有一个任务在里面。

######而且由于SERIAL_EXECUTOR被声明为static,所以,同一个进程里的AsyncTask都会共享这个线程池,这就意味着,在同一个进程里,前面的线程不结束,后面的线程就会被挂起。

#####解决: 所以,使用AsyncTask执行任务的时候,请使用AsyncTask.executeOnExecutor(THREAD_POOL_EXECUTOR)来让你的任务跑在并行的线程池上,避免出现并前面线程阻塞的情况。当然,如果你的CPU核心数够多,2到4个线程的并行度不满足的话,也可以自定义一个线程池来执行AsyncTask,不过这样的话,要注意自己维护这个线程池的初始化,释放等等操作了。

new AsyncTask<Void, Void, Void>(){
            @Override
            protected Void doInBackground(Void... params) {
                return null;
            }
        }.executeOnExecutor(AsyncTask.THREAD_POOL_EXECUTOR);

复制代码

#####注意:如果你确定自己做好了同步处理,或者你没有在不同的AsyncTask里面访问共享资源,需要AsyncTask能够并行处理任务的话,你可以用带有两个参数的executeOnExecutor执行任务

#####Android AsyncTask版本问题 ######1.5刚开始引入AsyncTask的时候,execute方法确实是串行执行的,类定义里面只有SERIAL_EXECUTOR线程池; ######1.6版本时,改用并行线程池THREAD_POOL_EXECUTOR, ######3.0版本至今,就成了上面说的模样————定义两个线程池,但是默认用串行池。 #####问题三:内存泄露

public class MainActivity extends AppCompatActivity {

    private MyTask task;

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);

        //使用默认线程池
        task = new MyTask();
        task.execute();


    }



    class MyTask extends AsyncTask<Void, Integer, Void> {
        int i;

        @Override
        protected Void doInBackground(Void... params) {

            Log.d("main", String.valueOf(i++));
            SystemClock.sleep(1000);
          
            return null;
        }
    }
}

复制代码

当Activity finish() 之后,观察到MyTask 还在执行,这样会造成内存泄漏

#####解决:

public class MainActivity extends AppCompatActivity {

    private MyTask task;

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);

        //使用默认线程池
        task= new MyTask();
        task.execute();

    }


    @Override
    protected void onDestroy() {
        super.onDestroy();
        task.cancel(true);
    }

    class MyTask extends AsyncTask<Void, Integer, Void> {
        int i;

        @Override
        protected Void doInBackground(Void... params) {
            while(!isCancelled()){
                Log.d("main", String.valueOf(i++));
                SystemClock.sleep(1000);
            }

            return null;
        }
    }
}
复制代码

特别感谢: 动脑学院Jason 木易·月 linchunquan lmj121212