MongoDB 聚合管道(Aggregation Pipeline)

管道概念

POSIX多线程的使用方式中, 有一种很重要的方式-----流水线(亦称为“管道”)方式,“数据元素”流串行地被一组线程按顺序执行。它的使用架构可参考下图:mongodb

clip_image002

以面向对象的思想去理解,整个流水线,能够理解为一个数据传输的管道;该管道中的每个工做线程,能够理解为一个整个流水线的一个工做阶段stage,这些工做线程之间的合做是一环扣一环的。靠输入口越近的工做线程,是时序较早的工做阶段stage,它的工做成果会影响下一个工做线程阶段(stage)的工做结果,即下个阶段依赖于上一个阶段的输出,上一个阶段的输出成为本阶段的输入。这也是pipeline的一个共有特色!shell

为了回应用户对简单数据访问的需求,MongoDB2.2版本引入新的功能聚合框架(Aggregation Framework) ,它是数据聚合的一个新框架,其概念相似于数据处理的管道。 每一个文档经过一个由多个节点组成的管道,每一个节点有本身特殊的功能(分组、过滤等),文档通过管道处理后,最后输出相应的结果。管道基本的功能有两个:数据库

一是对文档进行“过滤”,也就是筛选出符合条件的文档;express

二是对文档进行“变换”,也就是改变文档的输出形式。json

其余的一些功能还包括按照某个指定的字段分组和排序等。并且在每一个阶段还可使用表达式操做符计算平均值和拼接字符串等相关操做。管道提供了一个MapReduce 的替代方案,MapReduce使用相对来讲比较复杂,而管道的拥有固定的接口(操做符表达),使用比较简单,对于大多数的聚合任务管道通常来讲是首选方法。数组

该框架使用声明性管道符号来支持相似于SQL Group By操做的功能,而再也不须要用户编写自定义的JavaScript例程。多线程

大部分管道操做会在“aggregate”子句后会跟上“$match”打头。它们用在一块儿,就相似于SQL的from和where子句,或是MongoDB的find函数。“$project”子句看起来也很是相似SQL或MongoDB中的某个概念(和SQL不一样的是,它位于表达式尾端)。架构

接下来介绍的操做在MongoDB聚合框架中是独一无二的。与大多数关系数据库不一样,MongoDB天生就能够在行/文档内存储数组。尽管该特性对于全有全无的数据访问十分便利,可是它对于须要组合投影、分组和过滤操做来编写报告的工做,却显得至关复杂。“$unwind”子句将数组分解为单个的元素,并与文档的其他部分一同返回。框架

“$group”操做与SQL的Group By子句用途相同,可是使用起来却更像是LINQ中的分组运算符。与取回一行平面数据不一样,“$group”操做的结果集会呈现为一个持续的嵌套结构。正因如此,使用“$group”能够返回聚合信息,例如对于每一个分组中的实际文档,计算文档总体或部分的数目和平均值。ide

管道操做符

管道是由一个个功能节点组成的,这些节点用管道操做符来进行表示。聚合管道以一个集合中的全部文档做为开始,而后这些文档从一个操做节点 流向下一个节点 ,每一个操做节点对文档作相应的操做。这些操做可能会建立新的文档或者过滤掉一些不符合条件的文档,在管道中能够对文档进行重复操做。

先看一个管道聚合的例子:

clip_image004

管道操做符的种类:

Name

Description

$project

Reshapes a document stream. $project can rename, add, or remove fields as well as create computed values and sub-documents.

$match

Filters the document stream, and only allows matching documents to pass into the next pipeline stage.$match uses standard MongoDB queries.

$limit

Restricts the number of documents in an aggregation pipeline.

$skip

Skips over a specified number of documents from the pipeline and returns the rest.

$unwind

Takes an array of documents and returns them as a stream of documents.

$group

Groups documents together for the purpose of calculating aggregate values based on a collection of documents.

$sort

Takes all input documents and returns them in a stream of sorted documents.

$geoNear

Returns an ordered stream of documents based on proximity to a geospatial point.

