ES deduplication: find duplicate documents, keep the one with the minimum ID, and batch delete the rest
Deduplicating with a script
Group duplicate documents by mail, keep the document with the smallest ID in the res index, and batch delete the rest.
The code is as follows:
controller:
/**
 * Update and delete duplicate data.
 */
@Async
@GetMapping("/up")
@ApiOperation(value = "Update and delete duplicate data", notes = "Update and delete duplicate data")
public void up(Integer size) {
    // Assumes injected ElasticsearchRestTemplate and EsManager beans and a logger (e.g. Lombok @Slf4j).
    size = size == null || size <= 0 ? 1000 : size;
    boolean state = true;
    long num = 0;
    do {
        // Terms aggregation on mail.keyword; minDocCount(2) keeps only values that occur at least twice.
        BoolQueryBuilder builder = QueryBuilders.boolQuery();
        NativeSearchQuery query = new NativeSearchQuery(builder);
        query.addAggregation(AggregationBuilders.terms("mail").field("mail.keyword").size(size).minDocCount(2));
        SearchHits<EsDto> searchHits = elasticsearchRestTemplate.search(query, EsDto.class);
        Terms aggs = searchHits.getAggregations().get("mail");
        if (aggs.getBuckets().isEmpty()) {
            state = false;
        }
        for (Terms.Bucket entry : aggs.getBuckets()) {
            if (entry.getDocCount() == 1) {
                state = false;
                break;
            }
            String mail = (String) entry.getKey();
            log.info(" syncUp mail : {} - mailCount:{}", mail, entry.getDocCount());
            // Fetch every document with this mail, sorted by id ascending.
            NativeSearchQuery queryMail = this.getQuery(0, size, mail);
            SearchHits<EsDto> mails = elasticsearchRestTemplate.search(queryMail, EsDto.class);
            if (mails.getTotalHits() <= 1) {
                state = false;
                break;
            }
            List<EsDto> collect = mails.get().map(SearchHit::getContent).collect(Collectors.toList());
            // Keep the first hit (smallest id); everything after it is a duplicate to delete.
            Collection<Integer> ids = collect.stream().skip(1).map(EsDto::getId).collect(Collectors.toSet());
            if (ids.isEmpty()) {
                continue;
            }
            num += ids.size();
            int synchroId = collect.get(0).getId();
            esManager.synchronizationUpByBach(ids, synchroId);
        }
    } while (state);
    log.info(" syncUp mail end num: {} ", num);
}
private NativeSearchQuery getQuery(int page, int pageSize, String mail) {
    BoolQueryBuilder builder = QueryBuilders.boolQuery();
    EsQueryUtil.setMatchPhraseQueryValue("mail.keyword", mail, builder);
    NativeSearchQuery query = new NativeSearchQuery(builder);
    // Sort by id ascending so the first hit is the document to keep.
    List<Sort.Order> list = new ArrayList<>();
    list.add(new Sort.Order(Sort.Direction.ASC, "id"));
    Pageable pageable = PageRequest.of(page, pageSize, Sort.by(list));
    query.setPageable(pageable);
    query.setTrackTotalHits(true);
    return query;
}
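getQuery relies on a small custom helper, EsQueryUtil, that the post does not include. A minimal sketch of what it presumably does, assuming it simply appends a match_phrase clause to the bool query:

import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;

// Hypothetical reconstruction of the helper used above; not shown in the original post.
public class EsQueryUtil {
    public static void setMatchPhraseQueryValue(String field, Object value, BoolQueryBuilder builder) {
        // On a keyword field, match_phrase behaves like an exact-value match.
        if (value != null) {
            builder.must(QueryBuilders.matchPhraseQuery(field, value));
        }
    }
}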
manager:
@Async
public void synchronizationUpByBach(Collection<Integer> ids, int newId) {
    if (ids.isEmpty() || newId == 0) {
        return;
    }
    // Wrap each duplicate id in an entity so the repository can delete by id.
    List<Es> entities = new ArrayList<>();
    for (Integer id : ids) {
        Es es = new Es();
        es.setId(id);
        entities.add(es);
    }
    log.info(" synchronizationUp success old:{} - new:{} ", StringUtils.join(ids, ","), newId);
    deleteAll(entities);
}
public void deleteAll(List<Es> entities) {
    try {
        iEsDao.deleteAll(entities);
    } catch (Exception e) {
        // Log the failure instead of swallowing it on stdout.
        log.error(" deleteAll failed: {}", e.getMessage(), e);
    }
}
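The iEsDao used by deleteAll is not shown either. With Spring Data Elasticsearch it is presumably a repository interface along these lines (a sketch, assuming the Integer id type of the entity below; deleteAll(Iterable) is inherited from CrudRepository):

import org.springframework.data.elasticsearch.repository.ElasticsearchRepository;

// Assumed repository definition; the original post does not show it.
public interface IEsDao extends ElasticsearchRepository<Es, Integer> {
}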
domain:
// indexName is the name of the ES index
@Document(indexName = "res")
public class Es {
    @Id
    private int id;
    private int status;
    private String name;
    private String mail;
    // getters/setters (or Lombok @Data) omitted in the original; setId()/getId() are used above
}
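One mapping detail worth noting: the aggregation targets mail.keyword, which exists because Elasticsearch's default dynamic mapping indexes string fields as text with a .keyword sub-field. If you want to declare that mapping explicitly, a sketch with Spring Data annotations could look like this (the class name EsExplicitMapping is illustrative):

import org.springframework.data.elasticsearch.annotations.Document;
import org.springframework.data.elasticsearch.annotations.Field;
import org.springframework.data.elasticsearch.annotations.FieldType;
import org.springframework.data.elasticsearch.annotations.InnerField;
import org.springframework.data.elasticsearch.annotations.MultiField;

@Document(indexName = "res")
public class EsExplicitMapping {
    // text for full-text search, plus a keyword sub-field for exact-value aggregation
    @MultiField(
        mainField = @Field(type = FieldType.Text),
        otherFields = @InnerField(suffix = "keyword", type = FieldType.Keyword)
    )
    private String mail;
}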
Execution results:
The total number of deletions was actually higher than this run shows: the code was tested and tweaked over several rounds, and the num counter was only added for the final run. That run processed 71,072 duplicate mail values and deleted 289,351 duplicate documents in total. To keep processing from becoming too slow, the per-bucket steps in the controller can be moved into the manager and handled asynchronously.
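One caveat on the @Async annotations used above: they only take effect when Spring's async support is enabled and the annotated method is invoked through the Spring proxy, i.e. from another bean, as happens here when the controller calls esManager. A minimal configuration sketch (the class name AsyncConfig is illustrative):

import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;

// Without @EnableAsync somewhere in the configuration, @Async methods run synchronously.
@Configuration
@EnableAsync
public class AsyncConfig {
}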