玩过 Hadoop 的小伙伴对 MapReduce 应该不陌生,MapReduce 的强大且灵活,它可以将一个大问题拆分为多个小问题,将各个小问题发送到不同的机器上去处理,所有的机器都完成计算后,再将计算结果合并为一个完整的解决方案,这就是所谓的分布式计算。本文就来看看 MongoDB 中MapReduce 的使用。
MapReduce
MongoDB 中的 MapReduce 可以用来实现更复杂的聚合命令,使用 MapReduce 主要实现两个函数:map 函数和 reduce 函数,map 函数用来生成键值对序列,map 函数的结果作为 reduce 函数的参数,reduce 函数中再做进一步的统计。比如数据集如下:
{ "_id" : ObjectId("61a87ff752b05e1cb67f7e55"), "name" : "鲁迅", "book" : "呐喊", "price" : 38, "publisher" : "人民文学出版社" }
{ "_id" : ObjectId("61a8800c52b05e1cb67f7e56"), "name" : "曹雪芹", "book" : "红楼梦", "price" : 22, "publisher" : "人民文学出版社" }
{ "_id" : ObjectId("61a8802552b05e1cb67f7e57"), "name" : "钱钟书", "book" : "宋诗选注", "price" : 99, "publisher" : "人民文学出版社" }
{ "_id" : ObjectId("61a8804552b05e1cb67f7e58"), "name" : "钱钟书", "book" : "谈艺录", "price" : 66, "publisher" : "三联书店" }
{ "_id" : ObjectId("61a8805b52b05e1cb67f7e59"), "name" : "鲁迅", "book" : "彷徨", "price" : 55, "publisher" : "花城出版社" }
假如我想查询每位作者所出的书的总价,操作如下:
var map = function(){emit(this.name, this.price)}
var reduce = function(key,value){return Array.sum(value)}
var options = {out:"totalPrice"}
db.mp_1.mapReduce(map,reduce,options)
db.totalPrice.find()
emit 函数主要用来实现分组,接收两个参数,第一个参数表示分组的字段,第二个参数表示要统计的数据。reduce 函数来做具体的数据处理操作,接收两个参数,对应 emit 函数的两个参数。这里使用了 Array 中的 sum 函数对 price 字段进行自加处理。options 中定义了将结果输出的集合,届时我们将在这个集合中去查询数据。默认情况下,这个集合即使在数据库重启后也会保留,并且保留集合中的数据。查询结果如下:
{
"_id":"钱钟书",
"value":165
}
{
"_id":"曹雪芹",
"value":22
}
{
"_id":"鲁迅",
"value":93
}
再比如我想查询每位作者出了几本书,如下:
var map = function(){emit(this.name,1)}
var reduce = function(key,value){return Array.sum(value)}
var options = {out:"book_num"}
db.mp_1.mapReduce(map,reduce,options)
db.book_num.find()
查询结果如下:
{
"_id":"钱钟书",
"value":2
}
{
"_id":"曹雪芹",
"value":1
}
{
"_id":"鲁迅",
"value":2
}
将每位作者的书列出来,如下:
var map = function(){emit(this.name,this.book)}
var reduce = function(key,value){return value.join(',')}
var options = {out:"book_title"}
db.mp_1.mapReduce(map,reduce,options)
db.book_title.find()
结果如下:
{
"_id":"鲁迅",
"value":"彷徨,呐喊"
}
{
"_id":"曹雪芹",
"value":"红楼梦"
}
{
"_id":"钱钟书",
"value":"谈艺录,宋诗选注"
}
比如查询每位作者售价在40以上的书,如下:
var map = function(){emit(this.name,this.book)}
var reduce = function(key,value){return value.join(',')}
var options = {query:{price:{$gt:40}},out:"book_gt40"}
db.mp_1.mapReduce(map,reduce,options)
db.book_gt40.find()
query 表示对查到的集合再进行筛选。
结果如下:
{
"_id":"钱钟书",
"value":"谈艺录,宋诗选注"
}
{
"_id":"鲁迅",
"value":"彷徨"
}
runCommand
我们也可以利用 runCommand 命令来执行 MapReduce,格式如下:
db.runCommand({
mapReduce: <collection>,
map: <function>,
reduce: <function>,
finalize: <function>,
out: <output>,
query: <document>,
sort: <document>,
limit: <number>,
scope: <document>,
jsMode: <boolean>,
verbose: <boolean>,
bypassDocumentValidation: <boolean>,
collation: <document>
})
含义如下:
参数 | 含义 |
mapReduce | 要操作的集合 |
map | map函数 |
reduce | reduce函数 |
finalize | 最终处理函数 |
out | 输出的集合 |
query | 对结果进行过滤 |
sort | 对结果排序 |
limit | 限制返回的结果数 |
scope | 设置参数值,在这里设置的值在map、reduce、finalize函数中可见 |
jsMode | 是否将map执行的中间数据由JS对象转换成BSON对象,默认为false |
verbose | 是否显示详细的时间统计信息 |
bypassDocumentValidation | 是否绕过文档验证 |
collation | 其他一些校对 |
如下操作,表示执行 MapReduce 操作并对统计的集合限制返回条数。限制返回条数之后再进行统计操作。
var map = function(){emit(this.name,this.book)}
var reduce = function(key,value){return value.join(',')}
db.runCommand({mapReduce:'mp_1',map,reduce,out:"book_title_2",limit:4,verbose:true})
db.book_title_2.find()
执行结果如下:
{
"_id":"鲁迅",
"value":"呐喊"
}
{
"_id":"曹雪芹",
"value":"红楼梦"
}
{
"_id":"钱钟书",
"value":"谈艺录,宋诗选注"
}
可以看到,鲁迅有一本书不见了,就是因为 limit 先限制集合返回条数,然后再执行统计操作。
finalize 操作表示最终处理函数,如下:
var f1 = function(key,reduceValue){var obj={};obj.author=key;obj.books=reduceValue;return obj;}
var map = function(){emit(this.name,this.book)}
var reduce = function(key,value){return value.join(',')}
db.runCommand({mapReduce:'mp_1',map,reduce,out:"book_title_3",finalize:f1})
db.book_title_3.find()
f1 第一个参数 key 表示 emit 中的第一个参数,第二个参数表示 reduce 的执行结果,我们可以在 f1 中对这个结果进行再处理。结果如下:
{
"_id":"鲁迅",
"value":{
"author":"鲁迅",
"books":"彷徨,呐喊"
}
}
{
"_id":"曹雪芹",
"value":{
"author":"曹雪芹",
"books":"红楼梦"
}
}
{
"_id":"钱钟书",
"value":{
"author":"钱钟书",
"books":"谈艺录,宋诗选注"
}
}
scope 则用来定义在 map、reduce、finalize 中都可见的变量,如下:
var f1 = function(key,reduceValue){var obj={};obj.author=key;obj.books=reduceValue;obj.str=str;return obj;}
var map = function(){emit(this.name,this.book)}
var reduce = function(key,value){return value.join(',')}
db.runCommand({mapReduce:'mp_1',map,reduce,out:"book_title_scope",finalize:f1,scope:{str:"hello"}})
db.book_title_scope.find()
执行结果如下:
{
"_id":"曹雪芹",
"value":{
"author":"曹雪芹",
"books":"红楼梦",
"str":"hello"
}
}
{
"_id":"鲁迅",
"value":{
"author":"鲁迅",
"books":"彷徨,呐喊",
"str":"hello"
}
}
{
"_id":"钱钟书",
"value":{
"author":"钱钟书",
"books":"谈艺录,宋诗选注",
"str":"hello"
}
}