Elasticsearch滚动查询+线程池处理大批量数据
程序员文章站
2022-06-15 15:01:56
...
什么是滚动查询?
scroll 查询 可以用来对 Elasticsearch 有效地执行大批量的文档查询,而又不用付出深度分页那种代价。
scroll查询允许我们 先做查询初始化,然后再批量地拉取结果。 这有点儿像传统数据库中的 游标(cursor) 。
scroll查询会在第一次查询时,保存一个当时时间点的快照数据,之后只会基于该旧的视图快照提供数据搜索,如果这个期间数据变更,这个快照数据保持不变。
深度分页的代价根源是结果集全局排序,如果去掉全局排序的特性的话查询结果的成本就会很低。scroll查询采用基于_doc进行排序的方式,性能较高。
每次发送scroll请求,我们还需要指定一个 scrollId,设置失效时间,每次搜索请求只要在这个失效时间内能完成就可以了。
更多滚动查询请参考:https://www.elastic.co/guide/cn/elasticsearch/guide/current/scroll.html
java API中的滚动查询
MatchQueryBuilder qb = QueryBuilders.matchQuery("message","去养猪了");
SearchResponse scrollResp = client.prepareSearch()
.addSort(FieldSortBuilder.DOC_FIELD_NAME,SortOrder.ASC)
.setScroll(new TimeValue(10))
.setQuery(qb)
.setSize(100).get();// 设置hits次数最多为100
// 将查询出来批量数据,按照hits次数遍历,直到hits为空
while(scrollResp.getHits().getHits().length !=0) {
for (SearchHit hit : scrollResp.getHits().getHits()) {
System.out.println(hit.toString());
// Handle the hit...
}
scrollResp = client.prepareSearchScroll(scrollResp.getScrollId()).setScroll(new TimeValue(100)).execute().actionGet();
};
使用elasticsearchTemplate实现滚动查询
获取scrollId:
String scrollId = elasticsearchTemplate.scan(searchQuery, 6000, false);
滚动查询:
Page<Entity> pages = elasticsearchTemplate.scroll(scrollId, 6000, Entity.class);
滚动查询+线程池
注入elasticsearchTemplate:
@Autowired
private ElasticsearchTemplate elasticsearchTemplate;
// 定义线程池,线程数为10个
private static ExecutorService executorService = Executors.newFixedThreadPool(10);
实现代码:
String scrollId = null;
try {
// 滚动查询失效时间
int SCROLL_TIMEOUT_INMILLIS = 12 * 60 * 60 * 1000;
// 构建查询语句
QueryBuilder orgQuery = QueryBuilders
.boolQuery()
.filter(QueryBuilders.termQuery("orgCode", orgCode));
// es检索对象
SearchQuery searchQuery = new NativeSearchQueryBuilder()
.withIndices("你的索引index")
.withTypes("你的类型type")
.withQuery(orgQuery)
.withPageable(new PageRequest(0, 100)) // 一次查询100条
.build();
// 获取初始滚动ID
// 记录当前查询条件下所有数据的快照,之后的滚动查询都是从快照一批批的查
scrollId = elasticsearchTemplate.scan(searchQuery, SCROLL_TIMEOUT_INMILLIS, false);
while(true) {
// 滚动查询
Page<DemoEntity> pages = elasticsearchTemplate.scroll(scrollId, SCROLL_TIMEOUT_INMILLIS, DemoEntity.class);
// 所有数据滚动完成,退出while
if(!pages.hasContent()) {
break;
}
for(DemoEntity document:pages.getContent()) {
// 线程池
executorService.execute(new clearMethod(document));
}
}
} catch (Exception e) {
// exception
}finally {
// 清除滚动id
elasticsearchTemplate.clearScroll(scrollId);
// 关闭线程池
executorService.shutdown();
}
class clearMethod extends Thread {
private DemoEntity document;
public clearMethod(DemoEntity document) {
this.document = document;
}
public void run() {
// do something...
}
}
结束
如对elasticsearch不是很了解,可以参考我前几篇博客。
因为工作要求,本篇描述的代码ES版本为2.4.0,可能有些过时了,但是思路大体上上正确的。
本篇所示的代码是伪代码,请不要直接copy,如有描述不清楚的地方,欢迎留言。
上一篇: C++ string与int的相互转换(使用C++11)
下一篇: npm镜像源管理