2011. 9. 22. 18:36

[mongodb] Aggregation (MapReduce)

mongodb는 몇개의 집계(aggregation) 도구를 지원한다.

* count

document 개수를 전달한다.

db.a.count()
0
db.a.insert({"a":1})
db.a.count()
1

query 개수도 전달한다.

db.a.count({"a":1})
1

* distinct

주어진 key에 대해 구별되는 값을 찾아준다.
collection과 key를 지정해야 한다.

db.c.insert({"a":1})
db.c.insert({"a":1})
db.c.insert({"a":2})
db.c.insert({"a":3})
db.runCommand({"distinct":"c", "key":"a"})
{
 "values" : [
  1,
  2,
  3
 ],
 "stats" : {
  "n" : 4,
  "nscanned" : 4,
  "nscannedObjects" : 4,
  "timems" : 0
 },
 "ok" : 1
}

* group

(참고)
http://www.mongodb.org/display/DOCS/Aggregation
에 따르면, group의 결과는 single BSON object이기 때문에,
크기의 제약이 따른다. 즉 작아야 한다. (10,000 keys 이하).
제한없이 group하고자 한다면, MapReduce를 사용해야 한다.

group은 보다 복잡한 집계를 가능케한다. SQL의 GROUP BY와 비슷하다.

예를들어, 부서별 연봉 합을 구하는 경우,
SELECT dep_id,SUM(salary) FROM c GROUP BY dep_id;
와 같이 SQL로 할 수 있다.

group 명령을 이용하면, 위와 같은 SQL 구문을 구현할 수 있다.

아래와 같은 document가 있다고 가정하자.

{"dep_id":1, "salary":1}
{"dep_id":1, "salary":2}
{"dep_id":1, "salary":3}
{"dep_id":2, "salary":10}
{"dep_id":2, "salary":12}
{"dep_id":2, "salary":16}
{"dep_id":3, "salary":4}
{"dep_id":3, "salary":1}

db.c.group(
  {key:{"dep_id":true},
  reduce:function(obj,prev){prev.sum_salary += obj.salary;},
  initial:{sum_salary:0}
});
[
 {
  "dep_id" : 1,
  "sum_salary" : 6
 },
 {
  "dep_id" : 2,
  "sum_salary" : 38
 },
 {
  "dep_id" : 3,
  "sum_salary" : 5
 }
]

아래와 같이 key에 :true가 없는 경우는, key 값으로 분류되진 않는다.
db.c.group(
  {key:"dep_id",
  reduce:function(obj,prev){prev.sum_salary += obj.salary;},
  initial:{sum_salary:0}
});
[ { "sum_salary" : 49 } ]

아래와 같이 condition을 통해 filter할 수 있다.
db.c.group(
  {key:{"dep_id":true},
  reduce:function(obj,prev){prev.sum_salary += obj.salary;},
  initial:{sum_salary:0},
  condition:{"dep_id":{"$gt":2}}
});
[ { "dep_id" : 3, "sum_salary" : 5 } ]

이러한 group은 아래와 같이 helper 없이도 호출된다.

db.runCommand({"group":{
  "ns":"c",
  "key":{"dep_id":true},
  "initial":{"sum_salary":0},
  "$reduce":function(obj,prev){prev.sum_salary += obj.salary;},
  "condition":{"dep_id":{"$gt":1}}
}});
{
 "retval" : [
  {
   "dep_id" : 2,
   "sum_salary" : 38
  },
  {
   "dep_id" : 3,
   "sum_salary" : 5
  }
 ],
 "count" : 5,
 "keys" : 2,
 "ok" : 1
}

"group" 명령 설명은 다음과 같다.

"ns":"c"
 ; collection 이름

"key":"dep_id"
  ; document를 group 시킬 key 지정. 주어진 값의 "dep_id" key로 group된다.

"initial":{"sum_salary":0}
 ; 주어진 group으로 reduce 함수가 처음 호출될 때, 초기화 document를 지정한다.
   누산기(accumulator)로 전달된는데, 그것은 변경 가능한 값이다.
   여기에서 지정된 document 형태로 결과가 보고된다. (key와 함께)