管道操做符详细使用说明

  1.  $project: 数据投影,主要用于重命名、增长和删除字段

例如:

db.article.aggregate(

{ $project : {

title : 1 ,

author : 1 ,

}}

);

这样的话结果中就只还有_id,tilte和author三个字段了,默认状况下_id字段是被包含的,若是要想不包含_id话能够这样:

db.article.aggregate(

{ $project : {

_id : 0 ,

title : 1 ,

author : 1

}});

也能够在$project内使用算术类型表达式操做符,例如:

db.article.aggregate(

{ $project : {

title : 1,

doctoredPageViews : { $add:["$pageViews", 10] }

}});

经过使用$add给pageViews字段的值加10,而后将结果赋值给一个新的字段:doctoredPageViews

注:必须将$add计算表达式放到中括号里面

除此以外使用$project还能够重命名字段名和子文档的字段名:

db.article.aggregate(

{ $project : {

title : 1 ,

page_views : "$pageViews" ,

bar : "$other.foo"

}});

也能够添加子文档:

db.article.aggregate(

{ $project : {

title : 1 ,

stats : {

pv : "$pageViews",

foo : "$other.foo",

dpv : { $add:["$pageViews", 10] }

}

}});

产生了一个子文档stats,里面包含pv,foo,dpv三个字段。

2.$match: 滤波操做,筛选符合条件文档,做为下一阶段的输入

   $match的语法和查询表达式(db.collection.find())的语法相同

db.articles.aggregate( [

{ $match : { score : { $gt : 70, $lte : 90 } } },

{ $group: { _id: null, count: { $sum: 1 } } }

] );

   $match用于获取分数大于70小于或等于90记录,而后将符合条件的记录送到下一阶段$group管道操做符进行处理。

注意:1.不能在$match操做符中使用$where表达式操做符。

          2.$match尽可能出如今管道的前面,这样能够提前过滤文档,加快聚合速度。

          3.若是$match出如今最前面的话,可使用索引来加快查询。

3.  $limit:  限制通过管道的文档数量

     $limit的参数只能是一个正整数

db.article.aggregate(

{ $limit : 5 });

这样的话通过$limit管道操做符处理后,管道内就只剩下前5个文档了

4. $skip: 从待操做集合开始的位置跳过文档的数目

    $skip参数也只能为一个正整数

db.article.aggregate(

{ $skip : 5 });

通过$skip管道操做符处理后,前五个文档被“过滤”掉

5.$unwind:将数组元素拆分为独立字段

例如:article文档中有一个名字为tags数组字段:

> db.article.find()
  { "_id" : ObjectId("528751b0e7f3eea3d1412ce2"),

"author" : "Jone", "title" : "Abook",

"tags" : [  "good",  "fun",  "good" ] }

使用$unwind操做符后:

> db.article.aggregate({$project:{author:1,title:1,tags:1}},{$unwind:"$tags"})
{
        "result" : [
                {
                        "_id" : ObjectId("528751b0e7f3eea3d1412ce2"),
                        "author" : "Jone",
                        "title" : "A book",
"tags" : "good"
                },
                {
                        "_id" : ObjectId("528751b0e7f3eea3d1412ce2"),
                        "author" : "Jone",
                        "title" : "A book",
"tags" : "fun"
                },
                {
                        "_id" : ObjectId("528751b0e7f3eea3d1412ce2"),
                        "author" : "Jone",
                        "title" : "A book",
  "tags" : "good"
                }
        ],
        "ok" : 1
}

注意:a.{$unwind:"$tags"})不要忘了$符号

          b.若是$unwind目标字段不存在的话,那么该文档将被忽略过滤掉,例如:

     > db.article.aggregate({$project:{author:1,title:1,tags:1}},{$unwind:"$tag"})
    { "result" : [ ], "ok" : 1 }
