The map/reduce programming model
Riak provides a data processing implementation based on the MapReduce model popularized by Google and since adopted by Hadoop and others. Like any map/reduce implementation, ours is shaped to take best advantage of our own approach to distributed data organization.
Bring the computation to the data
One of the main reasons to use a map/reduce style of programming is to exploit data locality. For data processing in a networked environment, it is generally understood that running the computation where the data already lives will often perform much better (and be much more practical to manage) than moving all the data to the systems that will perform the computation. Since Riak is designed around algorithms to manage and find data efficiently, the shape of our map/reduce implementation follows naturally from the rest of the system.
A map/reduce query or "flow" is simply a sequence of map and reduce phases, each feeding the next, and together providing an aggregate result.
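To make "each phase feeds the next" concrete, here is a minimal single-process sketch in Erlang. It ignores distribution entirely, and its {map, Fun} / {reduce, Fun} phase tuples are a simplification invented for this illustration only; they are not the declaration format Riak expects (that format is described later in this section).

```erlang
-module(mr_flow_sketch).
-export([run/2]).

%% Hypothetical single-process model of a flow: the output list of each
%% phase becomes the input list of the next one.
run(Inputs, Phases) ->
    lists:foldl(fun run_phase/2, Inputs, Phases).

%% Map phase: Fun is applied to every input; each call returns a list,
%% and those lists are concatenated to form the phase's output.
run_phase({map, Fun}, Inputs) ->
    lists:flatmap(Fun, Inputs);
%% Reduce phase: in this sketch Fun sees the whole input list at once.
%% Riak may instead call a reduce function repeatedly on partial batches.
run_phase({reduce, Fun}, Inputs) ->
    Fun(Inputs).
```

For example, mr_flow_sketch:run([1,2,3], [{map, fun(X) -> [X*X] end}, {reduce, fun(L) -> [lists:sum(L)] end}]) squares each input in the map phase and sums the squares in the reduce phase, returning [14].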
A "map phase" is essentially just a function ("F") and an argument ("A") that is defined as part of a series of phases making up a given map/reduce query. The phase receives a stream of inputs ("I"), each consisting of a key identifying a Riak object and an optional additional data element to accompany that object. As each input arrives, a node that already holds the document ("D") corresponding to I runs F(D,A) and streams the results along to the next phase. The point is that your function can be executed over many data items without first collecting them in one place: it runs wherever the data is already placed.
A "reduce phase" is conceptually simpler. As it receives inputs from the preceding phase, it collates them with the ones already received and continually "reduces" the input set until it is notified that no further data will arrive, at which point the entire reduced set is streamed to the next phase. Note that this makes a reduce phase a concurrency barrier, unlike map phases, which can process inputs in parallel. For this to work, a reduce phase's function must be commutative, associative, and idempotent. Good examples are sum and set-union. As Riak's core focus is on decentralized data storage and not on compute farming, reduce phases are generally run on a single node: there is no data-locality gain to be had in reduce.
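The practical consequence of these properties is that reducing in several rounds must give the same answer as reducing everything at once. The Erlang sketch below checks that for the two examples named above; the module and function names are hypothetical, and batched/2 is only a local model of feeding a reduce function its previous output plus new inputs.

```erlang
-module(reduce_props_sketch).
-export([sum/1, union/1, batched/2]).

%% Sum: the output is a one-element list, and re-reducing [Total]
%% returns [Total], so batching does not change the result.
sum(Values) -> [lists:sum(Values)].

%% Set union over individual elements: the output is sorted and
%% duplicate-free, and re-reducing it returns the same list.
union(Values) -> lists:usort(Values).

%% Hypothetical model of incremental reduction: each new batch is
%% combined with the previously reduced output before reducing again.
batched(Reduce, Batches) ->
    lists:foldl(fun(Batch, Acc) -> Reduce(Acc ++ Batch) end, [], Batches).
```

By contrast, a reduce that returned the arithmetic mean of its inputs would not be safe: the mean of [1,2,3] is 2, but averaging [1,2] to 1.5 and then averaging [1.5,3] gives 2.25.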
A perfect fit for the Web
Since the original published use of map/reduce was for processing Web search indices, it should come as no surprise that this model is a great fit for the general problem of processing linked data. That is, if your data set consists of many mostly-independent documents, loosely coupled by way of links inside the documents, then map/reduce is very likely to be a good approach to querying that data collection. To make this even easier, we have added a superficial third type of phase to the model, the "link" phase. In fact, link phases are just map phases, parameterized ahead of time such that the map function will be a function that knows enough about your document types to extract links matching a given pattern or tag. While such map phases can of course be written manually, we saw this operation so frequently that we made a shorthand for it -- and now those phases are by far the most common use of our map/reduce engine.
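The heart of a link phase is a function that knows your document format well enough to pull out matching links. The Erlang sketch below shows only that extraction step, under an assumed document shape invented for this example: a map whose links key holds {Bucket, Key, Tag} tuples. The '_' wildcard is likewise a hypothetical convention of this sketch. A real link function would be a map function receiving a Riak object, and its output would become the {Bucket, Key} inputs of the next phase.

```erlang
-module(link_extract_sketch).
-export([links/2]).

%% Hypothetical document shape for this sketch only:
%%   #{links => [{Bucket, Key, Tag}, ...]}
%% Returns the {Bucket, Key} pairs whose bucket and tag match the filter.
links(Doc, {WantBucket, WantTag}) ->
    [{B, K} || {B, K, T} <- maps:get(links, Doc, []),
               matches(WantBucket, B),
               matches(WantTag, T)].

%% '_' acts as a wildcard in this sketch; anything else must match exactly.
matches('_', _) -> true;
matches(Want, Got) -> Want =:= Got.
```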
Not just for bulk processing
While the most popular map/reduce systems are generally used to apply a map/reduce flow over an entire body of data, Riak provides a more focused approach. Instead, a query is "seeded": it is given the explicit list of inputs that the first phase in the query will process. This approach, combined with the link-following convention, allows for an entirely new set of uses for the map/reduce programming paradigm.
The gory details
A map/reduce query is initiated with two arguments. The first is simply the list of values (usually bucket/key pairs, as the first phase is almost always a map phase) that will be sent to the first phase in the flow. The second argument is a list of terms declaring the flow of phases for this query.
A map phase is declared as:
{map, FunTerm, Arg, Accumulate}
FunTerm is a reference to the function that will compute the map of each value. A function referenced by a FunTerm must be arity 3, accepting the arguments (Value, Data, Arg) as follows:
Value is the value found at a key. This is either a Riak object structure (accessed via the riak_object module) or the tuple {error, notfound}.
Data is an optional piece of data attached to the bucket/key pair that initiated this execution. If an input to a map step is given as {{Bucket, Key}, Data} rather than plain {Bucket, Key}, that Data is passed to the map function in this argument. When the plain {Bucket, Key} form is used, Data is the atom 'undefined'.
Arg is the argument by the same name that was passed to the overall map phase declaration.
The FunTerm may take one of two forms: either {modfun, Module, Function}, where Module and Function are atoms naming an Erlang function in a specific module, or {qfun, Function}, where Function is a callable fun term.
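Both FunTerm forms describe the same thing, a function to call, so a caller can treat them uniformly. The dispatcher below is a hypothetical illustration of that idea, not Riak's own code; echo/3 is a made-up arity-3 function used only to show that the two forms invoke the same function.

```erlang
-module(funterm_sketch).
-export([call/2, echo/3]).

%% Hypothetical dispatcher: invoke a FunTerm in either documented form
%% with a list of arguments.
call({modfun, Module, Function}, Args) when is_atom(Module), is_atom(Function) ->
    apply(Module, Function, Args);
call({qfun, Fun}, Args) when is_function(Fun, length(Args)) ->
    apply(Fun, Args).

%% Made-up arity-3 function with the (Value, Data, Arg) shape of a map
%% function; it returns its arguments in a one-element list.
echo(Value, Data, Arg) ->
    [{Value, Data, Arg}].
```

Here {modfun, funterm_sketch, echo} and {qfun, fun funterm_sketch:echo/3} name the same function; the modfun form has the advantage of being a plain data term, with no closure to carry around.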
Accumulate should be set to true for all phases whose output is desired in the final result of the map/reduce execution, and false for all others. The most common pattern is to set this to true in only the very last phase, but some interesting queries can be produced by setting it earlier as well.
Note that a map function must return a list of values, each of which will be an input to the next phase.
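Putting the map rules together, here is a sketch of an arity-3 map function. The module and function names are hypothetical. It relies on riak_object:get_value/1, which is only available on a Riak node and is assumed here to be called on an object holding a single value (no siblings); the {error, notfound} clause runs anywhere.

```erlang
-module(map_sketch).
-export([map_value/3]).

%% Missing objects arrive as {error, notfound}. Returning an empty list
%% means they contribute nothing to the next phase.
map_value({error, notfound}, _Data, _Arg) ->
    [];
%% Otherwise emit one result: the stored value paired with the optional
%% per-input Data (the atom undefined when no Data was supplied).
%% Assumes a Riak node and an object with a single value.
map_value(Object, Data, _Arg) ->
    [{riak_object:get_value(Object), Data}].
```

A phase using it could be declared as {map, {modfun, map_sketch, map_value}, none, true}, where the atom none is an arbitrary placeholder for the unused Arg.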
A reduce phase is declared as:
{reduce, FunTerm, Arg, Accumulate}
The terms are essentially the same as for map, except that the function referenced by FunTerm must be arity 2. It takes (ValueList, Arg), where Arg plays the same role as before and ValueList is one round of (possibly already reduced) inputs.
Much like with map, a reduce function must return a list of values. This list will be combined with the next input list the next time the reduce function is called, which is why the reduce function must be commutative, associative, and idempotent.
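As an illustration, the arity-2 reduce function sketched below (hypothetical module and name) uses Arg as a count N and keeps the N largest values seen so far, largest first. Feeding its own output back in together with new inputs gives the same answer as reducing everything at once, which is exactly the property the previous paragraph requires.

```erlang
-module(reduce_sketch).
-export([top_n/2]).

%% Reduce function of arity 2: (ValueList, Arg). Arg is N here; the
%% result is the N largest values in ValueList, in descending order.
%% Values dropped in an earlier round can never re-enter the top N,
%% so re-reducing a previous output plus new inputs is safe.
top_n(Values, N) when is_integer(N), N >= 0 ->
    lists:sublist(lists:reverse(lists:sort(Values)), N).
```

It could be declared as {reduce, {modfun, reduce_sketch, top_n}, 2, true}. A tempting but unsafe alternative would be a "count" reduce returning [length(ValueList)]: re-reducing its own output [Count] yields [1].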
The third and final type of phase is a link phase, declared as {link, Bucket, Tag, Accumulate}. For this kind of phase to work, there must already be a linkfun property set on Bucket in the cluster, which must return a FunTerm. The link phase is translated into a map phase using that FunTerm: {map, FunTerm, {Bucket, Tag}, Accumulate}.
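The translation just described can be written as a one-clause function, and the same module can show what a complete pair of query arguments might look like. Everything here is a hypothetical sketch: looking up the bucket's linkfun property is left to the caller, and the bucket, key, tag, and {modfun, my_module, my_map} names are placeholders, not real Riak modules.

```erlang
-module(link_phase_sketch).
-export([expand/2, example_query/0]).

%% Rewrite a link phase as the map phase it stands for. LinkFun is the
%% FunTerm obtained from the bucket's linkfun property (lookup not shown).
%% Map and reduce phases pass through unchanged.
expand({link, Bucket, Tag, Accumulate}, LinkFun) ->
    {map, LinkFun, {Bucket, Tag}, Accumulate};
expand(Phase, _LinkFun) ->
    Phase.

%% The two query arguments: the seed inputs (both the plain and the
%% {{Bucket, Key}, Data} forms) and the list of phase declarations.
%% All names below are placeholders.
example_query() ->
    Inputs = [{<<"people">>, <<"alice">>},
              {{<<"people">>, <<"bob">>}, some_data}],
    Query = [{link, <<"people">>, friend, false},
             {map, {modfun, my_module, my_map}, none, false},
             {reduce, {qfun, fun(Vs, _Arg) -> lists:usort(Vs) end}, none, true}],
    {Inputs, Query}.
```

Only the last phase sets Accumulate to true, following the most common pattern described earlier in this section.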
