欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

多线程多批量插入大数据

程序员文章站 2022-06-17 09:06:31
...

 

参考  https://blog.csdn.net/xunwei0303/article/details/80241340?utm_source=blogxgwz1

 

创建多个线程,每个线程处理一批数据。

 

1. 创建表(mysql)

Sql代码  多线程多批量插入大数据
            
    
    博客分类: javaspringboot javaspringboot 
  1. CREATE TABLE TEST_BATCH_INSERT  
  2. (  
  3.   TEST_ID bigint PRIMARY key,  
  4.   TEST_NAME VARCHAR(100),  
  5.   AGE INT(5),  
  6.   CREATE_TIME DATETIME DEFAULT current_timestamp,  
  7.   UPDATE_TIME DATETIME DEFAULT current_timestamp  
  8. ) comment '测试批量插入';  

 

2. java bean

Java代码  多线程多批量插入大数据
            
    
    博客分类: javaspringboot javaspringboot 
  1. public class TestBatchInsertInfo {  
  2.     private Long testId;  
  3.   
  4.     private String testName;  
  5.   
  6.     private Integer age;  
  7.   
  8.     private Date createTime;  
  9.   
  10.     private Date updateTime;  
  11.   
  12.     // 省略getter/setter  
  13. }  

 

3. dao

Java代码  多线程多批量插入大数据
            
    
    博客分类: javaspringboot javaspringboot 
  1. public interface ITestBatchInsertMapper {  
  2.   
  3.     void batchInsert(List<TestBatchInsertInfo> list);  
  4. }  

 

4. mapper.xml

Xml代码  多线程多批量插入大数据
            
    
    博客分类: javaspringboot javaspringboot 
  1. <insert id="batchInsert" parameterType="java.util.List">  
  2.     INSERT INTO TEST_BATCH_INSERT   
  3.     (  
  4.      TEST_ID, TEST_NAME, AGE, CREATE_TIME, UPDATE_TIME  
  5.     )  
  6.     VALUES  
  7.     <foreach collection="list" item="log" index"index" separator =",">  
  8.         (  
  9.         #{log.testId, jdbcType=NUMERIC}, #{log.testName, jdbcType=VARCHAR}, #{log.age, jdbcType=NUMERIC},   
  10.         sysdate(), sysdate()  
  11.         )  
  12.     </foreach>  
  13.   </insert>  

 

5. 多线程

Java代码  多线程多批量插入大数据
            
    
    博客分类: javaspringboot javaspringboot 
  1. public class TestBatchInsertThread implements Runnable {  
  2.   
  3.     private ITestBatchInsertMapper testBatchInsertMapper;  
  4.   
  5.     /** 数据集合 */  
  6.     private List<TestBatchInsertInfo> list;  
  7.     /** 每个线程处理的起始数据 */  
  8.     private CountDownLatch begin;  
  9.     /** 每个线程处理的结束数据 */  
  10.     private CountDownLatch end;  
  11.   
  12.     public TestBatchInsertThread() {  
  13.     }  
  14.   
  15.     public TestBatchInsertThread(List<TestBatchInsertInfo> list, CountDownLatch begin, CountDownLatch end,  
  16.             ITestBatchInsertMapper testBatchInsertMapper) {  
  17.         this.list = list;  
  18.         this.begin = begin;  
  19.         this.end = end;  
  20.         this.testBatchInsertMapper = testBatchInsertMapper;  
  21.     }  
  22.   
  23.     @Override  
  24.     public void run() {  
  25.         try {  
  26.             if (list != null && !list.isEmpty()) {  
  27.                 testBatchInsertMapper.batchInsert(list);  
  28.             }  
  29.             // 执行完让线程直接进入等待  
  30.             begin.await();  
  31.         } catch (Exception e) {  
  32.             e.printStackTrace();  
  33.         } finally {  
  34.             // 当一个线程执行完 了计数要减一不然这个线程会被一直挂起  
  35.             end.countDown();  
  36.         }  
  37.     }  
  38. }  

 

6. service

多线程处理的方法是 batchInsertByThread;

 

普通批量处理的方法是 batchInsert

 

 

@Service(value = "testBatchInsertService")
public class TestBatchInsertServiceImpl implements ITestBatchInsertService {

    @Autowired
    private ITestBatchInsertMapper testBatchInsertMapper;

    @Override
    @Transactional
    public void batchInsertByThread(List<TestBatchInsertInfo> list) throws Exception {

        if (list == null || list.isEmpty()) {
            return;
        }
        // 一个线程处理300条数据
        int count = 1000;
        // 数据集合大小
        int listSize = list.size();
        // 开启的线程数
        int runSize = (listSize / count) + 1;
        // 存放每个线程的执行数据
        List<TestBatchInsertInfo> newList = null;
        // 创建一个线程池,数量和开启线程的数量一样
        ExecutorService executor = Executors.newFixedThreadPool(runSize);
        // 创建两个个计数器
        CountDownLatch begin = new CountDownLatch(1);
        CountDownLatch end = new CountDownLatch(runSize);

        for (int i = 0; i < runSize; i++) {
            /* 计算每个线程执行的数据 */
            if ((i + 1) == runSize) {
                int startIdx = (i * count);
                int endIdx = list.size();

                newList = list.subList(startIdx, endIdx);
            } else {
                int startIdx = (i * count);
                int endIdx = (i + 1) * count;

                newList = list.subList(startIdx, endIdx);
            }
            TestBatchInsertThread thread = new TestBatchInsertThread(newList, begin, end, testBatchInsertMapper);

            executor.execute(thread);
        }
        begin.countDown();
        end.await();

        executor.shutdown();
    }