将$tags改成$tag因不存在该字段,该文档被忽略,输出的结果为空

        c.若是$unwind目标字段不是一个数组的话,将会产生错误,例如:

  > db.article.aggregate({$project:{author:1,title:1,tags:1}},{$unwind:"$title"})

    Error: Printing Stack Trace
    at printStackTrace (src/mongo/shell/utils.js:37:15)
    at DBCollection.aggregate (src/mongo/shell/collection.js:897:9)
    at (shell):1:12
    Sat Nov 16 19:16:54.488 JavaScript execution failed: aggregate failed: {
        "errmsg" : "exception: $unwind:  value at end of field path must be an array",
        "code" : 15978,
        "ok" : 0
} at src/mongo/shell/collection.js:L898

      d.若是$unwind目标字段数组为空的话,该文档也将会被忽略。

  6.$group 对数据进行分组

    $group的时候必需要指定一个_id域,同时也能够包含一些算术类型的表达式操做符:

db.article.aggregate(

{ $group : {

_id : "$author",

docsPerAuthor : { $sum : 1 },

viewsPerAuthor : { $sum : "$pageViews" }

}});

注意:  1.$group的输出是无序的。

          2.$group操做目前是在内存中进行的,因此不能用它来对大量个数的文档进行分组。

7.$sort : 对文档按照指定字段排序

使用方式以下:

db.users.aggregate( { $sort : { age : -1, posts: 1 } });

按照年龄进行降序操做,按照posts进行升序操做

注意:1.若是将$sort放到管道前面的话能够利用索引,提升效率

        2.MongoDB 24.对内存作了优化,在管道中若是$sort出如今$limit以前的话,$sort只会对前$limit个文档进行操做,这样在内存中也只会保留前$limit个文档,从而能够极大的节省内存

        3.$sort操做是在内存中进行的,若是其占有的内存超过物理内存的10%,程序会产生错误

8.$goNear

        $goNear会返回一些坐标值,这些值以按照距离指定点距离由近到远进行排序

具体使用参数见下表:

Field

Type

Description

near

GeoJSON point orlegacy coordinate pairs

The point for which to find the closest documents.

distanceField

string

The output field that contains the calculated distance. To specify a field within a subdocument, use dot notation.

limit

number

Optional. The maximum number of documents to return. The default value is 100. See also the num option.

num

number

Optional. The num option provides the same function as the limitoption. Both define the maximum number of documents to return. If both options are included, the num value overrides the limit value.

maxDistance

number

Optional. A distance from the center point. Specify the distance in radians. MongoDB limits the results to those documents that fall within the specified distance from the center point.

query

document

Optional. Limits the results to the documents that match the query. The query syntax is the usual MongoDB read operation query syntax.

spherical

Boolean

Optional. If true, MongoDB references points using a spherical surface. The default value is false.

distanceMultiplier

number

Optional. The factor to multiply all distances returned by the query. For example, use the distanceMultiplier to convert radians, as returned by a spherical query, to kilometers by multiplying by the radius of the Earth.

includeLocs

string

Optional. This specifies the output field that identifies the location used to calculate the distance. This option is useful when a location field contains multiple locations. To specify a field within a subdocument, usedot notation.

uniqueDocs

Boolean

Optional. If this value is true, the query returns a matching document once, even if more than one of the document’s location fields match the query. If this value is false, the query returns a document multiple times if the document has multiple matching location fields. See $uniqueDocsfor more information.

例如:

db.places.aggregate([

{

$geoNear: {

near: [40.724, -73.997],

distanceField: "dist.calculated",

maxDistance: 0.008,

query: { type: "public" },

includeLocs: "dist.location",

uniqueDocs: true,

num: 5

}

}

])

其结果为:

{

"result" : [

{ "_id" : 7,

"name" : "Washington Square",

"type" : "public",

"location" : [

[ 40.731, -73.999 ],

[ 40.732, -73.998 ],

[ 40.730, -73.995 ],

[ 40.729, -73.996 ]

],

"dist" : {

"calculated" : 0.0050990195135962296,

"location" : [ 40.729, -73.996 ]

}

},

{ "_id" : 8,

"name" : "Sara D. Roosevelt Park",

"type" : "public",

"location" : [

[ 40.723, -73.991 ],

[ 40.723, -73.990 ],

[ 40.715, -73.994 ],

[ 40.715, -73.994 ]

],

"dist" : {

"calculated" : 0.006082762530298062,

"location" : [ 40.723, -73.991 ]

}

}

],

"ok" : 1}

