欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

双数据源结合线程池使用案例

程序员文章站 2022-06-09 12:57:26
...

背景:批量处理数据,6000万从一个数据库倒入到另外一个数据库,并且对数据进行处理后再存入,期间尝试了3种方式,都因为效率跟不上而放弃,这种方式其实我是在终端同时运行了4个jar包(当然4个jar包又一点点改变,不让肯定会有问题),代码可能有点乱,只捡主要的。

添加配置文件配置-只是必要的:

page.helper.helper-dialect=mysql
page.helper.reasonable=true
page.helper.support-methods-arguments=true

# 数据库1
spring.datasource.primary.jdbc-url=jdbc:mysql://ip:端口/SRC_BENZ_CENTER?useUnicode=true&characterEncoding=utf8&failOverReadOnly=false&socketTimeout=600000&zeroDateTimeBehavior=convertToNull&useSSL=false&allowMultiQueries=true
spring.datasource.primary.username=root
spring.datasource.primary.password=root
spring.datasource.primary.driver-class-name=com.mysql.jdbc.Driver
spring.datasource.primary.minimum-idle=1
spring.datasource.primary.maximum-pool-size=2
spring.datasource.primary.idle-timeout=6000
spring.datasource.primary.connection-timeout=5000
spring.datasource.primary.auto-commit=true
spring.datasource.primary.connection-test-query=SELECT 1

# 数据库2
spring.datasource.secondary.jdbc-url=jdbc:mysql://ip:3306/ODS_BENZ_CENTER?useUnicode=true&characterEncoding=utf8&failOverReadOnly=false&socketTimeout=60000&zeroDateTimeBehavior=convertToNull&useSSL=false&allowMultiQueries=true&useAffectedRows=true
spring.datasource.secondary.username=root
spring.datasource.secondary.password=root
spring.datasource.secondary.driver-class-name=com.mysql.jdbc.Driver
spring.datasource.secondary.minimum-idle=1
spring.datasource.secondary.maximum-pool-size=2
spring.datasource.secondary.idle-timeout=6000
spring.datasource.secondary.connection-timeout=5000
spring.datasource.secondary.auto-commit=true
spring.datasource.secondary.connection-test-query=SELECT 1

配置数据源对应的配置类:
对应数据库1

@Slf4j
@Configuration
@MapperScan(basePackages = PrimaryDatasourceConfig.PACKAGE,sqlSessionFactoryRef = "primarySqlSessionFactory")
public class PrimaryDatasourceConfig {
	//这里扫描包路径指定为对应的数据库1的dao层,
    static final String PACKAGE = "com.from.clean.data.dao.primary";

    @Bean(name = "primaryDataSource")
    @ConfigurationProperties(prefix = "spring.datasource.primary")
    public HikariDataSource dataSource() {
        return new HikariDataSource();
    }

    @Bean(name = "primaryTransactionManager")
    public DataSourceTransactionManager transactionManager() {
        return new DataSourceTransactionManager(this.dataSource());
    }

    @Bean(name = "primarySqlSessionFactory")
    public SqlSessionFactory sqlSessionFactory(@Qualifier("primaryDataSource") DataSource dataSource) throws Exception {
        final SqlSessionFactoryBean sessionFactory = new SqlSessionFactoryBean();
        sessionFactory.setDataSource(dataSource);
        sessionFactory.getObject().getConfiguration().setMapUnderscoreToCamelCase(true);
        return sessionFactory.getObject();
    }

对应数据库2配置:

@Slf4j
@Configuration
@MapperScan(basePackages = SecondDatasourceConfig.PACKAGE,sqlSessionFactoryRef = "secondSqlSessionFactory")
public class SecondDatasourceConfig {
	//这里扫描包路径指定为对应的数据库2的dao层,
    static final String PACKAGE = "com.from.clean.data.dao.secondary";

    @Bean(name = "secondDataSource")
    @Primary
    @ConfigurationProperties(prefix = "spring.datasource.secondary")
    public HikariDataSource dataSource() {
        return new HikariDataSource();
    }

    @Bean(name = "secondTransactionManager")
    @Primary
    public DataSourceTransactionManager transactionManager() {
        return new DataSourceTransactionManager(this.dataSource());
    }

    @Bean(name = "secondSqlSessionFactory")
    @Primary
    public SqlSessionFactory sqlSessionFactory(@Qualifier("secondDataSource") DataSource dataSource) throws Exception {
        final SqlSessionFactoryBean sessionFactory = new SqlSessionFactoryBean();
        sessionFactory.setDataSource(dataSource);
        sessionFactory.getObject().getConfiguration().setMapUnderscoreToCamelCase(true);
        return sessionFactory.getObject();
    }

以下是线程池方式执行的处理:

但是使用了两种方式,一种是注解式的异步线程池,一种就是现在这种new出来直接创建的方式,最后选择这种,对,是因为方便-捂脸,其实是因为我这里是需要批量处理数据的,因为表内数据在2500万,在我批量处理时候使用这种不断创建线程池方式会使效率提高

@Slf4j
@Component
public class ModelDataServiceTImpl {


//    private static final String SOURCE_TABLE = "**";
//    private static final String TARGET_TABLE = "**";
 //读取配置文件中数据库表名,源表
    @Value("${source.table}") 
    private String SOURCE_TABLE;
//读取配置文件中数据库表名,目标表
    @Value("${targer.table}")
    private String TARGET_TABLE;
    //一次执行条数
    private static int PAGES = 100;
    private static  boolean flag = true;
    //线程数
    private final int THREAD_COUNT = 10;
    private final CountDownLatch latch = new CountDownLatch(THREAD_COUNT);

