欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

MongoDB聚合

程序员文章站 2024-01-22 22:56:22
...

MongoDB聚合操作用于对数据的批量操作,将集合按条件分组后在进行一系列操作,诸如求和、求均值等。聚合操作能对集合进行复杂的操作,主要用于数理统计和数据挖掘。MongoDB中聚合操作的输入是i集合中的文档,输出可以是一个文档,也可是多条文档。MongoDB提供非常强大的聚合操作,可分为三种方式

  • 聚合管道(Aggregation Pipeline)
  • 单目聚合操作(Single Purpose Aggregation Operation)
  • MapReduce 编程模型

聚合管道

POSIX多线程使用方式中,有种叫做管道(流水线)的方式,其数据元素流串行地被一组线程按顺序执行。

聚合管道由阶段(Stage)组成,文档在一个阶段处理完毕后,聚合管道会把处理结果传到下一个阶段。聚合管的功能

  • 对文档进行过滤,查询出符合条件的文档。
  • 对文档进行变换,改变文档的输出形式。

聚合管道的每个阶段使用阶段操作符(Stage Operators)定义,在每个阶段操作符中可使用表达式操作符(Expression Operators)计算总和、均值、拼接或分割字符串等操作,直到每个阶段完结最后返回结果,返回的结果可直接输出,也可存储到集合中。

MongoDB聚合
聚合管道到的用法

处理流程

  • db.collection.aggregate() 可同时使用多个管道,方便数据处理。
  • db.collection.aggregate() 使用 MongoDB 内置原生操作,聚合效率高且支持类似SQL中GroupBy的操作,不在需要用户编写自定义的JS例程。
  • 每个阶段管道限制100M的内存,若单节点管道超出极限,MongoDB产生错误。为了能够处理大型数据集,可设置 allowDiskUsetrue 为聚合管道节点把数据写入临时文件,以解决100M内存限制。
  • db.collection.aggregate() 可作用于分片集合,但结果不能输在分片集合,MapReduce可作用于在分片集合其结果也可输在分片集合中。
  • db.collection.aggregate() 返回一个指针(cursor),数据存放在内存中可直接操作,跟MongoShell一样。
  • db.collection.aggregate() 输出的结果只能保存在文档中,BSON Document大小限制为16M。

语法解析

db.collection.aggregate(pipeline, options)
db.collection.aggregate([{<stage>,...}], ...)

pipeline 参数

  • $project 对输入文档添加新字段或删除现有字段,可自定义显示哪些字段。
  • $match 根据条件过滤仅输出符合条件的文档,若放在pipeline前面,根据条件过滤数据并传输到写一个阶段管道,可提高后续数据处理效率。也可放在out之前用以对结果再一次过滤。
  • $redact 字段所处的document结构的级别
  • $limit 用来限制MongoDB聚合管道返回的文档数量
  • $skip 在聚合管道中跳过指定数量的文档并返回剩余的文档
  • $unwind 将文档中某个数组类型字段拆分成多条,每条包含数组中的一个值。
  • $sample 随机选择从起输入指定数量的文档,若大于或等于5%的collection的文档,$sample进行收集扫描并排序随后选择顶部文件。因此$sample在收集阶段是受排序的内存限制。
  • $sort 将输入文档排序后输出
  • $geoNear 用于地理位置数据分析
  • $out 必须为pipeline最后一个阶段管道,将最后计算结果写入到指定collection中。
  • $indexStats 返回数据集合的每个索引使用情况
  • $group 将集合中的文档分组,可用于统计结果,$group 首先将数据根据 key 进行分组。

$match

筛选条件,过滤不满足条件的文档,可使用常规查询操作符。

db.users.aggregate( {$match: {'age':{$gte:18}} } )

$project

  • 用于包含、排除字段,设置需查询或过滤的字段,0为过滤掉字段不显示,1为需查询的字段。
  • 用于对字段重命名
  • 投射中可使用表达式
db.users.aggregate(
  {$match: {age: {$gte: 18}}},
  {$project: {_id:0, username:1, created_at:1}}
)

// $project 当字段值为0或1时用于过滤字段,当键名为一个自定义的字符串,键值为$紧跟原字段表示要对该字段进行重命名。
db.users.aggregate(
  {$match: {age: {$gte: 18}}},
  {$project: {_id:0, username:1, nickname:$username }}
)

