MapReduce
######################################################################## WARNING: Map/Reduce is NOT intended as a general query mechanism for Riak. It will increase load on your cluster and is best suited for batch processing of data during off peak hours. ########################################################################
Key/Value stores are designed for key-based lookups; ad hoc queries are typically not well supported. The Map/Reduce model is a feature of Riak that makes it possible to perform diverse operations on data, including querying, filtering, and aggregating information.
Full documentation of the Map/Reduce features available in Riak can be found at the Riak Map/Reduce documentation. This article covers how to use these features in the Riak .NET Client, but doesn't cover the features themselves.
The first point of call for writing Map/Reduce queries is the RiakMapReduceQuery class, which has a fluent-style interface to facilitate creating ad hoc queries.
You can begin by creating a new RiakMapReduceQuery object:
var query = new RiakMapReduceQuery();
After creating an instance, you'll want to specify the inputs for the Map/Reduce job. Inputs can be specified in three different ways: as a Bucket, as a list of Bucket/Key pairs, or as a list of Bucket/Key/Argument triples.
- Bucket Name - A single string value that indicates the job should operate on all the keys within that bucket.
- Bucket/Key Pairs - A list of RiakBucketKeyInput instances which represent the Bucket/Key pairs which the job should operate on.
- Bucket/Key/Argument Triples - A list of Bucket/Key pairs, each accompanied by an argument for the job to use when it processes that pair.
Here is how you would specify a single bucket input:
var query = new RiakMapReduceQuery().Inputs("BucketName");
To specify Bucket/Key pairs, do the following:
var inputs = new RiakBucketKeyInput()
.Add("bucket1", "key1")
.Add("bucket2", "key2")
.Add("bucket3", "key3");
var query = new RiakMapReduceQuery()
.Inputs(inputs);
To specify Bucket/Key/Arg triples, do the following:
var inputs = new RiakBucketKeyInput()
.Add("bucket1", "key1", 3.1415)
.Add("bucket2", "key2", "slartibartfast")
.Add("bucket3", "key3", new { foo = "bar", baz = 10 });
var query = new RiakMapReduceQuery()
.Inputs(inputs);
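For orientation, the three input styles correspond to three shapes of the `inputs` field in the JSON job format described in Riak's Map/Reduce documentation. The sketch below builds those shapes in plain JavaScript. The helper names (`bucketInput`, `pairInputs`, `tripleInputs`) are hypothetical and are not part of the .NET client, and the way the client serializes a query internally is an assumption here.

```javascript
// Hypothetical helpers that mirror the three input styles as plain JSON values.
// The shapes follow the JSON job format in Riak's Map/Reduce documentation.

// A whole bucket: just the bucket name.
function bucketInput(bucket) {
  return bucket;
}

// Bucket/Key pairs: a list of two-element arrays.
function pairInputs(pairs) {
  return pairs.map(([bucket, key]) => [bucket, key]);
}

// Bucket/Key/Arg triples: a list of three-element arrays.
function tripleInputs(triples) {
  return triples.map(([bucket, key, arg]) => [bucket, key, arg]);
}

const wholeBucket = JSON.stringify({ inputs: bucketInput("BucketName") });
const pairs = JSON.stringify({
  inputs: pairInputs([["bucket1", "key1"], ["bucket2", "key2"]]),
});
const triples = JSON.stringify({
  inputs: tripleInputs([
    ["bucket1", "key1", 3.1415],
    ["bucket3", "key3", { foo: "bar", baz: 10 }],
  ]),
});

console.log(wholeBucket);
console.log(pairs);
console.log(triples);
```

The argument in a triple can be any JSON-serializable value, which is why the C# example above can pass a number, a string, or an anonymous object.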
Now that the inputs have been specified, the next step is to write a series of Map and Reduce phases.
Riak supports several types of phases in a Map/Reduce job, and each type of phase can be specified more than once. Each phase type is matched with one or more methods in the RiakMapReduceQuery class. Each of these methods takes a single parameter, an Action<T>. The type T varies with the phase and with the language the phase is executed in. The instance of T passed to the action exposes all the methods and properties available for that phase and language, and the action operates on that instance to configure the phase.
The phase methods correspond to the languages that Riak supports for Map/Reduce operations: MapJs() and MapErlang().
The MapJs method creates a Map phase using JavaScript to describe the mapping. The action parameter for this method is of type RiakFluentActionPhaseJavascript and can also be configured in a fluent style. This class contains the following methods:
- Keep(bool) - tells the client whether the results of this phase should be kept or discarded.
- Argument<T>(T) - specifies the argument to pass in to the phase for each bucket/key pair that is processed during the phase.
- Name(string) - specifies the name of a built-in JavaScript Map function to use for this phase (such as "Riak.mapValuesJson").
- Source(string) - specifies the full source, in JavaScript, of the function to execute for this phase.
- BucketKey(string, string) - specifies the Bucket name and Key name that locate, inside Riak, the object containing the JavaScript source function to use during this phase.
Name(), Source(), and BucketKey() are mutually exclusive. Only one of these values should be set; if more than one is set, an exception will occur when the Map/Reduce query is executed.
Keep() and Argument() are not mutually exclusive and can be set at any time alongside any of the other methods.
Here are a few examples:
var query1 = new RiakMapReduceQuery()
.Inputs("BucketName")
.MapJs(m => m.Name("Riak.mapValuesJson").Keep(true));
var query2 = new RiakMapReduceQuery()
.Inputs("SomeBucket")
.MapJs(m => m.BucketKey("mapred_scripts", "do_magic").Argument("foo"));
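The string passed to `Source()`, or stored in the object that `BucketKey()` points at, is an ordinary JavaScript function. As a rough sketch, assuming the `(value, keyData, arg)` signature and the `values[0].data` object layout described in Riak's Map/Reduce documentation, such a function can be exercised locally before it is sent to the cluster. The sample objects and the `runMap` harness are made up for illustration.

```javascript
// A map function in the shape Riak's JavaScript Map phase expects (assumed from
// Riak's Map/Reduce documentation): it receives the stored object, optional
// key data, and the phase argument, and must return an array.
const mapSource = `function(value, keyData, arg) {
  var doc = JSON.parse(value.values[0].data);
  return [doc[arg]];
}`;

// Hypothetical local harness: evaluates the source and applies it to each object.
function runMap(source, objects, arg) {
  const fn = new Function("return (" + source + ");")();
  return objects.flatMap((obj) => fn(obj, undefined, arg));
}

// Made-up objects that imitate the layout of a stored Riak object.
const sampleObjects = [
  { bucket: "people", key: "oj", values: [{ data: '{"name":"OJ","age":30}' }] },
  { bucket: "people", key: "jeremiah", values: [{ data: '{"name":"Jeremiah","age":31}' }] },
];

const names = runMap(mapSource, sampleObjects, "name");
console.log(names); // [ 'OJ', 'Jeremiah' ]
```

In this sketch the third parameter plays the role of the value supplied through `Argument()` in the .NET client.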
MapErlang() is a Map phase using Erlang instead of JavaScript as the source language. The big difference with MapErlang() is that you can't write Erlang source code and pass that in as a parameter like you can with MapJs(). To put it another way: there are no ad hoc Erlang functions in Riak Map/Reduce.
The action parameter for this method is of type RiakFluentActionPhaseErlang and can be configured in a fluent style. This class contains the following methods:
- Keep(bool) - tells the client whether the results of this phase should be kept or discarded.
- Argument<T>(T) - specifies the argument to pass in to the phase for each bucket/key pair that is processed during the phase.
- ModFun(string, string) - Erlang functions are located inside a module. This method indicates the module and the function within it to execute.
Here's an example:
var query = new RiakMapReduceQuery()
.Inputs("BucketName")
.MapErlang(m => m.ModFun("my_module", "the_function").Keep(true));
Both ReduceJs() and ReduceErlang() have interfaces that match their Map counterparts. The functions they expose are also the same.
Here's an example:
var query = new RiakMapReduceQuery()
.Inputs("BucketName")
.MapJs(m => m.Source(@"function(o){return[1];}"))
.ReduceJs(m => m.Name(@"Riak.reduceSum").Keep(true));
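To see why this query counts the objects in the bucket, the sketch below replays it locally in plain JavaScript. `reduceSum` is a stand-in written for this example and is assumed to behave like the built-in `Riak.reduceSum` (sum a list of numbers and return the total in a one-element array). The re-reduce step reflects the fact that Riak may apply a reduce function to partial results more than once.

```javascript
// The map phase from the query above: emit a 1 for every object.
const mapOne = function (o) { return [1]; };

// Stand-in for Riak.reduceSum (assumed behavior): sum the inputs, return [total].
function reduceSum(values) {
  return [values.reduce((total, n) => total + n, 0)];
}

// Five placeholder objects; their contents do not matter to this map function.
const objects = Array.from({ length: 5 }, (_, i) => ({ key: "key" + i }));

// Map phase: one output list per object, concatenated.
const mapped = objects.flatMap(mapOne);

// Reduce phase, split into two batches and then re-reduced, to imitate
// reduce being applied to partial results.
const partialA = reduceSum(mapped.slice(0, 2));
const partialB = reduceSum(mapped.slice(2));
const total = reduceSum(partialA.concat(partialB));

console.log(total); // [ 5 ]
```

Because the output of a reduce can be fed back into the same reduce, a reduce function should accept its own output as input, as the sum does here.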
Link() gives callers the ability to access linked items via Riak's link capability. Link() expects an action which takes a RiakFluentLinkPhase. Without digging too deep, links point to a record in another bucket and can be identified by a separate tag. RiakFluentLinkPhase has the following methods:
- Keep(bool) - tells the client whether the results of this phase should be kept or discarded.
- Bucket(string) - the bucket to examine for links.
- Tag(string) - if supplied, all links returned will have this tag.
- AllLinks() - indicates that the user is interested in all of the links that the object has.
- FromRiakLink(RiakLink) - helper function which translates a RiakLink instance into a link search definition.
Here are a few examples:
// get all friends in the "people" bucket (avoids pets and programmers)
var query1 = new RiakMapReduceQuery()
.Inputs("people")
.Link(l => l.Tag("friend").Bucket("people"))
.ReduceErlang(r => r.ModFun("riak_kv_mapreduce", "reduce_set_union").Keep(true));
// get every link available for each person
var query2 = new RiakMapReduceQuery()
.Inputs("people")
.Link(l => l.AllLinks())
.ReduceErlang(r => r.ModFun("riak_kv_mapreduce", "reduce_set_union").Keep(true));
// get every language OJ doesn't like
var query3 = new RiakMapReduceQuery()
.Inputs(new []{new RiakBucketKeyInput("people", "oj")})
.Link(l => l.Tag("dislike").Bucket("languages"))
.ReduceErlang(r => r.ModFun("riak_kv_mapreduce", "reduce_set_union").Keep(true));
The result set of the last query may include PHP.
Riak's key filters make it possible to narrow down the keys in a bucket quickly by applying a set of search predicates to them.
Filter() works differently from the other phase methods because of the potential complexity of the filter. It takes an instance of a Key Filter, of arbitrary complexity, and adds it to the set of key filters which will eventually be included in the query.
Here's an example:
var query = new RiakMapReduceQuery()
.Inputs("people")
.Filter(new Matches("jeremiah"))
.Link(l => l.Tag("friends").Bucket("people"))
.ReduceErlang(r => r.ModFun("riak_kv_mapreduce", "reduce_set_union").Keep(true));
For more in-depth information about Key Filters and the types that are available through this interface, take a look at our Key Filter documentation page.
Note: Key filters were made available in v0.14. Do not use the key filter functionality if your Riak cluster has an earlier version.
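As a rough illustration of what a `matches` filter does, the sketch below applies a regular-expression predicate to a list of keys locally and shows the filtered-input shape assumed from Riak's key filter documentation. The key names and the `applyKeyFilters` helper are hypothetical. In a real job the filtering happens inside Riak before any phase runs.

```javascript
// Inputs with a key filter, in the JSON shape assumed from Riak's key filter docs.
const filteredInputs = {
  bucket: "people",
  key_filters: [["matches", "jeremiah"]],
};

// Hypothetical local predicates; only "matches" is sketched here.
const predicates = {
  matches: (pattern) => (key) => new RegExp(pattern).test(key),
};

// Keep only the keys that satisfy every filter in the list.
function applyKeyFilters(keys, filters) {
  return keys.filter((key) =>
    filters.every(([name, ...args]) => predicates[name](...args)(key))
  );
}

const keys = ["jeremiah", "oj", "jeremiah_p"];
const selected = applyKeyFilters(keys, filteredInputs.key_filters);
console.log(selected); // [ 'jeremiah', 'jeremiah_p' ]
```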
The order in which the phase methods are called on the RiakMapReduceQuery class determines the order in which those phases will be executed inside Riak. Make sure you get them right!
Pass the query instance into one of the RiakClient Map/Reduce interface functions. The options are:
- RiakClient.MapReduce() - This is the blocking, non-streaming version of the interface. If you know that your query isn't going to take long, and that you're interested in dealing with the results on the current thread, then this is the function for you. Bear in mind that all of the results are fetched before the function returns.
- RiakClient.StreamMapReduce() - Use this when you know that you're going to be dealing with a large result set (probably the most common use case). The result set will be pulled out of Riak as you're processing the results, which means the memory required to parse the result set is substantially smaller than with a straight MapReduce() call.
- RiakClient.Async.MapReduce() - This is the asynchronous counterpart to RiakClient.MapReduce(). It functions in exactly the same way; the only difference is that the result set is returned to the specified callback on a different thread.
- RiakClient.Async.StreamMapReduce() - This is the asynchronous counterpart to RiakClient.StreamMapReduce(). It functions in exactly the same way; the only difference is that the result set is returned to the specified callback on a different thread.
Here are some examples:
var client = cluster.GetClient();
// ...
var query = new RiakMapReduceQuery()/* setup your query */;
// blocking, non-streaming
var result = client.MapReduce(query);
// blocking, streaming
var streamedResult = client.StreamMapReduce(query);
// async, non-streaming
client.Async.MapReduce(query, resultHandler);
// async, streaming
client.Async.StreamMapReduce(query, streamedResultHandler);
Regardless of the approach that you take when executing the query, you'll get an object back which contains the results of the phases that were executed by the Map/Reduce job. If you chose to use a non-streaming API call, the result you get back will include the phase results in the order that they were specified.
Order cannot be guaranteed with streaming API calls. It is up to the caller to guarantee that they are working with the correct result phase when parsing Map/Reduce results.
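One way to cope with unordered streamed results is to group them by phase number as they arrive instead of relying on arrival order. The sketch below shows the idea in plain JavaScript. The `{ phase, value }` record shape and the `groupByPhase` helper are assumptions for illustration and do not reflect the .NET client's types.

```javascript
// Group streamed phase results by phase index, regardless of arrival order.
function groupByPhase(streamed) {
  const byPhase = new Map();
  for (const { phase, value } of streamed) {
    if (!byPhase.has(phase)) {
      byPhase.set(phase, []);
    }
    byPhase.get(phase).push(value);
  }
  return byPhase;
}

// Made-up stream: results for phases 0 and 1 arrive interleaved.
const streamed = [
  { phase: 1, value: [3] },
  { phase: 0, value: ["a"] },
  { phase: 1, value: [4] },
  { phase: 0, value: ["b"] },
];

const grouped = groupByPhase(streamed);
console.log(grouped.get(0)); // [ [ 'a' ], [ 'b' ] ]
console.log(grouped.get(1)); // [ [ 3 ], [ 4 ] ]
```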
Parse the results by enumerating them and extracting values:
var result = client.MapReduce(query);
if (result.IsSuccess)
{
foreach (var phase in result.Value.PhaseResults)
{
// this contains the phase index
var phaseNum = phase.Phase;
// get access to the value in various ways
var sumResult = phase.GetObject<int[]>();
var objResult = phase.GetObject<CustomType>();
var stringResult = phase.Value.FromRiakString();
}
}