处理数据你还用单线程?OUT了老兄!赶快来试试多线程吧!
场景
最近实习时遇到这个一个场景,需要使用定时任务扫描出需要处理的数据,然后执行相应的操作。显然,我这个菜鸡一开始的时候采用了单线程模型,可惜效率实在过低,过长的等待时间令我无法忍受。很巧,我想起了之前学过的并发编程知识,决定把单线程模型改为多线程模型,来提高我们处理数据的效率。结果也如我们所愿,效率大为提高,真棒!
我把我的场景抽象一下,假设现在我们有一个队列,需要对队列的数据进行处理,那么单线程与多线程处理数据到底有啥区别呢?
单线程模型
import java.util.ArrayList;
import java.util.List;
class Num {
private List<Integer> list = new ArrayList();
public Num() {
for(int i = 0;i < 100;i++) {
list.add(i);
}
}
public List<Integer> getList() {
return list;
}
}
// 单线程
public class SingleThread {
public static void main(String[] args) throws InterruptedException {
Num num = new Num();
List<Integer> list = num.getList();
long startTime = System.currentTimeMillis();
for(Integer i : list) {
// 模拟处理任务
Thread.sleep(100);
}
long endTime = System.currentTimeMillis();
System.out.println("处理的总时长:" + (endTime - startTime));
}
}
这个没啥好说的,在 main 中直接遍历队列将数据处理完毕了,简单,不过简单带来的代价就是等待时间过长,不信?你看看执行时间:
好家伙,我直接好家伙!这执行时间太长了,谁顶得住啊!那有没有效率更高的方法呢?其实是有的,下面我们来看看多线程的处理方式。
多线程模型(一)
我们构建一个线程池,初始化十个线程来执行我们的任务。为避免加锁,我们把一个大队列拆分为十个小队列,这样我们在处理数据的过程中天然是线程安全的。CountDownLatch 可以让我们在十个线程全部执行成功之后再打印总的执行时间。
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
class Num1 {
private List<Integer> list = new ArrayList();
public Num1() {
for(int i = 0;i < 100;i++) {
list.add(i);
}
}
public List<Integer> getList() {
return list;
}
}
class MyThread implements Runnable {
private List<Integer> list;
private CountDownLatch countDownLatch;
public MyThread(List<Integer> list, CountDownLatch countDownLatch) {
this.list = list;
this.countDownLatch = countDownLatch;
}
@Override
public void run() {
for(Integer i : list) {
try {
// 模拟相关操作
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
countDownLatch.countDown();
}
}
}
}
// 多线程(每个线程处理相同数量的任务)
public class multiThread1 {
public static void main(String[] args) throws InterruptedException {
long startTime = System.currentTimeMillis();
Num1 num1 = new Num1();
List<Integer> list = num1.getList();
int threadNum = 10;
ExecutorService executorService = Executors.newFixedThreadPool(threadNum);
CountDownLatch countDownLatch = new CountDownLatch(threadNum);
int perSize = list.size() / threadNum;
for (int i = 0; i < threadNum; i++) {
MyThread thread = new MyThread(list.subList(i * perSize, (i + 1) * perSize), countDownLatch);
executorService.submit(thread);
}
countDownLatch.await();
executorService.shutdown();
long endTime = System.currentTimeMillis();
System.out.println("处理的总时长:" + (endTime - startTime));
}
}
这执行时间连我都不敢相信!跟单线程模型一对比执行时间,孰优孰劣就不用我多说了吧!
多线程模型(二)
上一种方法看起来非常完美,不过我们上面的方法直接将数据分成十等份,在每一份数据的数据量相同的情况下固然是好,但如果某一份数据的处理时间特别长,在其他线程都处理完毕后,处理这份数据的线程还在执行,最后就又变回单线程了!
在数据分布不均匀的情况下,我们可以改进下我们的模型,不要在线程处理数据之前直接把数据分配给线程,而是应该在线程处理数据的过程中动态分配数据,线程处理完一个,我们就再给这个线程分配下一个数据。在这种情况下,我们需要加锁,不然很可能会出现并发导致的线程不安全问题。
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
class Num2 {
private List<Integer> list = new ArrayList();
public Num2() {
for(int i = 0;i < 100;i++) {
list.add(i);
}
}
public List<Integer> getList() {
return list;
}
}
class MyThread2 implements Runnable {
private List<Integer> list;
private AtomicInteger i;
private CountDownLatch countDownLatch;
public MyThread2(List<Integer> list, CountDownLatch countDownLatch, AtomicInteger i) {
this.list = list;
this.i = i;
this.countDownLatch = countDownLatch;
}
@Override
public void run() {
while(i.get() < list.size()) {
int index = i.getAndIncrement();
if(index >= list.size()) {
break ;
}
try {
// 模拟业务处理
list.get(index);
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
countDownLatch.countDown();
}
}
// 多线程(每个线程处理不同数量的任务)
public class multiThread2 {
public static void main(String[] args) throws InterruptedException {
long startTime = System.currentTimeMillis();
Num2 num2 = new Num2();
List<Integer> list = num2.getList();
int threadNum = 10;
ExecutorService executorService = Executors.newFixedThreadPool(threadNum);
CountDownLatch countDownLatch = new CountDownLatch(threadNum);
AtomicInteger atomicInteger = new AtomicInteger(0);
for (int i = 0; i < threadNum; i++) {
MyThread2 thread = new MyThread2(list, countDownLatch, atomicInteger);
executorService.submit(thread);
}
countDownLatch.await();
executorService.shutdown();
long endTime = System.currentTimeMillis();
System.out.println("处理的总时长:" + (endTime - startTime));
}
}
在每份数据的量大体相同的情况下,由于存在锁的开销,该模型比不上上一个模型;但如果数据分布极度不均匀的情况下,使用该方法要比使用第一种方法的效率更高,正所谓没有最好的方法,只有最适合的方法。
本文地址:https://blog.csdn.net/Geffin/article/details/110958580