// 通过修改字段名达到生成字段副本,以便后续操作符使用。
db.users.aggregate(
  { $match: {age: {$gte: 18 } } },
  { $project: {id:$_id, username:1 } }
)

算术表达式可对数值运算

db.users.aggregate(
  // 对 score 字段值加1后作为 scores 的值
  { $project: {scores: { $add : [$score,$score,1]  } } },
)
db.users.aggregate(
  // $subtract:[exp1, exp2] 数组中第一个元素减去第二个元素
  { $project: {scores: { $subtract: [$score,1]  } } },
)
db.users.aggregate(
  // $multiply:[exp1, exp2] 数组中多个元素相乘
  { $project: {scores: { $multiply: [$score, 2, 5]  } } },
)
db.users.aggregate(
  // $divide:[exp1, exp2] 数组中第一个元素除以第二个元素
  { $project: {scores: { $divide: [$score, 2, 5]  } } },
)
db.users.aggregate(
  // $mod:[exp1, exp2] 数组中第一个元素除以第二个元素的余数
  { $project: {scores: { $mod: [$score, 2]  } } },
)

字符串操作

db.users.aggregate(
  // $substr:[exp, startOffset, numToReturn] 字符串截取
  { $project: {nickname: { $substr: [$nickname, 2, 6]  } } },
)

db.users.aggregate(
  // $concat:[exp1, exp2, exp3...] 字符串拼接,将数组中多个元素拼接在一起
  { $project: {fullname: { $concat: [$firstname, $lastname]  } } },
)

db.users.aggregate(
  // $toLower:exp 字符串转为小写
  { $project: {nickname: { $toLower: $username } } },
)

db.users.aggregate(
  // $toUpper:exp 字符串转为大写
  { $project: {nickname: { $toUpper: $username } } },
)

为所有文档新增字段

db.users.update({}, {$set, {publish_at:new Date()} }, true, true)

日期表达式

db.users.aggregate(
  { $project: { 
      'year': { $year: $created_at },
      'month': { $month: $created_at }, 
      'dayOfMonth': { $dayOfMonth: $created_at }, 
      'dayOfWeek': { $dayOfWeek: $created_at }, 
      'dayOfYear': { $dayOfYear: $created_at },  
      'hour': { $hour: $created_at }, 
      'minute': { $minute: $created_at }, 
      'second': { $second: $created_at }, 
    } 
  }
)

时间间隔(秒数)

db.users.aggregate(
  { 
    $project: { 
      'fasttime': {
         $subtract: [ { $second: new Date() }, { $second: $created_at } ]
        } 
     } 
  }
)

字符串比较

db.users.aggregate(
  // $cmp:[exp1, exp2] 字符串比较,相同为0,小于返回负数, 大于返回正数
  { $project: { result : { $cmp: [ $age, 18 ] } } }
)
db.users.aggregate(
  // $strcasecmp:[exp1, exp2] 字符串比较,相同为0,小于返回-1, 大于返回1
  { $project: { result : { $strcasecmp: [ $username, 'junchow' ] } } }
)

逻辑条件

db.users.aggregate(
  // $eq 判断表达式是否相等
  { $project: { result : {$eq: [$username, 'junchow' ] } } }
)

db.users.aggregate(
  // $and [exp1, exp2...expn] 连接多条件,所有条件为真则表达式为真
  { $project: { result: { $and : [ {$eq : [$username:'junchow']}, {$gt:[$age : 18]} ] } }  }
)

db.users.aggregate({
  // $not exp 用于取反操作
  $project: {result: {$not:{$eq:[$username, 'junchow']}} }
})

db.users.aggregate({
  // $cond:[booleanExp, trueExp, falseExp] 三目运算符
  $project : { result: {$cond: [{$eq:[$username:'junchow']}, true, false] } }
})

db.users.aggregate({
  // $ifNull:[exp, replacementExpr] 若条件为null则返回表达式值,若字段不存在时字段值为null
  $project: { result: { $ifNull : [ $notExistField, 'not exist is null' ] } }
})

$group

$group分组使用_id指定要分组的键名,用来自定义字段统计。

db.users.aggregate({
  $match : { age: { $gte : 18 } }
},{
  $group : { _id:$username, count:{$sum:1} }
});

// 多字段分组
db.users.aggregate({
  $match: {age: {$gte:18}  }},
  $group: {_id:{username:$username, age:$ge}, 'count':{$sum:1} }        
})

