欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

「Mongo」聚合操作与清洗重复数据项

程序员文章站 2022-04-15 16:33:59
...

「Mongo」聚合操作与清洗重复数据项

使用Mongo聚合操作来进行重复的数据项清洗,并使用PyMongo加入到数据清洗组件中。

当前环境:PyMongo 3.6.1 / MongoDB 3.4.7 / Python 3.6.4 :: Anaconda, Inc.

在爬虫中断续爬时会出现少量数据重复的问题,我将数据去重放在了数据清洗环节,清洗的过程中顺带将重复的数据删除。
Mongo老版本的解决方案是建立单一索引,Mongo3.+可以使用聚合操作将重复的数据检索出来并进行删除。
元数据结构如下:

item = { 
    "_id" : ObjectId("..."), 
    "title" : "...",     # 数据标题
    "date" : "...",      # 数据日期
    "url" : "...",       # 数据来源
    "content" : "...", 
    "source" : "..."
    "category" : "...", 
    ...
}

需要根据「相同标题+相同日期+相同来源」判定数据重复,在管道中根据这三项条件分组(group)>1(match)出来,最后遍历删除(db.collections.remove())

聚合操作的过程
group:使title/date/urlid+1dupsidmatch: 在$group得到的分组基础上匹配数量>1的项
「Mongo」聚合操作与清洗重复数据项
Mongo Shell 查询重复数据的操作如下:

db.test.aggregate([
    {
        $group: { _id: {'title': '$title','date':'$date','url': '$url'},count: {$sum: 1},dups: {$addToSet: '$_id'}}
    },
    {
        $match: {count: {$gt: 1}}
    }
])

Mongo Shell 将查询到的结果删除操作:

db.test.aggregate([
    ...                                     // 同上聚合操作,此处略
]).forEach(function(doc){
    doc.dups.shift();                       // 去除重复组的第一个元数据_id,得到除第一个之外的其他元组
    db.test.remove({_id: {$in: doc.dups}}); // remove()删除这些重复的数据
})

PyMongo 操作代码如下:
使用bulk_write()进行批量删除

pipeline = [
    {
        '$group': {
            '_id': {'title': '$title', 'date': '$date', 'url': '$url'},
            'count': {'$sum': 1},
            'dups': {
                '$addToSet': '$_id'
            }
        },
    },
    {
        '$match': {
            'count': {
                '$gt': 1
            }
        }
    }
]

map_id = map(lambda doc: doc['dups'][1:], db['data_value'].aggregate(pipeline=pipeline))
list_id = [item for sublist in map_id for item in sublist]
print(db['data_value'] \
      .bulk_write(list(map(lambda _id: DeleteOne({'_id': _id}), list_id))) \
      .bulk_api_result)

「Mongo」聚合操作与清洗重复数据项

一行代码鬼畜版:

print(db['data_value'].bulk_write(list(map(lambda _id: DeleteOne({'_id': _id}), [item for sublist in map(lambda doc: doc['dups'][1:], db['data_value'].aggregate(pipeline=[{'$group': {'_id': {'title': '$title', 'date': '$date', 'url': '$url'},'count': {'$sum': 1},'dups': {'$addToSet': '$_id'}},},{'$match': {'count': {'$gt': 1}}}])) for item in sublist]))).bulk_api_result)