Java7之forkjoin简介_动力节点Java学院整理
java7引入了fork join的概念,来更好的支持并行运算。顾名思义,fork join类似与流程语言的分支,合并的概念。也就是说java7 se原生支持了在一个主线程中开辟多个分支线程,并且根据分支线程的逻辑来等待(或者不等待)汇集,当然你也可以fork的某一个分支线程中再开辟fork join,这也就可以实现fork join的嵌套。
有两个核心类forkjoinpool和forkjointask。
forkjoinpool实现了executorservice接口,起到线程池的作用。所以他的用法和executor框架的使用时一样的,当然fork join本身就是executor框架的扩展。forkjoinpool有3个关键的方法,来启动线程,execute(…),invoke(…),submit(…)。具体描述如下:
|
<span style='font-size: 9pt;"微软雅黑","sans-serif"; color: #333333;"border-top: windowtext 1pt solid; border-right: windowtext 1pt solid; border-bottom: windowtext 1pt solid; padding-bottom: 0cm; padding-top: 0cm; padding-left: 0cm; border-left: windowtext 1pt solid; padding-right: 0cm; background-color: transparent;"> <p style="background: white; text-align: left; line-height: normal;" align=left><span style='font-size: 9pt;"微软雅黑","sans-serif"; color: #333333; |
首先,用户需要创建一个自己的forkjointask。代码如下:
public class myforkjointask extends forkjointask { /** * */ private static final long serialversionuid = 1l; private v value; private boolean success = false; @override public v getrawresult() { return value; } @override protected void setrawresult(v value) { this.value = value; } @override protected boolean exec() { system.out.println("exec"); return this.success; } public boolean issuccess() { return success; } public void setsuccess(boolean issuccess) { this.success = issuccess; } }
测试forkjoinpool.invoke(…):
@test public void testforkjoininvoke() throws interruptedexception, executionexception { forkjoinpool forkjoinpool = new forkjoinpool(); myforkjointask task = new myforkjointask(); task.setsuccess(true); task.setrawresult("test"); string invokeresult = forkjoinpool.invoke(task); assertequals(invokeresult, "test"); } @test public void testforkjoininvoke2() throws interruptedexception, executionexception { final forkjoinpool forkjoinpool = new forkjoinpool(); final myforkjointask task = new myforkjointask(); new thread(new runnable() { public void run() { try { thread.sleep(1000); } catch (interruptedexception e) { } task.complete("test"); } }).start(); // exec()返回值是false,此处阻塞,直到另一个线程调用了task.complete(...) string result = forkjoinpool.invoke(task); system.out.println(result); } @test public void testforkjoinsubmit() throws interruptedexception, executionexception { final forkjoinpool forkjoinpool = new forkjoinpool(); final myforkjointask task = new myforkjointask(); task.setsuccess(true); // 是否在此任务运行完毕后结束阻塞 forkjointask result = forkjoinpool.submit(task); result.get(); // 如果exec()返回值是false,在此处会阻塞,直到调用complete }
测试forkjoinpool.submit(…):
@test public void testforkjoinsubmit() throws interruptedexception, executionexception { final forkjoinpool forkjoinpool = new forkjoinpool(); final myforkjointask task = new myforkjointask(); task.setsuccess(true); // 是否在此任务运行完毕后结束阻塞 forkjointask result = forkjoinpool.submit(task); result.get(); // 如果exec()返回值是false,在此处会阻塞,直到调用complete } @test public void testforkjoinsubmit2() throws interruptedexception, executionexception { final forkjoinpool forkjoinpool = new forkjoinpool(); final myforkjointask task = new myforkjointask(); forkjoinpool.submit(task); thread.sleep(1000); } @test public void testforkjoinsubmit3() throws interruptedexception, executionexception { final forkjoinpool forkjoinpool = new forkjoinpool(); final myforkjointask task = new myforkjointask(); new thread(new runnable() { public void run() { try { thread.sleep(1000); } catch (interruptedexception e) { } task.complete("test"); } }).start(); forkjointask result = forkjoinpool.submit(task); // exec()返回值是false,此处阻塞,直到另一个线程调用了task.complete(...) result.get(); thread.sleep(1000); }
测试forkjoinpool.execute(…):
@test public void testforkjoinexecute() throws interruptedexception, executionexception { forkjoinpool forkjoinpool = new forkjoinpool(); myforkjointask task = new myforkjointask(); forkjoinpool.execute(task); // 异步执行,无视task.exec()返回值。 }
在实际情况中,很多时候我们都需要面对经典的“分治”问题。要解决这类问题,主要任务通常被分解为多个任务块(分解阶段),其后每一小块任务被独立并行计算。一旦计算任务完成,每一快的结果会被合并或者解决(解决阶段)。forkjointask天然就是为了支持“分治”问题的。
分支/合并的完整过程如下:
下面列举一个分治算法的实例。
import java.util.random; import java.util.concurrent.forkjoinpool; import java.util.concurrent.recursivetask; public class maximumfinder extends recursivetask<integer> { private static final int sequential_threshold = 5; private final int[] data; private final int start; private final int end; public maximumfinder(int[] data, int start, int end) { this.data = data; this.start = start; this.end = end; } public maximumfinder(int[] data) { this(data, 0, data.length); } @override protected integer compute() { final int length = end - start; if (length < sequential_threshold) { return computedirectly(); } final int split = length / 2; final maximumfinder left = new maximumfinder(data, start, start + split); left.fork(); final maximumfinder right = new maximumfinder(data, start + split, end); return math.max(right.compute(), left.join()); } private integer computedirectly() { system.out.println(thread.currentthread() + ' computing: ' + start + ' to ' + end); int max = integer.min_value; for (int i = start; i < end; i++) { if (data[i] > max) { max = data[i]; } } return max; } public static void main(string[] args) { // create a random data set final int[] data = new int[1000]; final random random = new random(); for (int i = 0; i < data.length; i++) { data[i] = random.nextint(100); } // submit the task to the pool final forkjoinpool pool = new forkjoinpool(4); final maximumfinder finder = new maximumfinder(data); system.out.println(pool.invoke(finder)); } }
以上所示是小编给大家介绍的java7之forkjoin简介_动力节点java学院整理,希望对大家有所帮助
推荐阅读
-
Java concurrency集合之ConcurrentSkipListSet_动力节点Java学院整理
-
Java concurrency集合之ConcurrentLinkedQueue_动力节点Java学院整理
-
web.xml中servlet, bean, filter, listenr 加载顺序_动力节点Java学院整理
-
Log4j详细使用教程_动力节点Java学院整理
-
Java日志相关技术_动力节点Java学院整理
-
Java中JDom解析XML_动力节点Java学院整理
-
Java listener简介_动力节点Java学院整理
-
Java线程安全的常用类_动力节点Java学院整理
-
Java反射机制详解_动力节点Java学院整理
-
Java concurrency之公平锁(二)_动力节点Java学院整理