// $sum:val 对每个文档加val求和
// $avg:val 对每个文档求均值
db.users.aggregate({
  $group: { _id:$username, count:{$avg:$age} }
})

db.users.aggregate({
  $group: { _id:$username, count:{$max:$age}  }
})

db.users.aggregate({
  $group: {_id:$username, count:{$min:$age} }
})

// $first:val 获取分组中首个
db.users.aggregate({
  $group:{_id:$username, count:{$first: $age} }
})
db.users.aggregate({
  $group:{_id:$username, count:{$last: $age} }
})
db.users.aggregate({
  $group: {_id:$username, count:{$addToSet: $age} }
})
db.users.aggregate({
  $group:{_id:$username, count:{$push: $age} }
})

聚合运算

group

先选定分组所依据的键,而后将集合依据选定键值的不同分成若干组。然后可通过聚合每一组内的文档,产生一个结果文档。

group不支持分片集群,无法进行分布式运算(shard cluster)。若需要支持分布式需使用aggregatemapReduce

db.collection.group(document)

{
  # 分组字段
  key:{key1, key2:1},
  # 查询条件
  cond:{},
  # 聚合函数
  reduce:function(current, result){},
  # 初始化
  initial:{},
  # 统计一组后的回调函数
  finalize:function(){}
}

计算每个栏目下商品个数

SELECT COUNT(*) FROM goods GROUP BY category_id;

db.goods.group({
  key:{category_id:1},
  cond:{},//所有
  reduce:function(current, result){//current对应当前行,result对应分组中的多行
    result.total += 1;
  },
  initial:{total:0}
})

查看每个栏目下商品价格大于100的数量

SELECT category,goods_name FROM goods WHERE 1=1 AND price>100 GROUP BY category_id

db.goods.group({
  key:{category_id:1},
  cond:{price:{$gt:100}},
  reduce:function(current,result){
    result.count += 1;
  },
  initial:{count:0}
})

计算每个栏目下商品库存量

SELECT category_id,SUM(store) FROM goods WHERE 1=1 AND GROUP BY category_id

db.goods.group({
  key:{category_id:1},
  cond:{},
  initial:{sum:0},
  reduce:function(current,result){
    result += current.store;
  }
});

获取每个栏目下最贵的商品价格

SELECT catetory_id,MAX(price) FROM goods GROUP BY category_id

db.goods.group({
  key:{category_id:1},
  cond:{},
  initial:{max:0},
  reduce:function(current,result){
    if(current.price > result.max){
      result.max = current.price;
    }
  }
});

查询每个栏目下商品的平均价格

SELECT category_id,AVERAGE(price) FROM goods GROUP BY category_id

db.goods.group({
  key:{category_id:1},
  cond:{},
  reduce:function(current,result){
    result.total += current.price;
    result.count  += 1;
  },
  initial:{total:0, count:0},//进组result
  finalize:functioin(result){//出组 result
    result.average = result.total/result.count;
  }
})

aggregate

aggregate聚合框架与sql对比

  • $match WHERE
  • $group GROUP BY
  • $project SELECT
  • $sort ORDER BY
  • $limit LIMIT
  • $sum SUM()
  • $sum COUNT()

查询每个栏目下商品数量

SELECT COUNT(*) FROM goods GROUP BY category_id

db.goods.aggregate([
  {$group:{_id:'$category_id', count:{$sum:1}}},
  {$project:{_id:0, category_id:'$category_id', count:'$count'}},
  {$sort:{count:1}}
])

查询每个栏目下价格大于100的商品个数

SELECT COUNT(*) FROM goods WHERE 1=1 AND price>100 GROUP BY category_id

db.goods.aggregate([
  {$match:{price:{$gt:100}}},
  {$group:{_id:'$category_id'}, count:{$sum:1}},
  {$project:{_id:0, category_id:'$_id.category_id', count:'$count'}},
  {$sort:{count:1}}
])

查询每个栏目下价格大于100的商品个数,仅显示个数大于3的。

SELECT category_id,COUNT(*) AS count WHERE 1=1 AND price>100 GROUP BY category_id HAVING count>3

db.goods.aggregate([
  {$match:{price:{$gt:100}}},
  {$group:{_id:'$category_id', count:{$sum:1}}},
  {$match:{count:{$gt:3}}},
  {$project:{_id:0, category_id:'$_id.category_id', count:'$count'}},
  {$sort:{count:1}}
])

