玩过 Hadoop 的小伙伴对 MapReduce 应该不陌生,MapReduce 的强大且灵活,它可以将一个大问题拆分为多个小问题,将各个小问题发送到不同的机器上去处理,所有的机器都完成计算后,再将计算结果合并为一个完整的解决方案,这就是所谓的分布式计算。本文就来看看 MongoDB 中MapReduce 的使用。

MapReduce

MongoDB 中的 MapReduce 可以用来实现更复杂的聚合命令,使用 MapReduce 主要实现两个函数:map 函数和 reduce 函数,map 函数用来生成键值对序列,map 函数的结果作为 reduce 函数的参数,reduce 函数中再做进一步的统计。比如数据集如下:

{ "_id" : ObjectId("61a87ff752b05e1cb67f7e55"), "name" : "鲁迅", "book" : "呐喊", "price" : 38, "publisher" : "人民文学出版社" }
{ "_id" : ObjectId("61a8800c52b05e1cb67f7e56"), "name" : "曹雪芹", "book" : "红楼梦", "price" : 22, "publisher" : "人民文学出版社" }
{ "_id" : ObjectId("61a8802552b05e1cb67f7e57"), "name" : "钱钟书", "book" : "宋诗选注", "price" : 99, "publisher" : "人民文学出版社" }
{ "_id" : ObjectId("61a8804552b05e1cb67f7e58"), "name" : "钱钟书", "book" : "谈艺录", "price" : 66, "publisher" : "三联书店" }
{ "_id" : ObjectId("61a8805b52b05e1cb67f7e59"), "name" : "鲁迅", "book" : "彷徨", "price" : 55, "publisher" : "花城出版社" }

假如我想查询每位作者所出的书的总价,操作如下:

var map = function(){emit(this.name, this.price)}
var reduce = function(key,value){return Array.sum(value)}
var options = {out:"totalPrice"}
db.mp_1.mapReduce(map,reduce,options)
db.totalPrice.find()

emit 函数主要用来实现分组,接收两个参数,第一个参数表示分组的字段,第二个参数表示要统计的数据。reduce 函数来做具体的数据处理操作,接收两个参数,对应 emit 函数的两个参数。这里使用了 Array 中的 sum 函数对 price 字段进行自加处理。options 中定义了将结果输出的集合,届时我们将在这个集合中去查询数据。默认情况下,这个集合即使在数据库重启后也会保留,并且保留集合中的数据。查询结果如下:

{
    "_id":"钱钟书",
    "value":165
}
{
    "_id":"曹雪芹",
    "value":22
}
{
    "_id":"鲁迅",
    "value":93
}

再比如我想查询每位作者出了几本书,如下:

var map = function(){emit(this.name,1)}
var reduce = function(key,value){return Array.sum(value)}
var options = {out:"book_num"}
db.mp_1.mapReduce(map,reduce,options)
db.book_num.find()

查询结果如下:

{
    "_id":"钱钟书",
    "value":2
}
{
    "_id":"曹雪芹",
    "value":1
}
{
    "_id":"鲁迅",
    "value":2
}

将每位作者的书列出来,如下:

var map = function(){emit(this.name,this.book)}
var reduce = function(key,value){return value.join(',')}
var options = {out:"book_title"}
db.mp_1.mapReduce(map,reduce,options)
db.book_title.find()

结果如下:

{
    "_id":"鲁迅",
    "value":"彷徨,呐喊"
}
{
    "_id":"曹雪芹",
    "value":"红楼梦"
}
{
    "_id":"钱钟书",
    "value":"谈艺录,宋诗选注"
}

比如查询每位作者售价在40以上的书,如下:

var map = function(){emit(this.name,this.book)}
var reduce = function(key,value){return value.join(',')}
var options = {query:{price:{$gt:40}},out:"book_gt40"}
db.mp_1.mapReduce(map,reduce,options)
db.book_gt40.find()