    @Resource   // 数据库1的dao
    private PrimaryModelDataSource primary;
    @Resource  //数据库2的dao
    private SecondaryModelDataSource secondary;
    @Resource
    private SyncModelTData syncModelData;

    @Scheduled(cron = "${restrul.sync.benzVideo}")
    private void syncModelData() {
        try {
            log.info("开始执行车型数据补全,原表:【{}】---目标表:【{}】", SOURCE_TABLE, TARGET_TABLE);
            //查询目标表中状态不为1的数据,一次1000条
//            int i = primary.querySourceSum(SOURCE_TABLE);
            ExecutorService es = Executors.newFixedThreadPool(THREAD_COUNT);
            //查询目标表状态不为0的对象集合
            List<ModelTargetPo> modelTargetPos = new ArrayList<>();
            while (flag) {
                Thread.sleep(6200);
                modelTargetPos = secondary.queryTargetData(TARGET_TABLE,600, PAGES);
                if (modelTargetPos.size() > 0){
                    List<ModelSourcePo> modelSourcePos = primary.querySourcePojo(SOURCE_TABLE, modelTargetPos);

                    syncModelData.syncModelData(TARGET_TABLE,modelSourcePos, modelTargetPos,es);
                } else if (modelTargetPos.size() <= 0) {
                    flag = false;
                    log.info("**************目标表数据以补全完成,表名:【{}】--对应原表【{}】", TARGET_TABLE,SOURCE_TABLE);
                    return;
                }
            }
            latch.await();
            es.shutdown();
            log.info("该表【{}】数据已补全,执行状态为-【{}】",SOURCE_TABLE,flag);
            //根据目标表查询原表对象集合
//            List<ModelSourcePo> modelSourcePos = primary.querySourcePojo(SOURCE_TABLE, modelTargetPos);
//            Future<Integer> integerFuture = syncModelData.syncModelData(TARGET_TABLE, modelSourcePos, modelTargetPos);
            //得到数据补全完整的目标对象
//            List<ModelTargetPo> modelTargetPosT = this.dataFormation(modelSourcePos, modelTargetPos);
//            if (!modelTargetPos.isEmpty()) {
//                int resflag = secondary.updataTargetData(TARGET_TABLE, modelTargetPos);
//                log.info("标识:【{}】,当前页补全数据成功,优秀!!", resflag);
//            }

//            log.info("原表:【{}】--->>>目标表:【{}】,数据补全!!!very good!!", SOURCE_TABLE, TARGET_TABLE);
        } catch (Exception e) {
            log.error("数据修改失败,原因:{}", ExceptionUtils.getStackTrace(e));
        }
    }

真正使用线程池的方法:

@Service
@Slf4j
public class SyncModelTData {
    @Resource  // 数据库2的dao
    private SecondaryModelDataSource secondary;
    @Resource  // 数据库1的dao
    private PrimaryModelDataSource primary;


//    @Async("asyncServiceExecutor")  
    public void syncModelData(final String targetTable,final List<ModelSourcePo> modelSourcePos, final List<ModelTargetPo> modelTargetPos, ExecutorService es) {
        int flag = 0;
        try {
            List<ModelTargetPo> resModelTargetPos = new ArrayList<>();
            for (final ModelSourcePo modelSourcePo : modelSourcePos){
                es.submit(new Runnable() {
                    @Override
                    public void run() {
		//run方法里是自己的处理逻辑
                        String sourcePoWhc = modelSourcePo.getWhc();
                        String sourcePoChassbm = modelSourcePo.getChassbm();
                        String sourcePoChassIdent = modelSourcePo.getChassIdent();

                        for (final ModelTargetPo modelTargetPo : modelTargetPos){
                           
                                log.info("测试数据,{}", modelTargetPo.toString());
                                int resflag = secondary.updataTargetDataT(targetTable, modelTargetPo);
                                log.info("标识:【{}】,更新!!", resflag);
                                break;
                            } else {
                                continue;
                            }
                        }
                });
            flag++;
            log.info("更新了" + flag + "条数据");

            }
        } catch (Exception e) {
            log.error("当前页补全数据失败,原因:{}", ExceptionUtils.getStackTrace(e));
        }
//        log.info("成功更新了" + modelTargetPos.size() + "条");

//        log.info("原表:【{}】--->>>目标表:【{}】,数据补全!!!very good!!", sourceTable, targetTable);
//        return new AsyncResult<>(flag);
    }
相关标签: 案例