双数据源结合线程池使用案例
程序员文章站
2022-06-09 12:57:26
...
背景:批量处理数据,6000万条从一个数据库导入到另外一个数据库,并且对数据进行处理后再存入。期间尝试了3种方式,都因为效率跟不上而放弃。这种方式其实是在终端同时运行了4个jar包(当然4个jar包各有一点点改变,不然肯定会有问题),代码可能有点乱,只捡主要的。
添加配置文件配置-只是必要的:
# NOTE: the pagehelper-spring-boot-starter property prefix is "pagehelper.", not "page.helper."
pagehelper.helper-dialect=mysql
pagehelper.reasonable=true
pagehelper.support-methods-arguments=true
# 数据库1
spring.datasource.primary.jdbc-url=jdbc:mysql://ip:端口/SRC_BENZ_CENTER?useUnicode=true&characterEncoding=utf8&failOverReadOnly=false&socketTimeout=600000&zeroDateTimeBehavior=convertToNull&useSSL=false&allowMultiQueries=true
spring.datasource.primary.username=root
spring.datasource.primary.password=root
spring.datasource.primary.driver-class-name=com.mysql.jdbc.Driver
spring.datasource.primary.minimum-idle=1
spring.datasource.primary.maximum-pool-size=2
spring.datasource.primary.idle-timeout=6000
spring.datasource.primary.connection-timeout=5000
spring.datasource.primary.auto-commit=true
spring.datasource.primary.connection-test-query=SELECT 1
# 数据库2
spring.datasource.secondary.jdbc-url=jdbc:mysql://ip:3306/ODS_BENZ_CENTER?useUnicode=true&characterEncoding=utf8&failOverReadOnly=false&socketTimeout=60000&zeroDateTimeBehavior=convertToNull&useSSL=false&allowMultiQueries=true&useAffectedRows=true
spring.datasource.secondary.username=root
spring.datasource.secondary.password=root
spring.datasource.secondary.driver-class-name=com.mysql.jdbc.Driver
spring.datasource.secondary.minimum-idle=1
spring.datasource.secondary.maximum-pool-size=2
spring.datasource.secondary.idle-timeout=6000
spring.datasource.secondary.connection-timeout=5000
spring.datasource.secondary.auto-commit=true
spring.datasource.secondary.connection-test-query=SELECT 1
配置数据源对应的配置类:
对应数据库1
@Slf4j
@Configuration
@MapperScan(basePackages = PrimaryDatasourceConfig.PACKAGE,sqlSessionFactoryRef = "primarySqlSessionFactory")
public class PrimaryDatasourceConfig {
//这里扫描包路径指定为对应的数据库1的dao层,
static final String PACKAGE = "com.from.clean.data.dao.primary";
@Bean(name = "primaryDataSource")
@ConfigurationProperties(prefix = "spring.datasource.primary")
public HikariDataSource dataSource() {
return new HikariDataSource();
}
@Bean(name = "primaryTransactionManager")
public DataSourceTransactionManager transactionManager() {
return new DataSourceTransactionManager(this.dataSource());
}
@Bean(name = "primarySqlSessionFactory")
public SqlSessionFactory sqlSessionFactory(@Qualifier("primaryDataSource") DataSource dataSource) throws Exception {
final SqlSessionFactoryBean sessionFactory = new SqlSessionFactoryBean();
sessionFactory.setDataSource(dataSource);
sessionFactory.getObject().getConfiguration().setMapUnderscoreToCamelCase(true);
return sessionFactory.getObject();
}
对应数据库2配置:
@Slf4j
@Configuration
@MapperScan(basePackages = SecondDatasourceConfig.PACKAGE,sqlSessionFactoryRef = "secondSqlSessionFactory")
public class SecondDatasourceConfig {
//这里扫描包路径指定为对应的数据库2的dao层,
static final String PACKAGE = "com.from.clean.data.dao.secondary";
@Bean(name = "secondDataSource")
@Primary
@ConfigurationProperties(prefix = "spring.datasource.secondary")
public HikariDataSource dataSource() {
return new HikariDataSource();
}
@Bean(name = "secondTransactionManager")
@Primary
public DataSourceTransactionManager transactionManager() {
return new DataSourceTransactionManager(this.dataSource());
}
@Bean(name = "secondSqlSessionFactory")
@Primary
public SqlSessionFactory sqlSessionFactory(@Qualifier("secondDataSource") DataSource dataSource) throws Exception {
final SqlSessionFactoryBean sessionFactory = new SqlSessionFactoryBean();
sessionFactory.setDataSource(dataSource);
sessionFactory.getObject().getConfiguration().setMapUnderscoreToCamelCase(true);
return sessionFactory.getObject();
}
以下是线程池方式执行的处理:
但是使用了两种方式,一种是注解式的异步线程池,一种就是现在这种new出来直接创建的方式,最后选择这种,对,是因为方便-捂脸,其实是因为我这里是需要批量处理数据的,因为表内数据在2500万,在我批量处理时候使用这种不断创建线程池方式会使效率提高
@Slf4j
@Component
public class ModelDataServiceTImpl {
// private static final String SOURCE_TABLE = "**";
// private static final String TARGET_TABLE = "**";
//读取配置文件中数据库表名,源表
@Value("${source.table}")
private String SOURCE_TABLE;
//读取配置文件中数据库表名,目标表
@Value("${targer.table}")
private String TARGET_TABLE;
//一次执行条数
private static int PAGES = 100;
private static boolean flag = true;
//线程数
private final int THREAD_COUNT = 10;
private final CountDownLatch latch = new CountDownLatch(THREAD_COUNT);
@Resource // 数据库1的dao
private PrimaryModelDataSource primary;
@Resource //数据库2的dao
private SecondaryModelDataSource secondary;
@Resource
private SyncModelTData syncModelData;
@Scheduled(cron = "${restrul.sync.benzVideo}")
private void syncModelData() {
try {
log.info("开始执行车型数据补全,原表:【{}】---目标表:【{}】", SOURCE_TABLE, TARGET_TABLE);
//查询目标表中状态不为1的数据,一次1000条
// int i = primary.querySourceSum(SOURCE_TABLE);
ExecutorService es = Executors.newFixedThreadPool(THREAD_COUNT);
//查询目标表状态不为0的对象集合
List<ModelTargetPo> modelTargetPos = new ArrayList<>();
while (flag) {
Thread.sleep(6200);
modelTargetPos = secondary.queryTargetData(TARGET_TABLE,600, PAGES);
if (modelTargetPos.size() > 0){
List<ModelSourcePo> modelSourcePos = primary.querySourcePojo(SOURCE_TABLE, modelTargetPos);
syncModelData.syncModelData(TARGET_TABLE,modelSourcePos, modelTargetPos,es);
} else if (modelTargetPos.size() <= 0) {
flag = false;
log.info("**************目标表数据以补全完成,表名:【{}】--对应原表【{}】", TARGET_TABLE,SOURCE_TABLE);
return;
}
}
latch.await();
es.shutdown();
log.info("该表【{}】数据已补全,执行状态为-【{}】",SOURCE_TABLE,flag);
//根据目标表查询原表对象集合
// List<ModelSourcePo> modelSourcePos = primary.querySourcePojo(SOURCE_TABLE, modelTargetPos);
// Future<Integer> integerFuture = syncModelData.syncModelData(TARGET_TABLE, modelSourcePos, modelTargetPos);
//得到数据补全完整的目标对象
// List<ModelTargetPo> modelTargetPosT = this.dataFormation(modelSourcePos, modelTargetPos);
// if (!modelTargetPos.isEmpty()) {
// int resflag = secondary.updataTargetData(TARGET_TABLE, modelTargetPos);
// log.info("标识:【{}】,当前页补全数据成功,优秀!!", resflag);
// }
// log.info("原表:【{}】--->>>目标表:【{}】,数据补全!!!very good!!", SOURCE_TABLE, TARGET_TABLE);
} catch (Exception e) {
log.error("数据修改失败,原因:{}", ExceptionUtils.getStackTrace(e));
}
}
真正使用线程池的方法:
/**
 * Worker service that performs the per-row update on the target table.
 * Tasks are submitted to an ExecutorService supplied by the caller so
 * that pages are processed concurrently.
 */
@Service
@Slf4j
public class SyncModelTData {