"$reduce":function(doc,prev) {...}
 ; collection내의 각각의 document가 한번씩 호출된다.
   현재의 document와 accumulator document가 전달된다.
   결과는 group이다.
   각 group 별로 accumulator document가 있다.

"condition":{"a":{"$gt":4}}
 ; 해당 condition 조건에 부합하는 document만 group 된다.

finalizer 사용하기

db로 부터 사용자에게 전달할 data의 양을 줄이는데 쓰이는 중요한 도구로,
group 명령의 output을 단일 db 응답으로 받아야 할 필요가 있기 때문이다.

db.c.group(
  {key:{"dep_id":true},
  reduce:function(obj,prev){prev.sum_salary += obj.salary;prev.cnt++;},
  finalize:function(out){out.avg=out.sum_salary / out.cnt;},
  initial:{sum_salary:0,avg:0,cnt:0}
});
[
 {
  "dep_id" : 1,
  "sum_salary" : 6,
  "avg" : 2,
  "cnt" : 3
 },
 {
  "dep_id" : 2,
  "sum_salary" : 38,
  "avg" : 12.666666666666666,
  "cnt" : 3
 },
 {
  "dep_id" : 3,
  "sum_salary" : 5,
  "avg" : 2.5,
  "cnt" : 2
 }
]

와 같이 group별 호출시 마지막에 한번 호출이 됨을 확인할 수 있으며,
inital에 값이 아닌 document가 들어가는 경우, 결과 보고가 자칫
길어질 수 있는데, 이를 줄여주는데 사용할 수 있다.
(위 예는 해당 case는 아님)

key 처럼 함수 사용하기

group을 묶는 함수를 정의할 수 있다.
"$keyf":function(x) {return x.category.toLowerCase();},
와 같이하면, 대소문자 구별없이 group 된다.

* MapReduce 사용하기

위의 count,distinct,group등은 MapReduce로도 해결되며, 그 이상의 결과도 구할 수 있다.

이는 보통 multiple server 같은 병렬 처리 환경에서 사용된다.
문제(problem)를 쪼개고, 다른 machine에게 그 조각을 전달한뒤,
각각의 machine이 해당 문제의 조각을 해결(solution)하도록 한다.
그리고, 모두 종료되었다면, 모든 해결의 조각을 결합(merge)한다.

MapReduce는 다음과 같은 step을 가진다.

1) map 단계
; collection의 모든 document에 대해 map 명령 수행
  이것은 임의의 value 가지고있는 key를 발산(emit)하거나 혹은 아무것도 하지 않는다.

2) shuffle step
; 발산(emit)된 key와 value의 list는 각각의 key에 의해 생성된다.

3) reduce step
; value 값 list를 가지고, 단일 element로 축소한다(reduce).
이 element는 단일 value만 남을때까지 shuffle step으로 되돌아간다.
그 단일 value가 결과이다.

MapReduce 사용의 비용은 속도이다.
group은 특별한 경우에 빠르지 않으나, MapReduce는 더 느리며,
"real time"으로 고려되지 않는다.
그래서, MapReduce는 background로 진행되며, 결과 collection을 생성하고,
그것을 real time으로 query한다.

Example 1 : collection의 모든 key 찾기

아래와 같이 map 함수 지정

map=function(){
 for (var key in this) {
  emit(key, {count:1});
}};

아래와 같이 reduce 함수 지정

reduce=function(key,emits) {
 total = 0;
 for (var i in emits) {
  total += emits[i].count;
 }
 return {"count":total};
}

만일 아래와 같은 상황이라 가정.

db.a.find()                                                     
{ "_id" : ObjectId("4e7ad0321b8ab0744823c8f8"), "a" : 1 }
{ "_id" : ObjectId("4e7ad0341b8ab0744823c8f9"), "b" : 1 }
{ "_id" : ObjectId("4e7ad0351b8ab0744823c8fa"), "b" : 1 }
{ "_id" : ObjectId("4e7ad0381b8ab0744823c8fb"), "a" : 1 }