查看每个栏目下商品的库存量

SELECT category_id,SUM(store) WHERE 1=1 GROUP BY category_id

db.goods.aggregate([
  {$group:{_id:'$category_id', count:{$sum:1}, total:{$sum:'$store'}}},
  {$project:{_id:0, category_id:'$_id.category_id', count:'$count', total:'$total'}},
  {$sort:{total:1}}
])

查询每个栏目下商品的平均价格并升序排序

SELECT category_id,AVG(price) AS avg FROM goods GROUP BY category_id ORDER BY avg ASC

db.goods.aggregate([
  {$group:{_id:'$category_id', count:{$sum:1}, average:{$avg:'$price'}}},
  {$sort:average:-1}
])

mapReduce

mapReduce是一个轻松并行化到多台服务器的聚合方法,它会拆分问题,再将各个部分发送到不同机器上,让每台机器都完成一部分。当所有机器都完成后,再将结果汇集起来形成最终完整的结果。

mapReduce最开始是映射(map),将操作映射到集合中的每个文档。这个操作要么无作为,要么产生一些键和x个值。接着进入中间环节(洗牌shuffle),按照分组并将产生的键值组成列表放到对应的键中。化简(reduce)则把列表中的值化简成一个单值。这个值被返回,然后接着进行洗牌,直到每个键的列表只有一个值为止,这个值就是最后的结果。

mapReduce的代价是速度,group不是很快,mapReduce更慢,绝不要用在实时环境中,要作为后台任务运行,将创建一个保存结果的结合,可对这个集合进行实时查询。

mapReduce的工作过程

  • map 映射
    现将同一个组的数据映射到一个文档(数组)上,在映射环节想要得到文档中每个键,map()使用emit()返回要处理的值。edit()会给mapReduce一个键和一个值,键类似group所使用键key。
  • reduce 归约
    将数组(同一组)数据进行运算,reduce()由两个参数,一个是key也就是emit()返回的第一个值,另一个是数组,由一个或多个对应于键的文档构成。reduce()一定要能够被反复调用,不论是映射环节还是前一个简化环节。

mapReduce语法

db.runCommand({
  mapreduce:字符串,集合名
  map:函数,
  reduce:函数,
  [query:文档,发往map()前先给过渡的文档],
  [sort:文档,发往map()前先给文档排序],
  [limit:整数,发往map()的文档数量上限],
  [out:字符串,统计结果保存的集合],
  [keeptemp:布尔值,链接关闭时临时结果集合是否保存],
  [finalize:函数,将reduce的结果发给此函数做最后处理],
  [scope:文档,js代码中要用到的变量],
  //jsMode=true时 BSON>JS>map>reduce>BSON
  //jsMode=false时 BSON>JS>map>BSON>JS>reduce>BSON,可处理非常大的mapreduce
  [jsMode:布尔值,是否减少执行过程中BSON和JS的转换,默认为true],
  [verbose:布尔值,是否产生更加详细的服务器日期,默认为true]
})
MongoDB聚合
mapReduce

MongoDB没有模式,所以并不晓得每个文档由多少个键,通常找到集合的所有键的最好方法就是用MapReduce。

查询结合中所有键

var map = function(){
  for(var k in this){ //this当前映射文档的引用
    emit(k, {count:1});//将文档某个键的计数返回
  }
}
var reduce = function(key,emits){
  var total = 0;
  for(var k in emits){

    }
}

计算每个栏目下商品库存总量

SELECT category_id,SUM(store) AS sum FROM goods GROUP BY category_id

var map = function(){
  emit(this.category_id, this.store);//获得栏目下商品的库存量
};
var reduce = function(key,store){
  return Array.sum(store);
}
db.goods.mapReduce(map,reduce,query,{out:'result'});

查询每个栏目下商品的平均价格

var map = function(){
  emit(this.category_id, this.price);
}
var reduce = function(category_id,price){
  return Array.avg(price);
}
db.goods.mapReduce(map, reduce, {out:'result'})

将MongoDB组成的shard分片集群把地震数据分布到各节点上,将中国区域按10个经度10个维度为一组约30块,并用mapReduce计算地震数据,统计每组上每月的地震次数及地震级别。分析出结果把地震高发区用偏红色标注,低发区偏绿标注。

上一篇: MongoDB聚合

下一篇: MongoDB入门教程