    @Override
    public void batchInsert(List<TestBatchInsertInfo> list) {

        if (list == null || list.isEmpty()) {
            return;
        }

        List<TestBatchInsertInfo> tempList = new LinkedList<>();

        for (int i = 0; i < list.size(); i++) {
            
            tempList.add(list.get(i));
            
            if (i % 1000 == 0) {
                testBatchInsertMapper.batchInsert(tempList);
                tempList.clear();
            }
        }
        testBatchInsertMapper.batchInsert(tempList);
    }
}

 

7. junit4 测试方法

Java代码  多线程多批量插入大数据
            
    
    博客分类: javaspringboot javaspringboot 
  1. import java.util.LinkedList;  
  2. import java.util.List;  
  3.   
  4. import org.junit.Test;  
  5. import org.junit.runner.RunWith;  
  6. import org.springframework.beans.factory.annotation.Autowired;  
  7. import org.springframework.boot.test.context.SpringBootTest;  
  8. import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;  
  9.   
  10. import com.jieshun.springboot.mybatis.MybatisApplication;  
  11. import com.jieshun.springboot.mybatis.bean.po.TestBatchInsertInfo;  
  12. import com.jieshun.springboot.mybatis.service.ITestBatchInsertService;  
  13.   
  14.   
  15. @RunWith(SpringJUnit4ClassRunner.class)  
  16. @SpringBootTest(classes = MybatisApplication.class/*, webEnvironment = SpringBootTest.WebEnvironment.DEFINED_PORT*/)  
  17. public class TestBatchInsertService {  
  18.   
  19.     @Autowired  
  20.     private ITestBatchInsertService testBatchInsertService;  
  21.   
  22.     @Test  
  23.     public void batchInsertByThread() {  
  24.   
  25.         long startTime = System.currentTimeMillis();  
  26.   
  27.         try {  
  28.             List<TestBatchInsertInfo> list = new LinkedList<>();  
  29.   
  30.             TestBatchInsertInfo info = null;  
  31.   
  32.             for (int i = 0; i < 100301; i++) {  
  33.   
  34.                 Integer ig = i;  
  35.   
  36.                 info = new TestBatchInsertInfo();  
  37.                 info.setTestId(ig.longValue());  
  38.                 info.setTestName("test名称_" + i);  
  39.                 info.setAge(i);  
  40.   
  41.                 list.add(info);  
  42.             }  
  43.   
  44.             testBatchInsertService.batchInsertByThread(list);  
  45.   
  46.             System.out.println("------Batch Insert Success------");  
  47.   
  48.         } catch (Exception e) {  
  49.             e.printStackTrace();  
  50.         }  
  51.         System.out.println("耗时(ms):" + (System.currentTimeMillis() - startTime));  
  52.     }  
  53.   
  54.     @Test  
  55.     public void batchInsert() {  
  56.   
  57.         long startTime = System.currentTimeMillis();  
  58.   
  59.         try {  
  60.             List<TestBatchInsertInfo> list = new LinkedList<>();  
  61.   
  62.             TestBatchInsertInfo info = null;  
  63.   
  64.             for (int i = 0; i < 100301; i++) {  
  65.   
  66.                 Integer ig = i;  
  67.   
  68.                 info = new TestBatchInsertInfo();  
  69.                 info.setTestId(ig.longValue());  
  70.                 info.setTestName("test名称_" + i);  
  71.                 info.setAge(i);  
  72.   
  73.                 list.add(info);  
  74.             }  
  75.   
  76.             testBatchInsertService.batchInsert(list);  
  77.   
  78.             System.out.println("------Batch Insert Success------");  
  79.   
  80.         } catch (Exception e) {  
  81.             e.printStackTrace();  
  82.         }  
  83.         System.out.println("耗时(ms):" + (System.currentTimeMillis() - startTime));  
  84.   
  85.     }  
  86. }  

 

8. springboot 启动类

Java代码  多线程多批量插入大数据
            
    
    博客分类: javaspringboot javaspringboot 
  1. import org.mybatis.spring.annotation.MapperScan;  
  2. import org.springframework.boot.SpringApplication;  
  3. import org.springframework.boot.autoconfigure.SpringBootApplication;  
  4. import org.springframework.context.annotation.ComponentScan;  
  5. import org.springframework.transaction.annotation.EnableTransactionManagement;  
  6.   
  7. /** 
  8.  * 应用启动类 
  9.  *  
  10.  * @author  
  11.  * @date 2018年10月17日 
  12.  * @since JDK 1.8 
  13.  */  
  14. @SpringBootApplication  
  15. @EnableTransactionManagement  
  16. @ComponentScan(basePackages = { "com.jieshun.springboot.mybatis" })  
  17. @MapperScan(basePackages = { "com.jieshun.springboot.mybatis.dao" })  
  18. public class MybatisApplication {  
  19.   
  20.     public static void main(String[] args) {  
  21.         SpringApplication.run(MybatisApplication.class, args);  
  22.     }  
  23.   
  24. }  

 

 

本文转自:http://xurichusheng.iteye.com/blog/2433024

相关标签: java springboot