query 表示对查到的集合再进行筛选。

结果如下:

{
    "_id":"钱钟书",
    "value":"谈艺录,宋诗选注"
}
{
    "_id":"鲁迅",
    "value":"彷徨"
}

runCommand

我们也可以利用 runCommand 命令来执行 MapReduce,格式如下:

db.runCommand({
    mapReduce: <collection>,
    map: <function>,
    reduce: <function>,
    finalize: <function>,
    out: <output>,
    query: <document>,
    sort: <document>,
    limit: <number>,
    scope: <document>,
    jsMode: <boolean>,
    verbose: <boolean>,
    bypassDocumentValidation: <boolean>,
    collation: <document>
})

含义如下:

参数 含义
mapReduce 要操作的集合
map map函数
reduce reduce函数
finalize 最终处理函数
out 输出的集合
query 对结果进行过滤
sort 对结果排序
limit 限制返回的结果数
scope 设置参数值,在这里设置的值在map、reduce、finalize函数中可见
jsMode 是否将map执行的中间数据由JS对象转换成BSON对象,默认为false
verbose 是否显示详细的时间统计信息
bypassDocumentValidation 是否绕过文档验证
collation 其他一些校对

如下操作,表示执行 MapReduce 操作并对统计的集合限制返回条数。限制返回条数之后再进行统计操作。

var map = function(){emit(this.name,this.book)}
var reduce = function(key,value){return value.join(',')}
db.runCommand({mapReduce:'mp_1',map,reduce,out:"book_title_2",limit:4,verbose:true})
db.book_title_2.find()

执行结果如下:

{
    "_id":"鲁迅",
    "value":"呐喊"
}
{
    "_id":"曹雪芹",
    "value":"红楼梦"
}
{
    "_id":"钱钟书",
    "value":"谈艺录,宋诗选注"
}

可以看到,鲁迅有一本书不见了,就是因为 limit 先限制集合返回条数,然后再执行统计操作。

finalize 操作表示最终处理函数,如下:

var f1 = function(key,reduceValue){var obj={};obj.author=key;obj.books=reduceValue;return obj;}
var map = function(){emit(this.name,this.book)}
var reduce = function(key,value){return value.join(',')}
db.runCommand({mapReduce:'mp_1',map,reduce,out:"book_title_3",finalize:f1})
db.book_title_3.find()

f1 第一个参数 key 表示 emit 中的第一个参数,第二个参数表示 reduce 的执行结果,我们可以在 f1 中对这个结果进行再处理。结果如下:

{
    "_id":"鲁迅",
    "value":{
        "author":"鲁迅",
        "books":"彷徨,呐喊"
    }
}
{
    "_id":"曹雪芹",
    "value":{
        "author":"曹雪芹",
        "books":"红楼梦"
    }
}
{
    "_id":"钱钟书",
    "value":{
        "author":"钱钟书",
        "books":"谈艺录,宋诗选注"
    }
}

scope 则用来定义在 map、reduce、finalize 中都可见的变量,如下:

var f1 = function(key,reduceValue){var obj={};obj.author=key;obj.books=reduceValue;obj.str=str;return obj;}
var map = function(){emit(this.name,this.book)}
var reduce = function(key,value){return value.join(',')}
db.runCommand({mapReduce:'mp_1',map,reduce,out:"book_title_scope",finalize:f1,scope:{str:"hello"}})
db.book_title_scope.find()

执行结果如下:

{
    "_id":"曹雪芹",
    "value":{
        "author":"曹雪芹",
        "books":"红楼梦",
        "str":"hello"
    }
}
{
    "_id":"鲁迅",
    "value":{
        "author":"鲁迅",
        "books":"彷徨,呐喊",
        "str":"hello"
    }
}
{
    "_id":"钱钟书",
    "value":{
        "author":"钱钟书",
        "books":"谈艺录,宋诗选注",
        "str":"hello"
    }
}