多线程多批量插入大数据
参考 https://blog.csdn.net/xunwei0303/article/details/80241340?utm_source=blogxgwz1
创建多个线程,每个线程处理一批数据。
1. 创建表(mysql)
- -- Demo table targeted by the batch-insert examples below.
- CREATE TABLE TEST_BATCH_INSERT
- (
- TEST_ID bigint PRIMARY key,
- TEST_NAME VARCHAR(100),
- -- Plain INT: the display width in INT(5) never limited the stored range and is deprecated in MySQL 8.
- AGE INT,
- CREATE_TIME DATETIME DEFAULT current_timestamp,
- -- ON UPDATE keeps UPDATE_TIME in step with later row modifications instead of freezing it at insert time.
- UPDATE_TIME DATETIME DEFAULT current_timestamp ON UPDATE current_timestamp
- ) comment '测试批量插入';
2. java bean
- public class TestBatchInsertInfo { // Row model for the TEST_BATCH_INSERT table
- private Long testId; // bound to column TEST_ID by the batchInsert mapper statement
- private String testName; // bound to column TEST_NAME by the batchInsert mapper statement
- private Integer age; // bound to column AGE by the batchInsert mapper statement
- private Date createTime; // not read by batchInsert, which writes sysdate() into CREATE_TIME
- private Date updateTime; // not read by batchInsert, which writes sysdate() into UPDATE_TIME
- // getters/setters omitted
- }
3. dao
- public interface ITestBatchInsertMapper { // MyBatis mapper for TEST_BATCH_INSERT; methods are bound to mapper.xml statements by id
- void batchInsert(List<TestBatchInsertInfo> list); // one multi-row INSERT with a VALUES tuple per element; presumably an empty list renders a VALUES clause with no tuples, so callers should not pass one
- }
4. mapper.xml
- <!-- Multi-row INSERT: one VALUES tuple per element of the list parameter; both timestamp columns are set to sysdate(). -->
- <insert id="batchInsert" parameterType="java.util.List">
- INSERT INTO TEST_BATCH_INSERT (TEST_ID, TEST_NAME, AGE, CREATE_TIME, UPDATE_TIME)
- VALUES
- <foreach collection="list" item="row" index="index" separator=",">
- (#{row.testId, jdbcType=NUMERIC}, #{row.testName, jdbcType=VARCHAR}, #{row.age, jdbcType=NUMERIC}, sysdate(), sysdate())
- </foreach>
- </insert>
5. 多线程
- // Worker task that inserts one slice of rows through the mapper, coordinated with its
- // creator by two latches: a shared start gate and a shared completion counter.
- public class TestBatchInsertThread implements Runnable {
- private ITestBatchInsertMapper testBatchInsertMapper;
- /** Slice of rows this worker inserts; null or empty means there is nothing to insert. */
- private List<TestBatchInsertInfo> list;
- /** Start gate: the worker blocks on it until the coordinator counts it down. */
- private CountDownLatch begin;
- /** Completion latch: counted down exactly once when this worker finishes, whether it succeeded or failed. */
- private CountDownLatch end;
- public TestBatchInsertThread() {
- }
- public TestBatchInsertThread(List<TestBatchInsertInfo> list, CountDownLatch begin, CountDownLatch end,
- ITestBatchInsertMapper testBatchInsertMapper) {
- this.list = list;
- this.begin = begin;
- this.end = end;
- this.testBatchInsertMapper = testBatchInsertMapper;
- }
- @Override
- public void run() {
- try {
- // Wait for the start signal BEFORE doing the work; awaiting it afterwards gated nothing
- // and could report an interruption for an insert that had already succeeded.
- begin.await();
- if (list != null && !list.isEmpty()) {
- testBatchInsertMapper.batchInsert(list);
- }
- } catch (InterruptedException e) {
- // Restore the interrupt flag so the owning executor can still observe the interruption.
- Thread.currentThread().interrupt();
- e.printStackTrace();
- } catch (Exception e) {
- // The failure is only printed: the coordinator waiting on the completion latch is not told about it.
- e.printStackTrace();
- } finally {
- // Always count down, otherwise the coordinator waiting on the completion latch would block forever.
- end.countDown();
- }
- }
- }
6. service
多线程批量处理的方法是 batchInsertByThread;
普通(单线程)批量处理的方法是 batchInsert。
/**
 * Bulk-insert service offering a multi-threaded path ({@link #batchInsertByThread}) and a
 * sequential path ({@link #batchInsert}). Both send the rows to the mapper in batches of at
 * most {@link #BATCH_SIZE} rows.
 */
@Service(value = "testBatchInsertService")
public class TestBatchInsertServiceImpl implements ITestBatchInsertService {

    /** Maximum number of rows handed to the mapper in a single call. */
    private static final int BATCH_SIZE = 1000;

    /**
     * Upper bound on worker threads, so a large input no longer creates one thread per batch
     * (the original sized the pool to the number of batches).
     */
    private static final int MAX_WORKERS = 8;

    @Autowired
    private ITestBatchInsertMapper testBatchInsertMapper;

    /**
     * Splits {@code list} into consecutive batches and inserts them from a pool of worker threads,
     * returning once every worker has finished.
     *
     * <p>A failed batch does not fail this method: {@code TestBatchInsertThread} catches and prints
     * its own exceptions.
     *
     * <p>NOTE(review): Spring binds a transaction to the calling thread, so the inserts done by the
     * workers presumably run outside the transaction opened by {@code @Transactional} here; confirm
     * before relying on rollback.
     *
     * @param list rows to insert; {@code null} or empty is a no-op. It must not be modified while
     *             this method runs, because the workers read sub-list views of it.
     * @throws Exception if the calling thread is interrupted while waiting for the workers
     */
    @Override
    @Transactional
    public void batchInsertByThread(List<TestBatchInsertInfo> list) throws Exception {
        if (list == null || list.isEmpty()) {
            return;
        }
        int total = list.size();
        // Ceiling division: an exact multiple of BATCH_SIZE must not produce an extra, empty batch.
        int batchCount = total / BATCH_SIZE + (total % BATCH_SIZE == 0 ? 0 : 1);
        ExecutorService executor = Executors.newFixedThreadPool(Math.min(batchCount, MAX_WORKERS));
        try {
            // "begin" releases the workers, "end" is counted down once by each worker when it is done.
            CountDownLatch begin = new CountDownLatch(1);
            CountDownLatch end = new CountDownLatch(batchCount);
            for (int i = 0; i < batchCount; i++) {
                int startIdx = i * BATCH_SIZE;
                int endIdx = Math.min(startIdx + BATCH_SIZE, total);
                executor.execute(new TestBatchInsertThread(list.subList(startIdx, endIdx), begin, end,
                        testBatchInsertMapper));
            }
            begin.countDown();
            end.await();
        } finally {
            // Release the pool threads even when waiting for the workers is interrupted.
            executor.shutdown();
        }
    }

    /**
     * Inserts {@code list} on the calling thread in batches of at most {@link #BATCH_SIZE} rows.
     *
     * @param list rows to insert; {@code null} or empty is a no-op
     */
    @Override
    public void batchInsert(List<TestBatchInsertInfo> list) {
        if (list == null || list.isEmpty()) {
            return;
        }
        List<TestBatchInsertInfo> batch = new LinkedList<>();
        // Iterate instead of calling list.get(i), which is O(n) per call on a LinkedList input.
        for (TestBatchInsertInfo row : list) {
            batch.add(row);
            if (batch.size() == BATCH_SIZE) {
                testBatchInsertMapper.batchInsert(batch);
                batch.clear();
            }
        }
        // Flush the remainder only when there is one: an empty batch would render an INSERT
        // whose VALUES clause has no tuples.
        if (!batch.isEmpty()) {
            testBatchInsertMapper.batchInsert(batch);
        }
    }
}
7. junit4 测试方法
- import java.util.LinkedList;
- import java.util.List;
- import org.junit.Test;
- import org.junit.runner.RunWith;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.boot.test.context.SpringBootTest;
- import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
- import com.jieshun.springboot.mybatis.MybatisApplication;
- import com.jieshun.springboot.mybatis.bean.po.TestBatchInsertInfo;
- import com.jieshun.springboot.mybatis.service.ITestBatchInsertService;
- // Integration tests that time the multi-threaded and the sequential batch insert.
- // NOTE(review): the inserted rows are presumably committed, so TEST_BATCH_INSERT has to be
- // emptied before a re-run to avoid primary-key collisions.
- @RunWith(SpringJUnit4ClassRunner.class)
- @SpringBootTest(classes = MybatisApplication.class/*, webEnvironment = SpringBootTest.WebEnvironment.DEFINED_PORT*/)
- public class TestBatchInsertService {
- /** Number of rows generated for each test. */
- private static final int ROW_COUNT = 100301;
- @Autowired
- private ITestBatchInsertService testBatchInsertService;
- /** Inserts ROW_COUNT rows through the multi-threaded path and prints the elapsed time. */
- @Test
- public void batchInsertByThread() throws Exception {
- long startTime = System.currentTimeMillis();
- try {
- // Exceptions propagate so a failed insert fails the test instead of only being printed.
- testBatchInsertService.batchInsertByThread(buildRows(0L));
- System.out.println("------Batch Insert Success------");
- } finally {
- System.out.println("耗时(ms):" + (System.currentTimeMillis() - startTime));
- }
- }
- /** Inserts ROW_COUNT rows through the sequential path and prints the elapsed time. */
- @Test
- public void batchInsert() throws Exception {
- long startTime = System.currentTimeMillis();
- try {
- // IDs start after the range used by batchInsertByThread so both tests can share one table.
- testBatchInsertService.batchInsert(buildRows(ROW_COUNT));
- System.out.println("------Batch Insert Success------");
- } finally {
- System.out.println("耗时(ms):" + (System.currentTimeMillis() - startTime));
- }
- }
- /** Builds ROW_COUNT rows whose IDs are firstId, firstId + 1, ...; name and age use the row index. */
- private static List<TestBatchInsertInfo> buildRows(long firstId) {
- List<TestBatchInsertInfo> rows = new LinkedList<>();
- for (int i = 0; i < ROW_COUNT; i++) {
- TestBatchInsertInfo info = new TestBatchInsertInfo();
- info.setTestId(firstId + i);
- info.setTestName("test名称_" + i);
- info.setAge(i);
- rows.add(info);
- }
- return rows;
- }
- }
8. springboot 启动类
- import org.mybatis.spring.annotation.MapperScan;
- import org.springframework.boot.SpringApplication;
- import org.springframework.boot.autoconfigure.SpringBootApplication;
- import org.springframework.context.annotation.ComponentScan;
- import org.springframework.transaction.annotation.EnableTransactionManagement;
- /**
- * Spring Boot application entry point.
- *
- * @author
- * @date 2018-10-17
- * @since JDK 1.8
- */
- @SpringBootApplication
- @EnableTransactionManagement
- @ComponentScan(basePackages = { "com.jieshun.springboot.mybatis" }) // component scanning rooted at the application package
- @MapperScan(basePackages = { "com.jieshun.springboot.mybatis.dao" }) // MyBatis mapper interfaces are looked up in the dao package
- public class MybatisApplication {
- public static void main(String[] args) {
- SpringApplication.run(MybatisApplication.class, args); // boots the Spring context using this class as the configuration source
- }
- }
本文转自:http://xurichusheng.iteye.com/blog/2433024