谈谈Java任务的并行处理
最近有很多伙伴给我留言说想了解一下java的并行处理,那今天我们就来说一说这个问题。
谈到并行,我们可能最先想到的是线程,多个线程一起运行,来提高我们系统的整体处理速度;为什么使用多个线程就能提高处理速度,因为现在计算机普遍都是多核处理器,我们需要充分利用cpu资源;如果站的更高一点来看,我们每台机器都可以是一个处理节点,多台机器并行处理;并行的处理方式可以说无处不在,本文主要来谈谈java在并行处理方面的努力。
无处不在的并行
java的垃圾回收器,我们可以看到每一代版本的更新,伴随着gc更短的延迟,从serial到cms再到现在的g1,一直在摘掉java慢的帽子;消息队列从早期的activemq到现在的kafka和rocketmq,引入的分区的概念,提高了消息的并行性;数据库单表数据到一定量级之后,访问速度会很慢,我们会对表进行分表处理,引入数据库中间件;redis你可能觉得本身处理是单线程的,但是redis的集群方案中引入了slot(槽)的概念;更普遍的就是我们很多的业务系统,通常会部署多台,通过负载均衡器来进行分发;好了还有其他的一些例子,此处不在一一例举。
如何并行
我觉得并行的核心在于"拆分",把大任务变成小任务,然后利用多核cpu也好,还是多节点也好,同时并行的处理,java历代版本的更新,都在为我们开发者提供更方便的并行处理,从开始的thread,到线程池,再到fork/join框架,最后到流处理,下面使用简单的求和例子来看看各种方式是如何并行处理的;
单线程处理
首先看一下最简单的单线程处理方式,直接使用主线程进行求和操作;
public class singlethread {
public static long[] numbers;
public static void main(string[] args) {
numbers = longstream.rangeclosed(1, 10_000_000).toarray();
long sum = 0;
for (int i = 0; i < numbers.length; i++) {
sum += numbers[i];
}
system.out.println("sum = " + sum);
}
}
求和本身是一个计算密集型任务,但是现在已经是多核时代,只用单线程,相当于只使用了其中一个cpu,其他cpu被闲置,资源的浪费;
thread方式
我们把任务拆分成多个小任务,然后每个小任务分别启动一个线程,如下所示:
public class threadtest {
public static final int threshold = 10_000;
public static long[] numbers;
private static long allsum;
public static void main(string[] args) throws exception {
numbers = longstream.rangeclosed(1, 10_000_000).toarray();
int tasksize = (int) (numbers.length / threshold);
for (int i = 1; i <= tasksize; i++) {
final int key = i;
new thread(new runnable() {
public void run() {
sumall(sum((key - 1) * threshold, key * threshold));
}
}).start();
}
thread.sleep(100);
system.out.println("allsum = " + getallsum());
}
private static synchronized long sumall(long threadsum) {
return allsum += threadsum;
}
public static synchronized long getallsum() {
return allsum;
}
private static long sum(int start, int end) {
long sum = 0;
for (int i = start; i < end; i++) {
sum += numbers[i];
}
return sum;
}
}
以上指定了一个拆分阀值,计算拆分多少个认为,同时启动多少线程;这种处理就是启动的线程数过多,而cpu数有限,更重要的是求和是一个计算密集型任务,启动过多的线程只会带来更多的线程上下文切换;同时线程处理完一个任务就终止了,也是对资源的浪费;另外可以看到主线程不知道何时子任务已经处理完了,需要做额外的处理;所有java后续引入了线程池。
线程池方式
jdk1.5引入了并发包,其中包括了threadpoolexecutor,相关代码如下:
public class executorservicetest {
public static final int threshold = 10_000;
public static long[] numbers;
public static void main(string[] args) throws exception {
numbers = longstream.rangeclosed(1, 10_000_000).toarray();
executorservice executor = executors.newfixedthreadpool(runtime.getruntime().availableprocessors() + 1);
completionservice<long> completionservice = new executorcompletionservice<long>(executor);
int tasksize = (int) (numbers.length / threshold);
for (int i = 1; i <= tasksize; i++) {
final int key = i;
completionservice.submit(new callable<long>() {
@override
public long call() throws exception {
return sum((key - 1) * threshold, key * threshold);
}
});
}
long sumvalue = 0;
for (int i = 0; i < tasksize; i++) {
sumvalue += completionservice.take().get();
}
// 所有任务已经完成,关闭线程池
system.out.println("sumvalue = " + sumvalue);
executor.shutdown();
}
private static long sum(int start, int end) {
long sum = 0;
for (int i = start; i < end; i++) {
sum += numbers[i];
}
return sum;
}
}
上面已经分析了计算密集型并不是线程越多越好,这里创建了jdk默认的线程数:cpu数+1,这是一个经过大量测试以后给出的一个结果;线程池顾名思义,可以重复利用现有的线程;同时利用completionservice来对子任务进行汇总;合理的使用线程池已经可以充分的并行处理任务,只是在写法上有点繁琐,此时jdk1.7中引入了fork/join框架;
fork/join框架
分支/合并框架的目的是以递归的方式将可以并行的认为拆分成更小的任务,然后将每个子任务的结果合并起来生成整体结果;相关代码如下:
public class forkjointest extends java.util.concurrent.recursivetask<long> {
private static final long serialversionuid = 1l;
private final long[] numbers;
private final int start;
private final int end;
public static final long threshold = 10_000;
public forkjointest(long[] numbers) {
this(numbers, 0, numbers.length);
}
private forkjointest(long[] numbers, int start, int end) {
this.numbers = numbers;
this.start = start;
this.end = end;
}
@override
protected long compute() {
int length = end - start;
if (length <= threshold) {
return computesequentially();
}
forkjointest lefttask = new forkjointest(numbers, start, start + length / 2);
lefttask.fork();
forkjointest righttask = new forkjointest(numbers, start + length / 2, end);
long rightresult = righttask.compute();
// 注:join方法会阻塞,因此有必要在两个子任务的计算都开始之后才执行join方法
long leftresult = lefttask.join();
return leftresult + rightresult;
}
private long computesequentially() {
long sum = 0;
for (int i = start; i < end; i++) {
sum += numbers[i];
}
return sum;
}
public static void main(string[] args) {
system.out.println(forkjoinsum(10_000_000));
}
public static long forkjoinsum(long n) {
long[] numbers = longstream.rangeclosed(1, n).toarray();
forkjointask<long> task = new forkjointest(numbers);
return new forkjoinpool().invoke(task);
}
}
forkjoinpool是executorservice接口的一个实现,子认为分配给线程池中的工作线程;同时需要把任务提交到此线程池中,需要创建recursivetask<r>的一个子类;大体逻辑就是通过fork进行拆分,然后通过join进行结果的合并,jdk为我们提供了一个框架,我们只需要在里面填充即可,更加方便;有没有更简单的方式,连拆分都省了,自动拆分合并,jdk在1.8中引入了流的概念;
流方式
java8引入了stream的概念,可以让我们更好的利用并行,使用流代码如下:
public class streamtest {
public static void main(string[] args) {
system.out.println("sum = " + parallelrangedsum(10_000_000));
}
public static long parallelrangedsum(long n) {
return longstream.rangeclosed(1, n).parallel().reduce(0l, long::sum);
}
}
以上代码是不是非常简单,对于开发者来说完全不需要手动拆分,使用同步机制等方式,就可以让任务并行处理,只需要对流使用parallel()方法,系统自动会对任务进行拆分,当然前提是没有共享可变状态;其实并行流内部使用的也是fork/join框架;
总结
本文使用一个求和的实例,来介绍了jdk为开发者提供并行处理的各种方式,可以看到java一直在为提供更方便的并行处理而努力。更多的java知识添加我的扣峮前108中062后1881.
上一篇: Python爬取小说《斗罗大陆》
下一篇: 实现微信向多好友定时发送信息