其中,dist.calculated中包含了计算的结果,而dist.location中包含了计算距离时实际用到的坐标

注意: 1.使用$goNear只能在管道处理的开始第一个阶段进行

         2.必须指定distanceField,该字段用来决定是否包含距离字段

3.$gonNear和geoNear命令比较类似,可是也有一些不一样:distanceField在$geoNear中是必选的,而在geoNear中是可选的;includeLocs在$geoNear中是string类型,而在geoNear中是boolen类型。

管道表达式

管道操做符做为“键”,所对应的“值”叫作管道表达式。例如上面例子中{$match:{status:"A"}},$match称为管道操做符,而{status:"A"}称为管道表达式,它能够看做是管道操做符的操做数(Operand),每一个管道表达式是一个文档结构,它是由字段名、字段值、和一些表达式操做符组成的,例如上面例子中管道表达式就包含了一个表达式操做符$sum进行累加求和。

每一个管道表达式只能做用于处理当前正在处理的文档,而不能进行跨文档的操做。管道表达式对文档的处理都是在内存中进行的。除了可以进行累加计算的管道表达式外,其余的表达式都是无状态的,也就是不会保留上下文的信息。累加性质的表达式操做符一般和$group操做符一块儿使用,来统计该组内最大值、最小值等,例如上面的例子中咱们在$group管道操做符中使用了具备累加的$sum来计算总和。

除了$sum觉得,还有如下性质的表达式操做符:

组聚合操做符

Name

Description

$addToSet

Returns an array of all the unique values for the selected field among for each document in that group.

$first

Returns the first value in a group.

$last

Returns the last value in a group.

$max

Returns the highest value in a group.

$min

Returns the lowest value in a group.

$avg

Returns an average of all the values in a group.

$push

Returns an array of all values for the selected field among for each document in that group.

$sum

Returns the sum of all the values in a group.

Bool类型聚合操做符

Name

Description

$and

Returns true only when all values in its input array are true.

$or

Returns true when any value in its input array are true.

$not

Returns the boolean value that is the opposite of the input value.

比较类型聚合操做符

Name

Description

$cmp

Compares two values and returns the result of the comparison as an integer.

$eq

Takes two values and returns true if the values are equivalent.

$gt

Takes two values and returns true if the first is larger than the second.

$gte

Takes two values and returns true if the first is larger than or equal to the second.

$lt

Takes two values and returns true if the second value is larger than the first.

$lte

Takes two values and returns true if the second value is larger than or equal to the first.

$ne

Takes two values and returns true if the values are not equivalent.

算术类型聚合操做符

Name

Description

$add

Computes the sum of an array of numbers.

$divide

Takes two numbers and divides the first number by the second.

$mod

Takes two numbers and calcualtes the modulo of the first number divided by the second.

$multiply

Computes the product of an array of numbers.

$subtract

Takes two numbers and subtracts the second number from the first.

字符串类型聚合操做符

Name

Description

$concat

Concatenates two strings.

$strcasecmp

Compares two strings and returns an integer that reflects the comparison.

$substr

Takes a string and returns portion of that string.

$toLower

Converts a string to lowercase.

$toUpper

Converts a string to uppercase.

日期类型聚合操做符

Name

Description

$dayOfYear

Converts a date to a number between 1 and 366.

$dayOfMonth

Converts a date to a number between 1 and 31.

$dayOfWeek

Converts a date to a number between 1 and 7.

$year

Converts a date to the full year.

$month

Converts a date into a number between 1 and 12.

$week

Converts a date into a number between 0 and 53

$hour

Converts a date into a number between 0 and 23.

$minute

Converts a date into a number between 0 and 59.

$second

Converts a date into a number between 0 and 59. May be 60 to account for leap seconds.

$millisecond

Returns the millisecond portion of a date as an integer between 0 and 999.

条件类型聚合操做符

Name