    @Resource // DAO for database 2 (the target database that gets updated)
    private SecondaryModelDataSource secondary;
    @Resource // DAO for database 1 (the source database; unused in this snippet)
    private PrimaryModelDataSource primary;

    /**
     * Submits one asynchronous task per source row; each task scans the
     * target rows and performs an update.
     *
     * NOTE(review): this snippet does NOT compile as published — the
     * `} else {` inside run() has no matching `if`. The blog post evidently
     * dropped the guard that compared the source fields (whc / chassbm /
     * chassIdent, extracted just above the loop but otherwise unused here)
     * against the target row before updating. That condition must be
     * restored before this code can be used.
     *
     * NOTE(review): `flag++` and the "更新了…条数据" log run immediately after
     * submit(), so they count submitted tasks, not completed updates.
     *
     * @param targetTable    name of the table to update
     * @param modelSourcePos source rows providing the completion data
     * @param modelTargetPos target rows to be completed
     * @param es             shared thread pool the tasks are submitted to
     */
    // Originally an @Async method; the author switched to a caller-supplied pool.
    // @Async("asyncServiceExecutor")
    public void syncModelData(final String targetTable,final List<ModelSourcePo> modelSourcePos, final List<ModelTargetPo> modelTargetPos, ExecutorService es) {
        int flag = 0;
        try {
            // NOTE(review): populated nowhere — presumably meant to collect
            // updated rows; confirm against the original article.
            List<ModelTargetPo> resModelTargetPos = new ArrayList<>();
            for (final ModelSourcePo modelSourcePo : modelSourcePos){
                es.submit(new Runnable() {
                    @Override
                    public void run() {
                        // Per-task processing logic lives in run().
                        String sourcePoWhc = modelSourcePo.getWhc();
                        String sourcePoChassbm = modelSourcePo.getChassbm();
                        String sourcePoChassIdent = modelSourcePo.getChassIdent();
                        for (final ModelTargetPo modelTargetPo : modelTargetPos){
                            log.info("测试数据,{}", modelTargetPo.toString());
                            int resflag = secondary.updataTargetDataT(targetTable, modelTargetPo);
                            log.info("标识:【{}】,更新!!", resflag);
                            break;
                        // NOTE(review): dangling else — the `if (...)` this else
                        // belonged to was lost in publication; see method Javadoc.
                        } else {
                            continue;
                        }
                    }
                });
                flag++;
                log.info("更新了" + flag + "条数据");
            }
        } catch (Exception e) {
            log.error("当前页补全数据失败,原因:{}", ExceptionUtils.getStackTrace(e));
        }
        // log.info("成功更新了" + modelTargetPos.size() + "条");
        // log.info("原表:【{}】--->>>目标表:【{}】,数据补全!!!very good!!", sourceTable, targetTable);
        // return new AsyncResult<>(flag);
    }
上一篇: 反射案列
下一篇: FLASH绘制可爱的卡通风车