mr = db.runCommand({"mapreduce":"a", "map":map, "reduce":reduce})
{
 "assertion" : "'out' has to be a string or an object",
 "assertionCode" : 13606,
 "errmsg" : "db assertion failure",
 "ok" : 0
}
와 같은 오류 발생한다면, mongodb 1.8+ 인 경우이다.
그때는 아래와 같이 out 옵션을 추가한다.

mr = db.runCommand({"mapreduce":"a", "map":map, "reduce":reduce, "out":{"inline":1}})
{
 "results" : [
  {
   "_id" : "_id",
   "value" : {
    "count" : 4
   }
  },
  {
   "_id" : "a",
   "value" : {
    "count" : 2
   }
  },
  {
   "_id" : "b",
   "value" : {
    "count" : 2
   }
  }
 ],
 "timeMillis" : 1,
 "counts" : {
  "input" : 4,
  "emit" : 8,
  "output" : 3
 },
 "ok" : 1
}

또한 다음과 같이 해도 된다.
db.a.mapReduce(map, reduce, {out:{inline:1}});                                      
{
 "results" : [
  {
   "_id" : "_id",
   "value" : {
    "count" : 4
   }
  },
  {
   "_id" : "a",
   "value" : {
    "count" : 2
   }
  },
  {
   "_id" : "b",
   "value" : {
    "count" : 2
   }
  }
 ],
 "timeMillis" : 1,
 "counts" : {
  "input" : 4,
  "emit" : 8,
  "output" : 3
 },
 "ok" : 1,
}

Example 2 : web page 분리하기

tag와 함께하는 link 제안 사이트에서, 가장 인기있는 topic 추출

