
Spring Boot code example: reading millions of rows with multiple threads and writing them to files

程序员文章站 2022-10-03 13:54:49

1. Configuring a thread pool in Spring Boot

package com.toycloud.awaken.platform.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;

@Configuration
@EnableAsync
public class ExecutorConfig {
    @Bean
    public Executor asyncServiceExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
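        // Core pool size
        executor.setCorePoolSize(5);
        // Maximum pool size
        executor.setMaxPoolSize(10);
        // Capacity of the task queue
        executor.setQueueCapacity(400);
        // Name prefix for the threads in this pool
        executor.setThreadNamePrefix("thread-");
        // Rejection policy: how a new task is handled once the queue is full and the pool is at its max size
        // CallerRunsPolicy: the task is not run on a pool thread; the thread that submitted it runs it instead
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        // Initialize the executor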
        executor.initialize();
        return executor;
    }
} 
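
To make the rejection policy above concrete, here is a minimal sketch that uses the plain JDK ThreadPoolExecutor rather than Spring's wrapper. The class and method names (CallerRunsSketch, overflowTaskThread) are hypothetical, and the pool is deliberately tiny (one thread, one queue slot) so that the third task is guaranteed to be rejected. It is an illustration under those assumptions, not part of the original project.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class, not part of the original article's project.
class CallerRunsSketch {

    /** Returns the name of the thread that ran the task the pool could not accept. */
    static String overflowTaskThread() {
        // One worker thread, one queue slot, CallerRunsPolicy for anything beyond that.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(1),
                new ThreadPoolExecutor.CallerRunsPolicy());
        CountDownLatch gate = new CountDownLatch(1);
        AtomicReference<String> ranOn = new AtomicReference<>();
        try {
            // Task 1 occupies the only worker thread until the gate opens.
            pool.execute(() -> awaitQuietly(gate));
            // Task 2 fills the single queue slot.
            pool.execute(() -> { });
            // Task 3 can be neither queued nor given a new thread,
            // so CallerRunsPolicy runs it on the submitting thread.
            pool.execute(() -> ranOn.set(Thread.currentThread().getName()));
        } finally {
            gate.countDown();   // let task 1 finish
            pool.shutdown();    // queued work still completes, then the worker exits
        }
        return ranOn.get();
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

With the configuration from the article (5 core threads, 10 max threads, a queue of 400), the same fallback only kicks in once all 10 threads are busy and 400 tasks are waiting, at which point the submitting thread slows down by doing the work itself.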

2. Writing a test class that runs the tasks on multiple threads

package com.toycloud.awaken.platform;

import com.toycloud.awaken.platform.service.AsyncService;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.concurrent.CountDownLatch;

@RunWith(SpringRunner.class)   // Run with Spring's JUnit 4 runner so the application context is started for the test
@SpringBootTest
public class PlatformApplicationTests {

    private  CountDownLatch countDownLatch;

    @Autowired
    AsyncService asyncService;

    @Test
    public void mainWait() {
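        // About 200,000 rows, queried 10,000 rows per page
        int total = 209124, pageSize = 10000;
        // Number of pages, rounded up
        int num = total % pageSize == 0 ? total / pageSize : total / pageSize + 1;
        try {
            // The latch count equals the number of pages (and output files),
            // because each page is handled by its own async task
            countDownLatch = new CountDownLatch(num);
            for (int i = 0; i < num; i++) {
                asyncService.dataBaseToQueue(countDownLatch, i);
            }

            // Block until every task has counted down
            countDownLatch.await();

        } catch (InterruptedException e) {
            // Restore the interrupt flag before reporting the error
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }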
    }
} 
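
The paging arithmetic and the latch pattern used in the test can be tried without Spring or a database. The sketch below is an assumption-laden stand-in: PagedLatchSketch, pageCount and processAllPages are hypothetical names, a fixed JDK thread pool replaces the @Async executor, and each "page" merely counts the rows it would have read instead of querying anything.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of the original article's project.
class PagedLatchSketch {

    /** Ceiling division: number of pages needed to cover `total` rows. */
    static int pageCount(int total, int pageSize) {
        return (total + pageSize - 1) / pageSize;
    }

    /** Runs one task per page, waits for all of them, and returns the rows covered. */
    static int processAllPages(int total, int pageSize) {
        int pages = pageCount(total, pageSize);
        CountDownLatch latch = new CountDownLatch(pages);
        AtomicInteger rowsSeen = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            for (int page = 0; page < pages; page++) {
                final int offset = page * pageSize;
                pool.execute(() -> {
                    try {
                        // Stand-in for the real query: the last page may be short.
                        rowsSeen.addAndGet(Math.min(pageSize, total - offset));
                    } finally {
                        // Always count down, even if the page fails,
                        // otherwise await() below would never return.
                        latch.countDown();
                    }
                });
            }
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdown();
        }
        return rowsSeen.get();
    }
}
```

For the numbers in the test, pageCount(209124, 10000) gives 21 pages: 20 full pages and a final page of 9,124 rows.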

3. Writing the service interface

package com.toycloud.awaken.platform.service;

import java.util.List;
import java.util.concurrent.CountDownLatch;

public interface AsyncService {


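    /**
     * Runs one asynchronous task.
     *
     * @param countDownLatch latch that the task counts down when it finishes
     * @param i              zero-based index of the page to process
     */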
    void dataBaseToQueue(CountDownLatch countDownLatch, int i);
}

4. The service implementation

package com.toycloud.awaken.platform.service.serviceimpl;

import com.toycloud.awaken.platform.service.AsyncService;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;

import java.sql.*;
import java.util.concurrent.CountDownLatch;

@Service
public class AsyncServiceImpl implements AsyncService {




    @Override
    @Async("asyncServiceExecutor")
    public void dataBaseToQueue(CountDownLatch countDownLatch, int beginNum) {
        Connection con = null;          // database connection
        PreparedStatement pstmt = null;
        ResultSet rs = null;            // result set returned by the query
        try {
            long start = System.currentTimeMillis();
            con = DriverManager.getConnection("jdbc:mysql://。。。。:3306/database", "root", "root"); // open the connection
            System.out.println("Thread " + Thread.currentThread().getId() + " started");

            // Paged query. On large tables the paging should be optimized,
            // and LIMIT with a large offset is best avoided where possible
            pstmt = con.prepareStatement("select name,age from table limit ?,?");

            pstmt.setInt(1, beginNum * 10000);  // offset of this page
            pstmt.setInt(2, 10000);             // page size
            rs = pstmt.executeQuery();
            // Column names and types of the result set
            ResultSetMetaData rsm = rs.getMetaData();
            int colNumber = rsm.getColumnCount();
            // Iterate over every row
            while (rs.next()) {
                String name = rs.getString("name");
                // Write name to a file here;
                // the details are left to the reader

            }

            System.out.println("Thread " + Thread.currentThread().getId() + " finished");
            long end = System.currentTimeMillis();
            System.out.println(end - start);  // elapsed time in milliseconds
        } catch (Exception e) {
            e.printStackTrace();
        } finally {

            // Release resources in reverse order of acquisition
            try {
                if (rs != null) rs.close();
                if (pstmt != null) pstmt.close();
                if (con != null) con.close();  // the connection must always be closed
            } catch (Exception e) {
                System.err.println(e.getMessage());
            }

            // Count down in finally so the waiting test is released even if this page fails
            countDownLatch.countDown();
        }
    }
} 
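
The article leaves the file-writing step to the reader. One possible way to fill it in, sketched under assumptions: every page writes to its own file, so the tasks never share a writer and need no locking. The class name PageFileSketch, the method writePage and the file name pattern names-<pageIndex>.txt are all hypothetical; in the service above, the loop over the ResultSet would supply the names.

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of the original article's project.
class PageFileSketch {

    /** Writes one name per line to <dir>/names-<pageIndex>.txt and returns that path. */
    static Path writePage(Path dir, int pageIndex, Iterable<String> names) {
        Path file = dir.resolve("names-" + pageIndex + ".txt");
        // try-with-resources flushes and closes the writer even if a write fails
        try (BufferedWriter writer = Files.newBufferedWriter(file, StandardCharsets.UTF_8)) {
            for (String name : names) {
                writer.write(name);
                writer.newLine();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return file;
    }

    /** Small round trip: write two names to a temp directory and read them back. */
    static List<String> demo() {
        try {
            Path dir = Files.createTempDirectory("pages");
            Path file = writePage(dir, 0, Arrays.asList("alice", "bob"));
            return Files.readAllLines(file, StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

If a single output file is required instead, the per-page files can be concatenated in page order after the latch has been released, which keeps the write path free of shared state.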


Original article: https://blog.csdn.net/qq_41315539/article/details/107885680