Description

$cond

A ternary operator that evaluates one expression, and depending on the result returns the value of one following expressions.

$ifNull

Evaluates an expression and returns a value.

注:以上操做符都必须在管道操做符的表达式内来使用。

各个表达式操做符的具体使用方式参见:

http://docs.mongodb.org/manual/reference/operator/aggregation-group/

聚合管道的优化

   1.$sort  +  $skip  +  $limit顺序优化

若是在执行管道聚合时,若是$sort、$skip、$limit依次出现的话,例如:

{ $sort: { age : -1 } },

{ $skip: 10 },

{ $limit: 5 }

那么实际执行的顺序为:

{ $sort: { age : -1 } },

{ $limit: 15 },

{ $skip: 10 }

$limit会提早到$skip前面去执行。

此时$limit = 优化前$skip+优化前$limit

这样作的好处有两个:1.在通过$limit管道后,管道内的文档数量个数会“提早”减少,这样会节省内存,提升内存利用效率。2.$limit提早后,$sort紧邻$limit这样的话,当进行$sort的时候当获得前“$limit”个文档的时候就会中止。

2.$limit + $skip + $limit + $skip Sequence Optimization

若是聚合管道内反复出现下面的聚合序列:

  { $limit: 100 },

  { $skip: 5 },

  { $limit: 10},

  { $skip: 2 }

首先进行局部优化为:能够按照上面所讲的先将第二个$limit提早:

{ $limit: 100 },

  { $limit: 15},

  { $skip: 5 },

  { $skip: 2 }

进一步优化:两个$limit能够直接取最小值 ,两个$skip能够直接相加:

{ $limit: 15 },

  { $skip: 7 }

3.Projection Optimization

过早的使用$project投影,设置须要使用的字段,去掉不用的字段,能够大大减小内存。除此以外也能够过早使用

咱们也应该过早使用$match、$limit、$skip操做符,他们能够提早减小管道内文档数量,减小内存占用,提供聚合效率。

除此以外,$match尽可能放到聚合的第一个阶段,若是这样的话$match至关于一个按条件查询的语句,这样的话可使用索引,加快查询效率。

聚合管道的限制

    1.类型限制

在管道内不能操做 Symbol, MinKey, MaxKey, DBRef, Code, CodeWScope类型的数据( 2.4版本解除了对二进制数据的限制).

     2.结果大小限制

管道线的输出结果不能超过BSON 文档的大小(16M),若是超出的话会产生错误.

     3.内存限制

若是一个管道操做符在执行的过程当中所占有的内存超过系统内存容量的10%的时候,会产生一个错误。

当$sort和$group操做符执行的时候,整个输入都会被加载到内存中,若是这些占有内存超过系统内存的%5的时候,会将一个warning记录到日志文件。一样,所占有的内存超过系统内存容量的10%的时候,会产生一个错误。

分片上使用聚合管道

聚合管道支持在已分片的集合上进行聚合操做。当分片集合上进行聚合操纵的时候,聚合管道被分为两成两个部分,分别在mongod实例和mongos上进行操做。

聚合管道使用

首先下载测试数据:http://media.mongodb.org/zips.json 并导入到数据库中。

1.查询各州的人口数

var connectionString = ConfigurationManager.AppSettings["MongodbConnection"];

var client = new MongoClient(connectionString);

var DatabaseName = ConfigurationManager.AppSettings["DatabaseName"];

string collName = ConfigurationManager.AppSettings["collName"];

MongoServer mongoDBConn = client.GetServer();

MongoDatabase db = mongoDBConn.GetDatabase(DatabaseName);

MongoCollection<BsonDocument> table = db[collName];

var group = new BsonDocument

{

{"$group", new BsonDocument

{

{

"_id","$state"

},

{

"totalPop", new BsonDocument

{

{ "$sum","$pop" }

}

}

}

}

};

var sort = new BsonDocument

{

{"$sort", new BsonDocument{ { "_id",1 }}}

};

var pipeline = new[] { group, sort };

var result = table.Aggregate(pipeline);

