java多线程之CountDownLatch
程序员文章站
2024-03-13 20:54:10
...
今天主要简单说明一下,如何使用countdownlatch来完成多线程之间的协助操作,看一下具体的业务背景:
开发同事,需要对大量数据进行插入操作,因为数据量比较大,所以自然而然会想到用到多线程来提高效率,但是问题来了,就是因为一旦牵涉插入库操作,势必会牵涉到事务,本质需求就是,当多个线程进行插入操作,等所有线程执行完毕,一旦发现有一个或多个子线程在执行过车过程中出现了异常,就需要对事务进行全局回滚(也就是之前已经成功执行的线程,都有进行回滚)。其实要清楚这个问题的本质就比较简单了,那就是抛出异常的线程应该是主线程,回滚也要主线程去执行,下面看一下简单的代码实现,其实主要就是用了两个countdownlatch分别控制主线程和子线程的执行操作以及回滚时机.下面看一下伪代码
首先定义一个事务回滚类
public class DataRollBack {
public DataRollBack(boolean isRollBack) {
this.isRollBack = isRollBack;
}
//事务是否回滚 true回滚 false不回滚
public boolean isRollBack() {
return isRollBack;
}
public void setRollBack(boolean rollBack) {
isRollBack = rollBack;
}
private boolean isRollBack;
}
再定义一个维护事务管理类,主要包含事务的回滚操作和事务的提交操作
public class TransactionManager {
/**
* 模拟事务回滚
*/
public void rollBack(){
System.out.println("线程:"+Thread.currentThread().getName()+"执行事务回滚");
}
/**
* 模拟事务提交
*/
public void commit(){
System.out.println("线程:"+Thread.currentThread().getName()+"开始提交数据");
}
}
定义一个数据插入业务类,模拟数据插入操作
public class InsertService {
public boolean insert(String data){
System.out.println("开始保存数据到数据库中");
if(Thread.currentThread().getName().contains("Thread-4"))
return false;
return true;
}
}
定义实现数据插入的业务线程类:
package insert;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
/**
* @author aaa@qq.com
* @version 1.0
* @date 模拟多线程并发插入数据,且如果有一个线程出现了回滚,需要回滚整个线程(已经成功执行的线程需要全部回滚)
* @description
*/
public class DataInsertRunable implements Runnable {
//业务方法
private InsertService insertService;
//主线程
private CountDownLatch mainCountDownLatch;
//子线程
private CountDownLatch childCountDownLatch;
//保存各个子线程执行的结果
private BlockingQueue<Boolean> resultList;
//事务管理
private TransactionManager transactionManager;
//数据结果执行决定是否回滚
private DataRollBack dataRollBack;
public DataInsertRunable(InsertService insertService, CountDownLatch mainCountDownLatch, CountDownLatch childCountDownLatch, BlockingQueue<Boolean> resultList,
TransactionManager transactionManager,DataRollBack dataRollBack) {
this.insertService = insertService;
this.mainCountDownLatch = mainCountDownLatch;
this.childCountDownLatch = childCountDownLatch;
this.resultList = resultList;
this.transactionManager = transactionManager;
this.dataRollBack = dataRollBack;
}
@Override
public void run() {
System.out.println(Thread.currentThread().getName()+"子线程开始执行任务");
Boolean result = insertService.insert("test");
resultList.add(result);
//子线程执行
childCountDownLatch.countDown();
//阻塞主线程 回滚事务是由主线程来执行的
try {
mainCountDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+"子线程执行剩下的任务");
if (dataRollBack.isRollBack()){
transactionManager.rollBack();
}
else {
transactionManager.commit();;
}
}
}
写个主线程测试
package insert;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingDeque;
/**
* @author aaa@qq.com
* @version 1.0
* @date
* @description
*/
public class DataMain {
public static void main(String[] args) {
int CHILD_SUM =5;
CountDownLatch mainCountDownLathc = new CountDownLatch(1);
CountDownLatch childCountDownLathc = new CountDownLatch(CHILD_SUM);
DataRollBack dataRollBack = new DataRollBack(false);
InsertService insertService = new InsertService();
TransactionManager transactionManager = new TransactionManager();
BlockingQueue<Boolean> resultList = new LinkedBlockingDeque<>(10);
//创建5个线程
for (int i=0;i<CHILD_SUM;i++){
DataInsertRunable dataInsertRunable = new DataInsertRunable(insertService,mainCountDownLathc,childCountDownLathc,resultList,transactionManager,dataRollBack);
Thread thread = new Thread(dataInsertRunable);
thread.start();
}
//阻塞子线程 子线程不执行完 下面的逻辑不会被执行
try {
childCountDownLathc.await();
System.out.println("主线程开始执行任务");
//通过队列获取结果
for (int i=0;i<CHILD_SUM;i++){
Boolean result = resultList.take();
if (!result){
dataRollBack.setRollBack(true);
}
}
// 执行主线程 此时子线程应该是已经执行完了
mainCountDownLathc.countDown();
if (dataRollBack.isRollBack()){
System.out.println("执行线程出现了异常,需要进行全局事务回滚");
}
else {
System.out.println("多线程插入数据已经全部完毕且提交事务了");
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
看一下最终测试的效果