欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Elasticsearch滚动查询+线程池处理大批量数据

程序员文章站 2022-06-15 15:01:56
...

什么是滚动查询?

scroll 查询 可以用来对 Elasticsearch 有效地执行大批量的文档查询,而又不用付出深度分页那种代价。
scroll查询允许我们 先做查询初始化,然后再批量地拉取结果。 这有点儿像传统数据库中的 游标(cursor) 。

scroll查询会在第一次查询时,保存一个当时时间点的快照数据,之后只会基于该旧的视图快照提供数据搜索,如果这个期间数据变更,这个快照数据保持不变。

深度分页的代价根源是结果集全局排序,如果去掉全局排序的特性的话查询结果的成本就会很低。scroll查询采用基于_doc进行排序的方式,性能较高。

每次发送scroll请求,我们还需要指定一个 scrollId,设置失效时间,每次搜索请求只要在这个失效时间内能完成就可以了。
更多滚动查询请参考:https://www.elastic.co/guide/cn/elasticsearch/guide/current/scroll.html

java API中的滚动查询

MatchQueryBuilder qb = QueryBuilders.matchQuery("message","去养猪了");

SearchResponse scrollResp = client.prepareSearch()
		.addSort(FieldSortBuilder.DOC_FIELD_NAME,SortOrder.ASC)
		.setScroll(new TimeValue(10))
		.setQuery(qb)
		.setSize(100).get();// 设置hits次数最多为100


// 将查询出来批量数据,按照hits次数遍历,直到hits为空
while(scrollResp.getHits().getHits().length !=0) {
	for (SearchHit hit : scrollResp.getHits().getHits()) {
		System.out.println(hit.toString());
		// Handle the hit...
	}
	
	scrollResp = client.prepareSearchScroll(scrollResp.getScrollId()).setScroll(new TimeValue(100)).execute().actionGet();
}; 

使用elasticsearchTemplate实现滚动查询

获取scrollId:

String scrollId = elasticsearchTemplate.scan(searchQuery, 6000, false);

滚动查询:

Page<Entity> pages = elasticsearchTemplate.scroll(scrollId, 6000, Entity.class);

滚动查询+线程池

注入elasticsearchTemplate:

@Autowired
private ElasticsearchTemplate elasticsearchTemplate;
// 定义线程池,线程数为10个
private static ExecutorService executorService = Executors.newFixedThreadPool(10);

实现代码:

String scrollId = null;
try {
	// 滚动查询失效时间
	int SCROLL_TIMEOUT_INMILLIS = 12 * 60 * 60 * 1000;
	
	// 构建查询语句
	QueryBuilder orgQuery = QueryBuilders
			.boolQuery()
			.filter(QueryBuilders.termQuery("orgCode", orgCode));
	// es检索对象
	SearchQuery searchQuery = new NativeSearchQueryBuilder()
			.withIndices("你的索引index")
			.withTypes("你的类型type")
			.withQuery(orgQuery)
			.withPageable(new PageRequest(0, 100)) // 一次查询100条
			.build();
				
	// 获取初始滚动ID
	// 记录当前查询条件下所有数据的快照,之后的滚动查询都是从快照一批批的查
	scrollId = elasticsearchTemplate.scan(searchQuery, SCROLL_TIMEOUT_INMILLIS, false);
	while(true) {
		
		// 滚动查询
		Page<DemoEntity> pages = elasticsearchTemplate.scroll(scrollId, SCROLL_TIMEOUT_INMILLIS, DemoEntity.class);
		
		// 所有数据滚动完成,退出while
		if(!pages.hasContent()) {
			break;
		}
		
		for(DemoEntity document:pages.getContent()) {
			// 线程池
			executorService.execute(new clearMethod(document));
		}
	}
} catch (Exception e) {
	// exception
}finally {
	// 清除滚动id
	elasticsearchTemplate.clearScroll(scrollId);
	// 关闭线程池
	executorService.shutdown();
}

class clearMethod extends Thread {
	private DemoEntity document;
	
	public clearMethod(DemoEntity document) {
		this.document = document;
	}
	public void run() {
		// do something...
    }
}

结束

如对elasticsearch不是很了解,可以参考我前几篇博客。
因为工作要求,本篇描述的代码ES版本为2.4.0,可能有些过时了,但是思路大体上上正确的。
本篇所示的代码是伪代码,请不要直接copy,如有描述不清楚的地方,欢迎留言。