map = function() {
 for (var i in this.tags) {
  var recency = 1/(new Date() - this.date);
  var socre = recency * this.score;

  emit(this.tags[i], {"urls":[this.url], "score":socre});

reduce = function(key,emits) {
 var total = {urls:[], score:0}
 for (var i in emits) {
  emits[i].urls.forEach(function(url) {
   total.urls.push(url);
  }
  total.score += emits[i].score;
 }
 return total;
}

Example 3 : tags의 item count 계산

db.things.insert( { _id : 1, tags : ['dog', 'cat'] } );
db.things.insert( { _id : 2, tags : ['cat'] } );
db.things.insert( { _id : 3, tags : ['mouse', 'cat', 'dog'] } );
db.things.insert( { _id : 4, tags : [] } );

m = function(){
 this.tags.forEach(
  function(z){
   emit( z , { count : 1 } );
  }
 );
};

r = function( key , values ){
 var total = 0;
 for ( var i=0; i<values.length; i++ )
  total += values[i].count;
  return { count : total };
};

res = db.things.mapReduce(m,r,out:{inline:1});
{
 "results" : [
  {
   "_id" : "cat",
   "value" : {
    "count" : 3
   }
  },
  {
   "_id" : "dog",
   "value" : {
    "count" : 2
   }
  },
  {
   "_id" : "mouse",
   "value" : {
    "count" : 1
   }
  }
 ],
 "timeMillis" : 1,
 "counts" : {
  "input" : 4,
  "emit" : 6,
  "output" : 3
 },
 "ok" : 1,
}

greenfish 註
(나름 MapReduce는 중요 개념이라, 제가 이해한 것을 공유합니다.)

mapReduce의 과정은 대략 다음과 같은 걸로 보인다.

map 함수 선언 -> reduce 함수 선언 -> mapReduce 실행

mapReduce 실행시 map 함수가 먼저 실행되고, reduce 함수가 실행된다.

{"a":1, "b":1}
{"a":2, "b":2}
{"a":3, "b":3}
...
이 있을때,

map은 모든 document에 대해 호출된다.
this는 현재의 document이다.

즉,
{"a":1, "b":1} <- this
{"a":2, "b":2} <- next this
{"a":3, "b":3} <- next next this
...

그러므로, map 함수에서 this는 document를 가리킨다.
map의 주요 목적은 emit을 호출하는 것이다.
emit을 호출하면 다른 영역에 document를 생성하는 것으로 이해하면 된다.
즉,
{"a":1, "b":1} -> this, emit {"sum":2}
과 같이 합을 emit할 수 있다. 물론, 해당 단계에서 emit을 여러번 호출할 수 있다.
그렇게 되면,
{"a":1, "b":1} -> {"sum":2}
{"a":2, "b":2} -> {"sum":4}
{"a":1, "b":3} -> {"sum":4}
...
과 같이 된다. 그렇다. 기것이 바로 map 이다~!

그리고, reduce는 emit된(즉, 다른 영역) document들을 받는다.
다시말해, reduce(key,emits)와 같이 parameter 2개를 받는다.
그런데 key parameter가 있다.
그러면, reduce는 emit에 들어온 document들의 key 개수만큼 호출받을 것이다.
그리고, emits는 for (i in emits) 하고 emits[i]와 같이 탐색이 가능하다.
reduce 함수는 return 한다.
그러면 input인 key와 documents를 value 하나로 전달하는 것이다~!!!

위와 같은 경우,

map 함수를
function() {
 emit("sum", this.a + this.b);
}
를 한다.

보통은 emit("동적인_새로운_key값", 전달할 document)와 같이 될 것이다.
예를 들어 emit(strKeyName, {"some":a, "other":b, ...}
와 같은 형태일 것이다. 여러번 emit할 수 있다.
위 예는 하나의 key로 mapping 한다.

reduce 함수를,
function(key,emits)
{
 tot = 0;
 for (i in emits) {
  tot += emits[i];
 }
 return tot;
}
와 같이 한다.
위 예는, map에서 key는 "sum" 하나이므로, 한번만 호출된다.
emits[i]를 통해, map에서 emit(key,value)한 value를 구할 수 있다.
단일값으로 변환(위 예는 合)하여 단일 값을 리턴한다.
그러면, "key":documents -> "key":value로 변환되어 리턴된다.
위 정의한 function으로 MapReduce하면 10이 리턴된다.

여하튼, 이 과정이 map reduce 이다.

일반적인 map/reduce는 다음과 같다.

위 예(10이 리턴된것)는 다음과 같다.

mongodb와 MapReduce

mapReduce에는 map, reduce가 필수이다. (물론, out도 필수가 됨)
그 이외의 옵션은 다음과 같다.

"finalize":function
 ; reduce output으로 마지막 step을 실행

"keeptemp":boolean
 ; connection 종료시 유지시킬 것이냐?

"output":string
 ; output collection 지정. keeptemp:true 한다.

"query":document
 ; map function 실행전에 호출될 query

"limit":integer
 ; map function에 전달될 최대 개수

"scope":document
 ; JavsScript code에서 사용될 변수

"verbose":boolean
 ; server log에 장황한 output을 남김

* finalize
 ; group 보다 mapReduce 사용이 큰 결과에 대해 덜 치명적이다.
   finalize 사용으로 결과 크기를 좀더 줄일 수 있다.

* keeptemp
 ; 기본값으로 MapReduce는 임시 collection을 사용한다.

* 부분 document에 대한 MapReduce
 ; collection의 일부만 MapReduce하려면 map 이전의 query filter를
   사용하면 된다.
   모든 map function은 BSON -> JavaScript object로 되기 때문에, 비용이 크다.
   그래서 document의 일부만 filter 하여 전달하여 spped up 할 수 있다.
   이를 위한 것은 query,limit,sort를 사용하는 것이다.

   예를들어,
   db.runCommand({"mapreduce":"a", "map":map, "reduce":reduce, "query":{"date":{"$gt":week_ago}}})
   와 같다.

   sort는 limit와 결합하여 사용하면 좋다.
   만약, 마지막 10,000 page view에 대해 MapReduce를 한다면 다음과 같다.
   db.runCommand({"mapReduce":"a", "map":map, "reduce":reduce, "limit":10000, "sort":{"date":-1}})

* scope 사용
 ; MapRduce는 code상 scope가 무시된다.
   MapReduce 사용시 client-side의 값이 필요하다면, scope를 사용할 수 있다.

'Research > mongodb' 카테고리의 다른 글

[mongodb] Administration  (0) 2011.09.23
[mongodb] Advanced topics  (0) 2011.09.23
[mongodb] Indexing  (0) 2011.09.22
[mongodb] Querying  (0) 2011.09.19
[mongodb] Creating, Updating, and Deleting Documents  (0) 2011.09.15