Introduction to Map/Reduce on Riak ------ This document describes Riak's implementation of a data processing system based on the MapReduce[1] programming paradigm popularized by Google. It assumes that you have already set up Riak, and know the basics about dealing with Riak clients. For more information on these prerequisites, see riak/doc/basic-setup.txt and riak/doc/basic-client.txt. Quick and Dirty Example --- If you have a Riak client hanging around, you can execute Map/Reduce queries on it like this: 1> Count = fun(G, undefined, none) -> [dict:from_list([{I, 1} || I <- riak_object:get_value(G)])] end. 2> Merge = fun(Gcounts, none) -> [lists:foldl(fun(G, Acc) -> dict:merge(fun(_, X, Y) -> X+Y end, G, Acc) end, dict:new(), Gcounts)] end. 3> {ok, [R]} = Client:mapred([{<<"groceries">>, <<"mine">>}, {<<"groceries">>, <<"yours">>}], [{map, {qfun, Count}, none, false}, {reduce, {qfun, Merge}, none, true}]). 4> L = dict:to_list(R). If the "mine" and "yours" objects in the groceries bucket had values of ["bread", "cheese"], ["bread", "butter"], the sequence of commands above would result in L being bound to [{"bread", 2},{"cheese",1},{"butter",1}]. Details --- More importantly, riak_client:mapred takes two lists as arguments. The first list contains bucket key pairs, which are the keys to the "starting values" of the Map/Reduce query. The second list are the steps of the query. Map Steps --- Map steps expect as input a list of bucket/key pairs, just like the first argument to the riak_client:mapred function. Riak executes a map step by looking up values for keys in the input list and executing the map function referenced in the step. Map steps take the form: {map, FunTerm, Arg, Accumulate} Where: FunTerm is a reference to the function that will compute the map of each value. A function referenced by a FunTerm must be arity-3, accepting the arguments: Value: the value found at a key. This will be a Riak object (defined by the riak_object module) if a value was found, or the tuple {error, notfound} if a bucket/key was put in the input list, but not found in the Riak cluster. Data: An optional piece of data attached to the bucket/key tuple. If instead of {Bucket, Key}, {{Bucket, Key}, Data} is passed as input to a map step, that Data will be passed to the map function in this argument. Data will be the atom 'undefined' if the former form is used. Arg: The Arg from the map step definition. The same Arg is passed to every execution of the map function in this step. Functions may be referenced in two ways: {modfun, Module, Function} where Module and Function are atoms that name an Erlang function in a specific module {qfun, Function} where Function is a callable fun term The function must return a *list* of values. The lists returned by all executions of the map function for this step will be appended and passed to the next step. Arg: The third argument passed to the function referenced in FunTerm. Accumulate: If true, the output of this map step will be included in the final return of the mapred function. If false, the output will be discarded after the next step. Reduce Steps --- A reduce step takes the form: {reduce, FunTerm, Arg, Acc} Where FunTerm, Arg, and Acc are mostly the same as their definition for a map step, but the function referenced in FunTerm is instead of arity 2. Its parameters are: ValueList: The list of values produce by the preceeding step of the Map/Reduce. Arg: The Arg from the step definition. The function should again produce a list of values, but it must also be true that the function is commutative, associative, and idempotent. That is, if the input list [a,b,c,d] is valid for a given F, then all of the following must produce the same result: F([a,b,c,d]) F([a,d] ++ F([c,b])) F([F([a]),F([c]),F([b]),F([d])]) Where does the code run? --- So, all well and good, but you could code the same abstraction in a couple of hours, right? Just fetch each object and run your function. Well, not so fast. This map/reduce isn't just an abstraction, it fully exploits data locality. That is to say, both map and reduce functions run on Riak nodes. Map nodes are even run on the node where the data is already located. This means a few things to you: - If you use the {modfun, Module, Function} form of the FunTerm in the map/reduce step definition, that Module must be in the code path of the Riak node. This isn't a huge concern for libraries that ship with Erlang, but for any of your custom code, you'll need to make sure it's loadable. - If you use the {modfun, Module, Function} form of the FunTerm in the map/reduce step definition, you'll need to force the Riak nodes to reload the Module if you make a change to it. The easiest way to reload a module on a Riak node is to get a Riak client, then call Client:reload_all(Module). - If you need to do a Riak 'get' inside of a map or reduce function, you can use riak:local_client/0 to get a Riak client instead of riak:client_connect/1. - Your map and reduce functions are running on a Riak node, which means that that Riak node is spending CPU time doing something other than responding to 'get' and 'put' requests. - If you use the {qfun, Fun} form, your callable function, and its environment will be shipped to the Riak cluster, and each node on which it runs. This is both a benefit (in that you have the full power of closures) and a danger (in that you must be mindful of closing over very large data structures). [1] http://labs.google.com/papers/mapreduce.html