java并发编程_线程池的使用方法(详解)
一、任务和执行策略之间的隐性耦合
executor可以将任务的提交和任务的执行策略解耦
只有任务是同类型的且执行时间差别不大,才能发挥最大性能,否则,如将一些耗时长的任务和耗时短的任务放在一个线程池,除非线程池很大,否则会造成死锁等问题
1.线程饥饿死锁
类似于:将两个任务提交给一个单线程池,且两个任务之间相互依赖,一个任务等待另一个任务,则会发生死锁;表现为池不够
定义:某个任务必须等待池中其他任务的运行结果,有可能发生饥饿死锁
2.线程池大小
注意:线程池的大小还受其他的限制,如其他资源池:数据库连接池
如果每个任务都是一个连接,那么线程池的大小就受制于数据库连接池的大小
3.配置threadpoolexecutor线程池
实例:
1.通过executors的工厂方法返回默认的一些实现
2.通过实例化threadpoolexecutor(.....)自定义实现
线程池的队列
1.*队列:任务到达,线程池饱满,则任务在队列中等待,如果任务无限达到,则队列会无限扩张
如:单例和固定大小的线程池用的就是此种
2.有界队列:如果新任务到达,队列满则使用饱和策略
3.同步移交:如果线程池很大,将任务放入队列后在移交就会产生延时,如果任务生产者很快也会导致任务排队
synchronousqueue直接将任务移交给工作线程
机制:将一个任务放入,必须有一个线程等待接受,如果没有,则新增线程,如果线程饱和,则拒绝任务
如:cachethreadpool就是使用的这种策略
饱和策略:
setrejectedexecutionhandler来修改饱和策略
1.终止abort(默认):抛出异常由调用者处理
2.抛弃discard
3.抛弃discardoldest:抛弃最旧的任务,注意:如果是优先级队列将抛弃优先级最高的任务
4.callerruns:回退任务,有调用者线程自行处理
4.线程工厂threadfactoy
每当创建线程时:其实是调用了线程工厂来完成
自定义线程工厂:implements threadfactory
可以定制该线程工厂的行为:如uncaughtexceptionhandler等
public class myappthread extends thread { public static final string default_name = "myappthread"; private static volatile boolean debuglifecycle = false; private static final atomicinteger created = new atomicinteger(); private static final atomicinteger alive = new atomicinteger(); private static final logger log = logger.getanonymouslogger(); public myappthread(runnable r) { this(r, default_name); } public myappthread(runnable runnable, string name) { super(runnable, name + "-" + created.incrementandget()); //设置该线程工厂创建的线程的 未捕获异常的行为 setuncaughtexceptionhandler(new thread.uncaughtexceptionhandler() { public void uncaughtexception(thread t, throwable e) { log.log(level.severe, "uncaught in thread " + t.getname(), e); } }); } public void run() { // copy debug flag to ensure consistent value throughout. boolean debug = debuglifecycle; if (debug) log.log(level.fine, "created " + getname()); try { alive.incrementandget(); super.run(); } finally { alive.decrementandget(); if (debug) log.log(level.fine, "exiting " + getname()); } } public static int getthreadscreated() { return created.get(); } public static int getthreadsalive() { return alive.get(); } public static boolean getdebug() { return debuglifecycle; } public static void setdebug(boolean b) { debuglifecycle = b; } }
5.扩展threadpoolexecutor
可以被自定义子类覆盖的方法:
1.afterexecute:结束后,如果抛出runtimeexception则方法不会执行
2.beforeexecute:开始前,如果抛出runtimeexception则任务不会执行
3.terminated:在线程池关闭时,可以用来释放资源等
二、递归算法的并行化
1.循环
在循环中,每次循环操作都是独立的
//串行化 void processsequentially(list<element> elements) { for (element e : elements) process(e); } //并行化 void processinparallel(executor exec, list<element> elements) { for (final element e : elements) exec.execute(new runnable() { public void run() { process(e); } }); }
2.迭代
如果每个迭代操作是彼此独立的,则可以串行执行
如:深度优先搜索算法;注意:递归还是串行的,但是,每个节点的计算是并行的
//串行 计算compute 和串行迭代 public <t> void sequentialrecursive(list<node<t>> nodes, collection<t> results) { for (node<t> n : nodes) { results.add(n.compute()); sequentialrecursive(n.getchildren(), results); } } //并行 计算compute 和串行迭代 public <t> void parallelrecursive(final executor exec, list<node<t>> nodes, final collection<t> results) { for (final node<t> n : nodes) { exec.execute(() -> results.add(n.compute())); parallelrecursive(exec, n.getchildren(), results); } } //调用并行方法的操作 public <t> collection<t> getparallelresults(list<node<t>> nodes) throws interruptedexception { executorservice exec = executors.newcachedthreadpool(); queue<t> resultqueue = new concurrentlinkedqueue<t>(); parallelrecursive(exec, nodes, resultqueue); exec.shutdown(); exec.awaittermination(long.max_value, timeunit.seconds); return resultqueue; }
实例:
public class concurrentpuzzlesolver <p, m> { private final puzzle<p, m> puzzle; private final executorservice exec; private final concurrentmap<p, boolean> seen; protected final valuelatch<puzzlenode<p, m>> solution = new valuelatch<puzzlenode<p, m>>(); public concurrentpuzzlesolver(puzzle<p, m> puzzle) { this.puzzle = puzzle; this.exec = initthreadpool(); this.seen = new concurrenthashmap<p, boolean>(); if (exec instanceof threadpoolexecutor) { threadpoolexecutor tpe = (threadpoolexecutor) exec; tpe.setrejectedexecutionhandler(new threadpoolexecutor.discardpolicy()); } } private executorservice initthreadpool() { return executors.newcachedthreadpool(); } public list<m> solve() throws interruptedexception { try { p p = puzzle.initialposition(); exec.execute(newtask(p, null, null)); // 等待valuelatch中闭锁解开,则表示已经找到答案 puzzlenode<p, m> solnpuzzlenode = solution.getvalue(); return (solnpuzzlenode == null) ? null : solnpuzzlenode.asmovelist(); } finally { exec.shutdown();//最终主线程关闭线程池 } } protected runnable newtask(p p, m m, puzzlenode<p, m> n) { return new solvertask(p, m, n); } protected class solvertask extends puzzlenode<p, m> implements runnable { solvertask(p pos, m move, puzzlenode<p, m> prev) { super(pos, move, prev); } public void run() { //如果有一个线程找到了答案,则return,通过valuelatch中isset countdownlatch闭锁实现; //为类避免死锁,将已经扫描的节点放入set集合中,避免继续扫描产生死循环 if (solution.isset() || seen.putifabsent(pos, true) != null){ return; // already solved or seen this position } if (puzzle.isgoal(pos)) { solution.setvalue(this); } else { for (m m : puzzle.legalmoves(pos)) exec.execute(newtask(puzzle.move(pos, m), m, this)); } } } }
以上这篇java并发编程_线程池的使用方法(详解)就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持。