本文的介绍的聚合操作包括2种,Aggregation Pipeline、Map-Reduce,基于mongo 4.4版本。
Aggregation Pipeline
Aggregation Pipeline又称之为管道聚合,聚合管道由一个或多个处理文档的阶段(stages)组成,MongoDB提供了db.collection.aggregate() 方法和运行聚合管道的聚合命令。
- 每个阶段对输入文档执行一个操作。例如,一个阶段可以过滤文档、分组文档和计算值
- 从一个阶段输出的文档将输入到下一个阶段
- 聚合管道可以返回文档组的结果,例如:返回总值、平均值、最大值和最小值
Aggregation Pipeline示例
创建以下包含产品订单的集合:
db.orders.insertMany( [
{ _id: 0, productName: "Steel beam", status: "new", quantity: 10 },
{ _id: 1, productName: "Steel beam", status: "urgent", quantity: 20 },
{ _id: 2, productName: "Steel beam", status: "urgent", quantity: 30 },
{ _id: 3, productName: "Iron rod", status: "new", quantity: 15 },
{ _id: 4, productName: "Iron rod", status: "urgent", quantity: 50 },
{ _id: 5, productName: "Iron rod", status: "urgent", quantity: 10 }
] )
- 一个stage(match),相当于find加条件的过滤
db.orders.aggregate([ {$match:{status:"urgent"}} ])
$match stage:
- 将文档过滤到状态为”urgent”的文档
- 将过滤后的文档输出到 $group 阶段
- 增加第2个stage(group),在match的基础上对productName进行group,每个group统计quantity的总数
db.orders.aggregate([ {$match:{status:"urgent"}}, {$group: { _id: "$productName", sumQuantity: { $sum: "$quantity" } } } ])
$group stage:
- 承接match的stage,按产品名称对输入文档进行分组
- 使用 $sum 计算每个productName组的总数量,该总数量存储在聚合管道返回的sumQuantity字段中。
聚合管道阶段
聚合管道由一个或多个处理文档的阶段组成:
- 每个阶段都会在文档通过管道时对其进行转换。
- 同一阶段可以在管道中多次出现,但以下阶段例外:$out、$merge 和 $geoNear。
db.collection.aggregate()阶段
语法:
db.collection.aggregate( [ { <stage> }, ... ] )
除了 $out、$merge 和 $geoNear 阶段之外的所有阶段都可以在管道中出现多次。
$count
$count常用的包括以下几个:
- db.collection.countDocuments()
- $collStats
- db.collection.estimatedDocumentCount()
- count
- db.collection.count()
例如:匹配copies小于等于2的文档,并且统计这些文档的数量
db.books.aggregate([ { $match:{ copies:{$lte:2} } }, { $count:"lte_2_copies_count" } ])
$match
过滤出条件匹配的文档传递到下一个管道阶段,语法:
{ $match: { <query> } }
$match条件尽可能早地将放在聚合管道中,因为 $match 限制了聚合管道中的文档总数,较早的 $match 操作将管道的处理量降至最低。如果在管道的最开始 放置 $match,则查询可以像任何其他 db.collection.find() 或 db.collection.findOne() 一样利用索引。 例如:
//_id:null match管道所有的当做一组 db.books.aggregate( [ { $match: { $or: [ { copies: { $gt: 1, $lt: 5 } }, { title: "abc123" } ] } }, { $group: {_id:null,count:{$sum:1}}} ] );
$sort
对所有输入文档进行排序并按排序顺序将它们返回到管道,$sort 接受一个文档,该文档指定要排序的字段和相应的排序顺序。语法:
{ $sort: { <field1>: <sort order>, <field2>: <sort order> ... } }
排序的field,最多支持对32个键进行排序。 具有相同排序值的文档,在多次执行中可能执行结果排序不一致。 例子:
db.books.aggregate( [
{ $match: { $or: [ { copies: { $gt: 1, $lt: 5 } }, { title: "abc123" } ] } },
{ $sort: {title:1,_id:-1}}
] );
$unwind
从输入文档中解构一个数组字段以输出每个元素的文档。语法:
{ $unwind: <field path> }
以前,如果字段路径指定的字段中的值不是数组,则 db.collection.aggregate() 会产生错误,而3.2版本之后则不会报错了 例子:
- 例如查询_id为1,sizes数组的展开列表:
数据插入:
db.inventory.insertOne({ "_id" : 1, "item" : "ABC1", sizes: [ "S", "M", "L"] })
db.inventory.aggregate([ { $match:{_id:1} }, { $unwind:"$sizes" } ])
- 包括数组索引(includeArrayIndex)
数据插入:
db.inventory2.insertMany([ { "_id" : 1, "item" : "ABC", price: NumberDecimal("80"), "sizes": [ "S", "M", "L"] }, { "_id" : 2, "item" : "EFG", price: NumberDecimal("120"), "sizes" : [ ] }, { "_id" : 3, "item" : "IJK", price: NumberDecimal("160"), "sizes": "M" }, { "_id" : 4, "item" : "LMN" , price: NumberDecimal("10") }, { "_id" : 5, "item" : "XYZ", price: NumberDecimal("5.75"), "sizes" : null } ])
db.inventory2.aggregate( [ { $unwind: { path: "$sizes", includeArrayIndex: "arrayIndex" } }])
如果该sizes字段不是一个数组,则为null,如_id=3的sizes=M
- 保留null和空数组(preserveNullAndEmptyArrays)
以下 $unwind 操作使用 preserveNullAndEmptyArrays 选项来包含 size 字段为 null、缺失或空数组的文档。
db.inventory2.aggregate( [ { $unwind: { path: "$sizes", preserveNullAndEmptyArrays: true } } ] )
- 通过展开的字段进行分组
db.sales.aggregate([ // First Stage { $match:{_id:{$in:["1","2"]}} }, // Second Stage { $unwind:"$items" }, // Third Stage { $unwind:"$items.tags" }, //Four Stage { $group:{ _id:"$items.tags", totalSalesAmount:{ $sum:{$multiply:["$items.price","$items.quantity"]} } } } ])
5.
$project
将带有请求字段的文档传递到管道中的下一个阶段,指定的字段可以是输入文档中的现有字段或新计算的字段。
{ $project: { <specification(s)> } }
默认情况下,_id 字段包含在输出文档中。要从输出文档中排除 _id 字段,您必须在 $project 中明确指定抑制 _id 字段。 如果您指定排除一个或多个字段,则在输出文档中返回所有其他字段,如:
{ $project: { "<field1>": 0, "<field2>": 0, ... } } // Return all but the specified fields
例子:
- 查询status为”urgent”,只返回productName,quantity字段,不返回id字段
db.orders.aggregate([ {$match:{status:"urgent"}}, {$project:{productName:1,quantity:1,_id:0}} ])
- 从输出文档中排除字段productName
db.orders.aggregate([ {$project:{productName:0}} ])
- 有条件地排除字段
下面的$project阶段使用REMOVE变量来排除author.middle字段,前提是它等于 “”
插入文档:
{ "_id" : 1, title: "abc123", isbn: "0001122223334", author: { last: "zzz", first: "aaa" }, copies: 5, lastModified: "2016-07-28" } { "_id" : 2, title: "Baked Goods", isbn: "9999999999999", author: { last: "xyz", first: "abc", middle: "" }, copies: 2, lastModified: "2017-07-21" } { "_id" : 3, title: "Ice Cream Cakes", isbn: "8888888888888", author: { last: "xyz", first: "abc", middle: "mmm" }, copies: 5, lastModified: "2017-07-22" }
db.books.aggregate([ { $project:{ "title":1, "author.first": 1, "author.last" : 1, "author.middle": { $cond:{ if:{$eq:["","$author.middle"]}, then: "$$REMOVE", else: "$author.middle" } } } } ])
- 包括计算字段
以下 $project 阶段添加了新字段 isbn、lastName 和 copiesSold:
db.books.aggregate( [ { $project: { title: 1, isbn: { prefix: { $substr: [ "$isbn", 0, 3 ] }, group: { $substr: [ "$isbn", 3, 2 ] }, publisher: { $substr: [ "$isbn", 5, 4 ] }, title: { $substr: [ "$isbn", 9, 3 ] }, checkDigit: { $substr: [ "$isbn", 12, 1] } }, lastName: "$author.last", copiesSold: "$copies" } } ] )
- 投影新数组字段
将已有的字段,投影到一个新字段上输出结果,如将author.last,author.first两个字段投影到一个新数组上输出
db.books.aggregate( [ { $project: { myArray: [ "$author.last", "$author.first" ] } } ] )
$addFields
$lookup
对同一数据库中的未分片集合执行左外连接,以过滤来自“已连接”集合的文档以进行处理。对于每个输入文档,$lookup 阶段添加一个新的数组字段, 其元素是“加入”集合中的匹配文档。$lookup 阶段将这些重新调整的文档传递到下一个阶段。
- 等价匹配(Equality Match)
{ $lookup: { //指定同一数据库中的集合以执行连接。 from 集合不能被分片 from: <collection to join>, //指定从文档输入到 $lookup 阶段的字段, localField: <field from the input documents>, foreignField: <field from the documents of the "from" collection>, as: <output array field> } }
用SQL来理解就是:
SELECT *, <output array field> FROM collection WHERE <output array field> IN (SELECT * FROM <collection to join> WHERE <foreignField>= <collection.localField>);
更多操作参考:聚合管道
管道聚合行为
mongo中聚合命令对单个集合进行操作,逻辑上将整个集合传递到聚合管道中,要优化操作,请尽可能使用以下策略来避免扫描整个集合。MongoDB的查询计划器 分析聚合管道以确定是否可以使用索引来提高管道性能。例如,以下管道阶段可以利用索引:
- $match: 如果$match 阶段发生在管道的开头,它可以使用索引来过滤文档。
- $sort:$sort 阶段可以使用索引,只要它前面没有 $project、$unwind 或 $group 阶段。
- $group:如果满足以下所有条件,$group 阶段有时可以使用索引来查找每个组中的第一个文档:
- $group 阶段之前有一个 $sort 阶段,用于对要分组的字段进行排序
- 分组字段上有一个与排序顺序匹配的索引,并且$group 阶段使用的唯一累加器是 $first。
单一目的聚合操作
单一目的的聚合操作包括:
- db.collection.estimatedDocumentCount()
- db.collection.count()
- db.collection.distinct()
单一用途的聚合方法很简单,但缺乏聚合管道的功能。
map-reduce
聚合管道比 map-reduce 操作提供更好的性能和可用性,Map-reduce 操作可以使用聚合管道操作符重写,例如 $group、$merge 等.对于需要自定义功能 的map-reduce操作,MongoDB 从版本 4.4 开始提供 $accumulator 和 $function 聚合运算符。使用这些运算符在 JavaScript 中定义自定义聚合表达式。