Elasticsearch dedup: find duplicate documents, keep the one with the smallest ID, and batch-delete the rest

Deduplicating with a script

Group the duplicate documents in the res index by mail, keep the document with the smallest ID in each group, and batch-delete the rest.

The code is as follows:

controller:
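
The article only shows the handler methods; the enclosing class and its injected beans are omitted. A minimal skeleton, assuming Lombok and constructor injection (the class name is hypothetical; elasticsearchRestTemplate and esManager are the fields used in the methods below):

    // Assumed surrounding class; only the field names are taken from the code below.
    @Slf4j
    @RestController
    @RequiredArgsConstructor
    public class EsDedupController {

        private final ElasticsearchRestTemplate elasticsearchRestTemplate;
        private final EsManager esManager;

        // handler methods shown below go here
    }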

    /**
     * Update and delete duplicate data
     */
    @Async
    @GetMapping("/up")
    @ApiOperation(value = "update and delete duplicate data", notes = "update and delete duplicate data")
    public void up(Integer size) {
        
        // fall back to a batch/page size of 1000 when none is given
        size = size == null || size <= 0 ? 1000 : size;
        boolean state = true;
        long num = 0;

        do {
            BoolQueryBuilder builder = QueryBuilders.boolQuery();
            NativeSearchQuery query = new NativeSearchQuery(builder);
            // terms aggregation on mail.keyword; minDocCount(2) returns only mails with at least two docs
            query.addAggregation(AggregationBuilders.terms("mail").field("mail.keyword").size(size).minDocCount(2));

            SearchHits<EsDto> searchHits = elasticsearchRestTemplate.search(query, EsDto.class);
            Terms aggs = searchHits.getAggregations().get("mail");

            // no buckets left means no more duplicates: stop the loop
            if (aggs.getBuckets().isEmpty()) {
                state = false;
            }

            for (Terms.Bucket entry : aggs.getBuckets()) {
                // defensive guard; with minDocCount(2) a bucket should never hold a single doc
                if (entry.getDocCount() == 1) {
                    state = false;
                    break;
                }

                String mail = entry.getKeyAsString();
                log.info(" syncUp mail : {} - mailCount:{}", mail, entry.getDocCount());

                // fetch all docs for this mail, sorted by id ascending (smallest first)
                NativeSearchQuery queryMail = this.getQuery(0, size, mail);
                SearchHits<EsDto> mails = elasticsearchRestTemplate.search(queryMail, EsDto.class);
                if (mails.getTotalHits() <= 1) {
                    state = false;
                    break;
                }
                List<EsDto> collect = mails.get().map(SearchHit::getContent).collect(Collectors.toList());

                // keep the first (smallest) id and mark the rest for deletion
                Collection<Integer> ids = collect.stream().skip(1).map(EsDto::getId).collect(Collectors.toSet());
                if (ids.isEmpty()) {
                    continue;
                }
                num += ids.size();
                int synchroId = collect.get(0).getId();
                esManager.synchronizationUpByBach(ids, synchroId);
            }
        } while (state);
        
        log.info(" syncUp mail end num: {}  ", num);
    }
    
    
    private NativeSearchQuery getQuery(int page, int pageSize, String mail) {
        BoolQueryBuilder builder = QueryBuilders.boolQuery();
        // exact match on the keyword sub-field (project helper; a sketch follows below)
        EsQueryUtil.setMatchPhraseQueryValue("mail.keyword", mail, builder);

        NativeSearchQuery query = new NativeSearchQuery(builder);
        // sort ascending by id so the first hit is the one to keep
        query.setPageable(PageRequest.of(page, pageSize, Sort.by(Sort.Direction.ASC, "id")));
        query.setTrackTotalHits(true);

        return query;
    }
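
EsQueryUtil is a project-internal helper the article does not show. A minimal sketch of what it presumably does, assuming it simply adds a match_phrase clause to the bool query (the method name and signature come from the call above; the body is an assumption):

    // Assumed implementation; the real EsQueryUtil is not shown in the article.
    public class EsQueryUtil {

        // Adds a match_phrase clause on the given field when the value is present.
        public static void setMatchPhraseQueryValue(String field, String value, BoolQueryBuilder builder) {
            if (value != null && !value.isEmpty()) {
                builder.must(QueryBuilders.matchPhraseQuery(field, value));
            }
        }
    }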

manager:

    @Async
    public void synchronizationUpByBach(Collection<Integer> ids, int newId) {
        if (ids.isEmpty() || newId == 0) {
            return;
        }
        // build lightweight entities that only carry the ids to delete
        List<Es> entities = new ArrayList<>();
        for (Integer id : ids) {
            Es es = new Es();
            es.setId(id);
            entities.add(es);
        }
        deleteAll(entities);
        log.info(" synchronizationUp success old:{} - new:{} ", StringUtils.join(ids, ","), newId);
    }

    public void deleteAll(List<Es> entities) {
        try {
            iEsDao.deleteAll(entities);
        } catch (Exception e) {
            // use the logger instead of printing to stdout
            log.error(" deleteAll failed ", e);
        }
    }
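
iEsDao is not shown either. A minimal sketch, assuming it is a plain Spring Data Elasticsearch repository (the interface name is a guess based on the field name; deleteAll(Iterable) is inherited from CrudRepository):

    // Assumed repository behind the iEsDao field.
    public interface IEsDao extends ElasticsearchRepository<Es, Integer> {
    }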

domain:

// indexName is the Elasticsearch index name
@Document(indexName = "res")
public class Es {
    @Id
    private int id;

    private int status;
    private String name;
    private String mail;

    // getters and setters omitted in the article (or use Lombok @Data)
}
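
The controller actually searches with EsDto rather than Es; that class is not shown. A minimal sketch, assuming it mirrors the Es mapping (the class name and getId() come from the controller code; everything else is an assumption):

    // Assumed search DTO mapped to the same "res" index.
    @Document(indexName = "res")
    public class EsDto {
        @Id
        private int id;

        private int status;
        private String name;
        private String mail;

        public int getId() {
            return id;
        }
    }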

Execution result:
[screenshot: execution log of the dedup run]

The total number of deletions was actually higher: tests were run in several passes with adjustments in between, and the num counter was only added for the final run, which handled 71,072 duplicate mail values. Across all runs, 289,351 duplicate documents were deleted. To keep the HTTP request from blocking while the job runs, the controller's processing steps can be moved into the manager and executed asynchronously, as sketched below.
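
A minimal sketch of that refactor, assuming a hypothetical dedupAsync method in the manager (only the entry points are shown; the do-while loop moves over unchanged):

    // Controller: only triggers the job and returns immediately.
    @GetMapping("/up")
    @ApiOperation(value = "update and delete duplicate data", notes = "update and delete duplicate data")
    public String up(Integer size) {
        esManager.dedupAsync(size == null || size <= 0 ? 1000 : size);
        return "dedup started";
    }

    // Manager (hypothetical method name): runs on an async executor thread.
    @Async
    public void dedupAsync(int size) {
        // move the whole do-while loop from the controller here
    }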
