性能优化14_多线程优化(编辑中)
一 生产者与消费者模式
public class ThreadTest {
//产品
static class ProductObject {
//线程操作变量可见
public volatile static String value;
}
//生产者线程
static class Producer extends Thread {
Object lock;
public Producer(Object lock) {
this.lock = lock;
}
@Override
public void run() {
//不断生产产品
while (true) {
synchronized (lock) {//互斥锁
//产品还没有被消费,等待
if (ProductObject.value != null) {
try {
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//产品已经消费完成,生产新的产品
ProductObject.value = "NO:" + System.currentTimeMillis();
System.out.print("生产产品:" + ProductObject.value);
lock.notify();
}
}
}
}
//消费者线程
static class Consumer extends Thread {
Object lock;
public Consumer(Object lock){
this.lock = lock;
}
@Override
public void run() {
while (true){
synchronized (lock){
//没有产品可以消费
if(ProductObject.value == null){
//等待,阻塞
try {
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.print("消费产品:"+ProductObject.value);
ProductObject.value = null;
lock.notify();//消费完成,通知生产者继续生产
}
}
}
}
public static void main(String[] args){
Object lock = new Object();
new Producer(lock).start();
new Consumer(lock).start();
}
}
二 FutureTask 的便利性
异步任务执行的结果,主线程无法轻易的获取
1.获取异步任务的返回值
2.监听异步任务的执行完毕
3.取消异步任务
public static void main(String[] args) {
Task work = new Task();
FutureTask<Integer> future = new FutureTask<Integer>(work){
@Override
protected void done() {
try {
System.out.println("done:"+get());
} catch (ExecutionException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
//线程池(使用了预定义的配置)
ExecutorService executor = Executors.newCachedThreadPool();
executor.execute(future);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
//取消异步任务
future.cancel(true);
try {
//阻塞,等待异步任务执行完毕
System.out.println(future.get()); //获取异步任务的返回值
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
//异步任务
static class Task implements Callable<Integer>{
//返回异步任务的执行结果
@Override
public Integer call() {
int i = 0;
for(;i < 10;i++){
System.out.println(Thread.currentThread().getName()+"_"+i);
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
return i;
}
}
}
三 AsyncTask的局限性
doBackground(call)
call的返回值在Future的done方法中获取
->onPostExecute
new MyTask().execute();
实例化:
new AsyncTask() -> new FutureTask()
执行:
Executor.execute(mFuture) -> SerialExecutor.myTasks(队列)
-> (线程池)THREAD_POOL_EXECUTOR.execute
线程池中的所有线程,为了执行异步任务
CORE_POOL_SIZE 核心线程数
MAXIMUM_POOL_SIZE 最大线程数量
KEEP_ALIVE 1s闲置回收
TimeUnit.SECONDS 时间单位
sPoolWorkQueue 异步任务队列
sThreadFactory 线程工厂
如果当前线程池中的数量小于corePoolSize,创建并添加的任务。
如果当前线程池中的数量等于corePoolSize,缓冲队列 workQueue未满,那么任务被放入缓冲队列、等待任务调度执行。
如果当前线程池中的数量大于corePoolSize,缓冲队列workQueue已满,并且线程池中的数量小于maximumPoolSize,新提交任务会创建新线程执行任务。
如果当前线程池中的数量大于corePoolSize,缓冲队列workQueue已满,并且线程池中的数量等于maximumPoolSize,新提交任务由Handler处理。
当线程池中的线程大于corePoolSize时,多余线程空闲时间超过keepAliveTime时,会关闭这部分线程。
public static final Executor THREAD_POOL_EXECUTOR
= new ThreadPoolExecutor(CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE,
TimeUnit.SECONDS, sPoolWorkQueue, sThreadFactory);
1.线程池容量不够抛出异常
2.内存泄露
3.一个线程,一个异步任务(?)
public static void main(String[] args) {
int CPU_COUNT = Runtime.getRuntime().availableProcessors();
int CORE_POOL_SIZE = CPU_COUNT + 1;//5
int MAXIMUM_POOL_SIZE = CPU_COUNT * 2 + 1; //9
int KEEP_ALIVE = 1;
//任务队列(128)
final BlockingDeque<Runnable> sPoolWorkQueue = new LinkedBlockingDeque<>(128);
//线程工厂
ThreadFactory sThreaFactory = new ThreadFactory() {
private final AtomicInteger mCount = new AtomicInteger(1);
@Override
public Thread newThread(@NonNull Runnable r) {
String name = "Thread #" + mCount.getAndIncrement();
System.out.println(name);
return new Thread(r, name);
}
};
//线程池
Executor THREAD_POOL_EXECUTOR
= new ThreadPoolExecutor(CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE,
TimeUnit.SECONDS, sPoolWorkQueue, sThreaFactory);
//执行异步任务
//如果当前线程池中的数量大于corePoolSize,缓冲队列workQueue已满,
//并且线程池中的数量等于maximumPoolSize,新提交任务由Handler处理。
//RejectedExecutionException
for (int i = 0; i < 200; i++) {
//相当于new AsyncTask().execute();
THREAD_POOL_EXECUTOR.execute(new MyTask());
}
}
static class MyTask implements Runnable {
@Override
public void run() {
System.out.println(Thread.currentThread().getName());
/*while(true){
try {
System.out.println(Thread.currentThread().getName());
//Thread.sleep(1000);
} catch (Exception e) {
e.printStackTrace();
}
}*/
}
}
上一篇: 基本的SQL代码
下一篇: 数据结构 栈 数据结构J#
推荐阅读