var matchingExamples = result.ResultDocuments.Select(x => x.ToDynamic()).ToList();

foreach (var example in matchingExamples)

{

var message = string.Format("{0}- {1}", example["_id"], example["totalPop"]);

Console.WriteLine(message);

}

2.计算每一个州平均每一个城市打人口数

> db.zipcode.aggregate({$group:{_id:{state:"$state",city:"$city"},pop:{$sum:"$pop"}}},

                              {$group:{_id:"$_id.state",avCityPop:{$avg:"$pop"}}},

                                       {$sort:{_id:1}})

var group1 = new BsonDocument

{

{"$group", new BsonDocument

{

{

"_id",new BsonDocument

{

{"state","$state"},

{"city","$city"}

}

},

{

"pop", new BsonDocument

{

{ "$sum","$pop" }

}

}

}

}

};

var group2 = new BsonDocument

{

{"$group", new BsonDocument

{

{

"_id","$_id.state"

},

{

"avCityPop", new BsonDocument

{

{ "$avg","$pop" }

}

}

}

}

};

var pipeline1 = new[] { group1,group2, sort };

var result1 = table.Aggregate(pipeline1);

var matchingExamples1 = result1.ResultDocuments.Select(x => x.ToDynamic()).ToList();

foreach (var example in matchingExamples1)

{

var message = string.Format("{0}- {1}", example["_id"], example["avCityPop"]);

Console.WriteLine(message);

}

3.计算每一个州人口最多和最少的城市名字

>db.zipcode.aggregate({$group:{_id:{state:"$state",city:"$city"},pop:{$sum:"$pop"}}},

                                      {$sort:{pop:1}},

                                      {$group:{_id:"$_id.state",biggestCity:{$last:"$_id.city"},biggestPop:{$last:"$pop"},smallestCity:{$first:"$_id.city"},smallestPop:{$first:"$pop"}}},

                                      {$project:{_id:0,state:"$_id",biggestCity:{name:"$biggestCity",pop:"$biggestPop"},smallestCity:{name:"$smallestCity",pop:"$smallestPop"}}})

var sort1 = new BsonDocument

{

{"$sort", new BsonDocument{ { "pop",1 }}}

};

var group3 = new BsonDocument

{

{

"$group", new BsonDocument

{

{

"_id","$_id.state"

},

{

"biggestCity",new BsonDocument

{

{"$last","$_id.city"}

}

},

{

"biggestPop",new BsonDocument

{

{"$last","$pop"}

}

},

{

"smallestCity",new BsonDocument

{

{"$first","$_id.city"}

}

},

{

"smallestPop",new BsonDocument

{

{"$first","$pop"}

}

}

}

}

};

var project = new BsonDocument

{

{

"$project", new BsonDocument

{

{"_id",0},

{"state","$_id"},

{"biggestCity",new BsonDocument

{

{"name","$biggestCity"},

{"pop","$biggestPop"}

}},

{"smallestCity",new BsonDocument

{

{"name","$smallestCity"},

{"pop","$smallestPop"}

}

}

}

}

};

var pipeline2 = new[] { group1,sort1 ,group3, project };

var result2 = table.Aggregate(pipeline2);

var matchingExamples2 = result2.ResultDocuments.Select(x => x.ToDynamic()).ToList();

foreach (var example in matchingExamples2)

{

Console.WriteLine(example.ToString());

//var message = string.Format("{0}- {1}", example["_id"], example["avCityPop"]);

//Console.WriteLine(message);

}

总结

对于大多数的聚合操做,聚合管道能够提供很好的性能和一致的接口,使用起来比较简单, 和MapReduce同样,它也能够做用于分片集合,可是输出的结果只能保留在一个文档中,要遵照BSON Document大小限制(当前是16M)。

管道对数据的类型和结果的大小会有一些限制,对于一些简单的固定的汇集操做可使用管道,可是对于一些复杂的、大量数据集的聚合任务仍是使用MapReduce。

相关文章:

http://mikaelkoskinen.net/mongodb-aggregation-framework-examples-in-c/

相关文章
相关标签/搜索