class Riak::MapReduce
Class for invoking map-reduce jobs using the HTTP interface.
Attributes
Public Class Methods
Creates a new map-reduce job.
@param [Client] client the Riak::Client interface
@yield [self] helpful for initializing the job
```ruby
# File lib/riak/map_reduce.rb, line 36
def initialize(client)
  @client, @inputs, @query = client, [], []
  yield self if block_given?
end
```
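Because the constructor yields `self` when a block is given, a job can be configured inline at creation time. The sketch below reproduces only that pattern in plain Ruby so it runs without a Riak node; `JobSketch` and its two-argument `add` are hypothetical stand-ins, not riak-client classes.

```ruby
# Hypothetical stand-in for Riak::MapReduce: it mirrors only the
# constructor pattern (store the client, start with empty inputs and
# query, then yield self so the caller can configure the job in a block).
class JobSketch
  attr_reader :client, :inputs, :query

  def initialize(client)
    @client, @inputs, @query = client, [], []
    yield self if block_given?
  end

  # Hypothetical helper: appends a [bucket, key] pair and returns self.
  def add(bucket, key)
    @inputs << [bucket, key]
    self
  end
end

job = JobSketch.new(:fake_client) do |j|
  j.add("artists", "Beatles")
end
```

Under the same pattern, the real class would be used as `Riak::MapReduce.new(client) { |mr| mr.add("artists", "Beatles") }`.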
Public Instance Methods
Add or replace inputs for the job.
@overload add(bucket)
  Run the job across all keys in the bucket. This will replace any other inputs previously added.
  @param [String, Bucket] bucket the bucket to run the job on
@overload add(bucket, key)
  Add a bucket/key pair to the job.
  @param [String, Bucket] bucket the bucket of the object
  @param [String] key the key of the object
@overload add(object)
  Add an object to the job (by its bucket/key).
  @param [RObject] object the object to add to the inputs
@overload add(bucket, key, keydata)
  Add a bucket/key pair to the job, along with extra key data.
  @param [String, Bucket] bucket the bucket of the object
  @param [String] key the key of the object
  @param [String] keydata extra data to pass along with the object to the job
@overload add(bucket, filters)
  Run the job across all keys in the bucket, with the given key-filters. This will replace any other inputs previously added. (Requires Riak 0.14)
  @param [String, Bucket] bucket the bucket to filter keys from
  @param [Array<Array>] filters a list of key-filters to apply to the key list
@return [MapReduce] self
```ruby
# File lib/riak/map_reduce.rb, line 64
def add(*params)
  params = params.dup
  params = params.first if Array === params.first
  case params.size
  when 1
    p = params.first
    case p
    when Bucket
      @inputs = bucket_input(p)
    when RObject
      @inputs << robject_input(p)
    when String
      warn(t('full_bucket_mapred', :backtrace => caller.join("\n "))) unless Riak.disable_list_keys_warnings
      @inputs = maybe_escape(p)
    end
  when 2..3
    bucket = params.shift
    if Array === params.first
      if bucket.is_a? Bucket
        bucket = bucket_input(bucket)
      else
        bucket = maybe_escape(bucket)
      end
      warn(t('full_bucket_mapred', :backtrace => caller.join("\n "))) unless Riak.disable_list_keys_warnings
      @inputs = {:bucket => bucket, :key_filters => params.first }
    else
      key = params.shift
      key_data = params.shift || ''
      @inputs << key_input(key, bucket, key_data)
    end
  end
  self
end
```
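The overloads of `add` differ mainly in the shape of input they produce and in whether they append to or replace the existing inputs. The following is a simplified, hypothetical model (`AddSketch`) that uses plain Strings for buckets and leaves out escaping, `Bucket`/`RObject` handling, bucket types, and the list-keys warning.

```ruby
# Hypothetical, simplified model of MapReduce#add using plain Strings for
# buckets. It only shows which input shape each overload produces.
class AddSketch
  attr_reader :inputs

  def initialize
    @inputs = []
  end

  def add(*params)
    # A single Array argument is unpacked, as in the original.
    params = params.first if params.first.is_a?(Array)
    case params.size
    when 1
      # Whole bucket: replaces any previous inputs.
      @inputs = params.first
    when 2..3
      bucket, second, third = params
      if second.is_a?(Array)
        # Bucket plus key-filters: also replaces previous inputs.
        @inputs = { bucket: bucket, key_filters: second }
      else
        # Bucket/key, plus optional key data: appended to the list.
        pair = [bucket, second]
        pair << third unless third.nil? || third.empty?
        @inputs << pair
      end
    end
    self
  end
end
```

As in the source above, the whole-bucket and key-filter forms assign `@inputs` outright, while the bucket/key forms append to it. With riak-client itself, the corresponding calls would be `mr.add("artists")`, `mr.add("artists", "Beatles")`, and `mr.add("artists", [["starts_with", "B"]])`.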
Adds a bucket and key-filters built by the given block. Equivalent to calling add with a list of filters.
@param [String] bucket the bucket to apply key-filters to
@yield [] builder block - instance_eval'ed into a FilterBuilder
@return [MapReduce] self
@see MapReduce#add
```ruby
# File lib/riak/map_reduce.rb, line 108
def filter(bucket, &block)
  add(bucket, FilterBuilder.new(&block).to_a)
end
```
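Key-filters are passed to Riak as a list of arrays, each naming a filter and its arguments. Below is a minimal sketch of an `instance_eval`-based builder, similar in spirit to the block `filter` accepts; `TinyFilterBuilder` is hypothetical and models only the `tokenize` and `eq` filters, not riak-client's own `FilterBuilder`.

```ruby
# Hypothetical, minimal key-filter builder. As with the block passed to
# MapReduce#filter, the block is instance_eval'ed, so filter names are
# called as bare methods inside it.
class TinyFilterBuilder
  def initialize(&block)
    @filters = []
    instance_eval(&block) if block
  end

  # Split the key on `separator` and keep the token at 1-based `position`.
  def tokenize(separator, position)
    @filters << ["tokenize", separator, position]
  end

  # Keep keys (or tokens) equal to `value`.
  def eq(value)
    @filters << ["eq", value]
  end

  def to_a
    @filters
  end
end

# Keys such as "2010-11-01": keep only those whose first token is "2010".
filters = TinyFilterBuilder.new { tokenize "-", 1; eq "2010" }.to_a
```

The resulting array is what `filter` hands to `add(bucket, filters)`.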
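(Secondary Indexes) Use a secondary index query to start a map/reduce job. This will replace any other inputs previously added.
@param [String, Bucket] bucket the bucket whose index to query
@param [String] index the index to query
@param [String, Integer, Range] query the value of the index, or a range of values (of Strings or Integers)
@raise [ArgumentError] if query is not a String, an Integer, or a Range beginning with a String or Integer
@return [MapReduce] self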
```ruby
# File lib/riak/map_reduce.rb, line 130
def index(bucket, index, query)
  if bucket.is_a? Bucket
    bucket = bucket.needs_type? ? [maybe_escape(bucket.type.name), maybe_escape(bucket.name)] : maybe_escape(bucket.name)
  else
    bucket = maybe_escape(bucket)
  end
  case query
  when String, Fixnum
    @inputs = {:bucket => bucket, :index => index, :key => query}
  when Range
    raise ArgumentError, t('invalid_index_query', :value => query.inspect) unless String === query.begin || Integer === query.begin
    @inputs = {:bucket => bucket, :index => index, :start => query.begin, :end => query.end}
  else
    raise ArgumentError, t('invalid_index_query', :value => query.inspect)
  end
  self
end
```
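`index` turns its query into one of two input hashes: an exact match (`:key`) or a range (`:start`/`:end`). The hypothetical `index_input` function below mirrors that branching for a String bucket, without escaping or bucket types, and checks `Integer` where the original checks `Fixnum`.

```ruby
# Hypothetical mirror of the input hash MapReduce#index builds, for a
# plain String bucket. Integer replaces Fixnum here: Fixnum is deprecated
# since Ruby 2.4 and was removed in Ruby 3.2.
def index_input(bucket, index, query)
  case query
  when String, Integer
    # Exact-match query on a single index value.
    { bucket: bucket, index: index, key: query }
  when Range
    unless query.begin.is_a?(String) || query.begin.is_a?(Integer)
      raise ArgumentError, "invalid index query: #{query.inspect}"
    end
    # Range query: only the endpoints are sent.
    { bucket: bucket, index: index, start: query.begin, end: query.end }
  else
    raise ArgumentError, "invalid index query: #{query.inspect}"
  end
end

exact = index_input("users", "age_int", 30)
range = index_input("users", "age_int", 18..65)
```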
Add a link phase to the job. Link phases follow links attached to objects automatically (a special case of map).
@overload link(walk_spec, options={})
  @param [WalkSpec] walk_spec a WalkSpec that represents the types of links to follow
  @param [Hash] options extra options for the phase (see {Phase#initialize})
@overload link(bucket, tag, keep, options={})
  @param [String, nil] bucket the bucket to limit links to
  @param [String, nil] tag the tag to limit links to
  @param [Boolean] keep whether to keep results of this phase (overrides the phase options)
  @param [Hash] options extra options for the phase (see {Phase#initialize})
@overload link(options)
  @param [Hash] options options for both the walk spec and link phase
  @see WalkSpec#initialize
@return [MapReduce] self
@see Phase#initialize
```ruby
# File lib/riak/map_reduce.rb, line 191
def link(*params)
  options = params.extract_options!
  walk_spec_options = options.slice!(:type, :function, :language, :arg) unless params.first
  walk_spec = WalkSpec.normalize(params.shift || walk_spec_options).first
  @query << Phase.new({:type => :link, :function => walk_spec}.merge(options))
  self
end
```
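A link phase is described to Riak by the bucket and tag to follow plus a keep flag. The sketch builds such a spec as a Hash and serializes it with Ruby's standard `json` library; `link_phase` is hypothetical, and it assumes that an unspecified bucket or tag is simply left out of the spec.

```ruby
require "json"

# Hypothetical sketch of the JSON a link phase serializes to in a Riak
# MapReduce request. Assumption: a nil bucket or tag means "follow any"
# and is omitted from the spec.
def link_phase(bucket: nil, tag: nil, keep: false)
  spec = {}
  spec["bucket"] = bucket unless bucket.nil?
  spec["tag"] = tag unless tag.nil?
  spec["keep"] = keep
  { "link" => spec }
end

# Follow "albums"-tagged links from the input objects, keeping the results.
phase = link_phase(tag: "albums", keep: true)
json = JSON.generate(phase)
```

A comparable riak-client call, using the `link(options)` overload, would be roughly `mr.link(:tag => "albums", :keep => true)`.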
Add a map phase to the job.
@overload map(function)
  @param [String, Array] function a JavaScript function that represents the phase, or an Erlang [module, function] pair
@overload map(function?, options)
  @param [String, Array] function a JavaScript function that represents the phase, or an Erlang [module, function] pair
  @param [Hash] options extra options for the phase (see {Phase#initialize})
@return [MapReduce] self
@see Phase#initialize
```ruby
# File lib/riak/map_reduce.rb, line 157
def map(*params)
  options = params.extract_options!
  @query << Phase.new({:type => :map, :function => params.shift}.merge(options))
  self
end
```
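The `function` argument can be JavaScript source or an Erlang `[module, function]` pair, and the two end up under different keys in Riak's MapReduce JSON. The hypothetical `phase_spec` below is a simplified sketch of that mapping; named JavaScript functions and phase arguments are not modelled.

```ruby
# Hypothetical, simplified sketch of a map or reduce phase in Riak's
# MapReduce JSON: JavaScript source goes under "source", an Erlang
# [module, function] pair under "module" and "function".
def phase_spec(type, function, keep: false)
  spec =
    case function
    when String
      { "language" => "javascript", "source" => function }
    when Array
      { "language" => "erlang", "module" => function[0], "function" => function[1] }
    else
      raise ArgumentError, "unsupported function: #{function.inspect}"
    end
  { type.to_s => spec.merge("keep" => keep) }
end

js  = phase_spec(:map, "function(v) { return [v.key]; }")
erl = phase_spec(:map, ["riak_kv_mapreduce", "map_object_value"], keep: true)
```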
Add a reduce phase to the job.
@overload reduce(function)
  @param [String, Array] function a JavaScript function that represents the phase, or an Erlang [module, function] pair
@overload reduce(function?, options)
  @param [String, Array] function a JavaScript function that represents the phase, or an Erlang [module, function] pair
  @param [Hash] options extra options for the phase (see {Phase#initialize})
@return [MapReduce] self
@see Phase#initialize
```ruby
# File lib/riak/map_reduce.rb, line 171
def reduce(*params)
  options = params.extract_options!
  @query << Phase.new({:type => :reduce, :function => params.shift}.merge(options))
  self
end
```
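Phases run in the order they are added, and a reduce phase consumes the output of the phase before it. The sketch below assembles a two-phase job body of the kind sent to Riak's HTTP `/mapred` endpoint, using Riak's built-in JavaScript functions `Riak.mapValuesJson` and `Riak.reduceSum`; it assumes the input objects hold JSON numbers, and the bucket and key names are made up.

```ruby
require "json"

# Hypothetical job body: inputs plus an ordered list of phases.
# The map phase parses each object's value as JSON; the reduce phase
# sums the mapped values. Only the reduce output is kept.
job = {
  "inputs" => [["counters", "a"], ["counters", "b"]],
  "query" => [
    { "map" => { "language" => "javascript", "name" => "Riak.mapValuesJson", "keep" => false } },
    { "reduce" => { "language" => "javascript", "name" => "Riak.reduceSum", "keep" => true } }
  ]
}

body = JSON.generate(job)
```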
Executes this map-reduce job.
@overload run
  Return the entire collection of results.
  @return [Array<Array>] similar to link-walking, each element is an array of results from a phase where "keep" is true. If there is only one "keep" phase, only the results from that phase will be returned.
@overload run { |phase, data| ... }
  Stream the results through the given block without accumulating.
  @yield [phase, data] A block to stream results through
  @yieldparam [Fixnum] phase the phase from which the results were generated
  @yieldparam [Array] data a list of results from the phase
  @return [nil] nothing
@raise [MapReduceError] if the request fails with a server error whose body is JSON; any other FailedRequest is re-raised unchanged
```ruby
# File lib/riak/map_reduce.rb, line 229
def run(&block)
  @client.mapred(self, &block)
rescue FailedRequest => fr
  if fr.server_error? && fr.is_json?
    raise MapReduceError.new(fr.body)
  else
    raise fr
  end
end
```
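The two overloads of `run` differ in whether results are accumulated or streamed. Here is a hypothetical, in-memory sketch of that behaviour: `raw` stands in for the `[phase, data]` chunks a streaming response would deliver, and `run_sketch` is not part of riak-client. It also reproduces the rule that a single keep phase returns that phase's results directly rather than a list of lists.

```ruby
# Hypothetical sketch of MapReduce#run's two calling styles.
# Assumes only phases with keep: true appear in `raw`.
def run_sketch(raw)
  if block_given?
    # Streaming: hand each chunk to the block and accumulate nothing.
    raw.each { |phase, data| yield phase, data }
    return nil
  end
  # Accumulating: group data by phase index, in phase order.
  by_phase = Hash.new { |h, k| h[k] = [] }
  raw.each { |phase, data| by_phase[phase].concat(data) }
  results = by_phase.keys.sort.map { |phase| by_phase[phase] }
  # A single kept phase is returned unwrapped.
  results.size == 1 ? results.first : results
end

chunks = [[1, [1, 2]], [1, [3]]]
all = run_sketch(chunks)
```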
(Riak Search) Use a search query to start a map/reduce job. This will replace any other inputs previously added.
@param [String, Riak::Search::Index] index the index to query, either a {Riak::Search::Index} instance or a {String}
@param [String] query the query to run
@return [MapReduce] self
```ruby
# File lib/riak/map_reduce.rb, line 117
def search(index, query)
  index = index.name if index.respond_to?(:name)
  @inputs = {:module => "yokozuna", :function => "mapred_search", :arg => [index, query]}
  self
end
```
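`search` accepts either an index object or its name by duck typing on `#name`, and always builds a Yokozuna `mapred_search` input. The sketch below mirrors that logic; `SearchIndexStub` is a hypothetical Struct standing in for `Riak::Search::Index`, and the index name and query string are illustrative.

```ruby
# Hypothetical stand-in for Riak::Search::Index: anything with #name.
SearchIndexStub = Struct.new(:name)

# Mirrors MapReduce#search: an object responding to #name is reduced to
# its name, a String is used as-is, and the input always targets
# yokozuna:mapred_search with [index, query] as its argument.
def search_input(index, query)
  index = index.name if index.respond_to?(:name)
  { module: "yokozuna", function: "mapred_search", arg: [index, query] }
end

from_string = search_input("famous", "name_s:Lion*")
from_object = search_input(SearchIndexStub.new("famous"), "name_s:Lion*")
```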
Sets the timeout for the map-reduce job.
@param [Fixnum] value the job timeout, in milliseconds
@return [MapReduce] self
```ruby
# File lib/riak/map_reduce.rb, line 201
def timeout(value)
  @timeout = value
  return self
end
```
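Because `timeout` returns `self`, it can be chained with the other builder methods. In the sketch below, `TimeoutSketch` is hypothetical, and including the `"timeout"` key (in milliseconds) only when a value was set is an assumption about serialization, not something the source above shows.

```ruby
require "json"

# Hypothetical sketch: a chainable timeout setter plus a serializer that
# adds "timeout" (milliseconds) to the job body only when one was set.
class TimeoutSketch
  def initialize
    @inputs, @query, @timeout = "artists", [], nil
  end

  def timeout(value)
    @timeout = value
    self
  end

  def to_json_body
    body = { "inputs" => @inputs, "query" => @query }
    body["timeout"] = @timeout.to_i if @timeout
    JSON.generate(body)
  end
end
```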
Private Instance Methods
Processes a {Bucket} or {BucketTyped::Bucket} into a whole-bucket {MapReduce} input.
```ruby
# File lib/riak/map_reduce.rb, line 243
def bucket_input(bucket)
  warn(t('full_bucket_mapred', :backtrace => caller.join("\n "))) unless Riak.disable_list_keys_warnings
  if bucket.needs_type?
    return [maybe_escape(bucket.type.name), maybe_escape(bucket.name)]
  end
  maybe_escape(bucket.name)
end
```
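`bucket_input` produces a bare bucket name for an untyped bucket and a `[type, bucket]` pair for a typed one. The sketch uses a hypothetical `BucketStub` Struct in place of riak-client's bucket classes and omits `maybe_escape` and the list-keys warning.

```ruby
# Hypothetical bucket stand-in: type_name is nil for an untyped bucket.
BucketStub = Struct.new(:name, :type_name) do
  def needs_type?
    !type_name.nil?
  end
end

# Mirrors bucket_input without escaping or warnings.
def whole_bucket_input(bucket)
  return [bucket.type_name, bucket.name] if bucket.needs_type?
  bucket.name
end
```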
Processes a key into a single-object {MapReduce} input, doing the correct thing if the bucket argument is a {String}, {Bucket}, or a {BucketTyped::Bucket}.
```ruby
# File lib/riak/map_reduce.rb, line 272
def key_input(key, bucket, key_data = '')
  kd = []
  kd << key_data unless key_data.blank?
  if bucket.is_a? String
    return [maybe_escape(bucket), maybe_escape(key)] + kd
  elsif bucket.needs_type?
    return [maybe_escape(bucket.name), maybe_escape(key), key_data, maybe_escape(bucket.type.name)]
  else
    return [maybe_escape(bucket.name), maybe_escape(key)] + kd
  end
end
```
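For typed buckets, `key_input` always emits four elements, `[bucket, key, key_data, type]`, even when the key data is empty, whereas the other branches append key data only when it is present. The hypothetical `key_input_sketch` below reproduces those shapes for a String bucket and for a `TypedBucketStub` Struct; escaping and the untyped `Bucket` branch are left out.

```ruby
# Hypothetical stand-in for a typed bucket.
TypedBucketStub = Struct.new(:name, :type_name)

# Mirrors the String and typed-bucket branches of key_input.
def key_input_sketch(key, bucket, key_data = "")
  kd = key_data.to_s.empty? ? [] : [key_data]
  case bucket
  when String
    # Key data is appended only when present.
    [bucket, key] + kd
  when TypedBucketStub
    # Always four elements, even with empty key data.
    [bucket.name, key, key_data, bucket.type_name]
  else
    raise ArgumentError, "unsupported bucket: #{bucket.inspect}"
  end
end
```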
Processes an {RObject} into a single-object {MapReduce} input, whether it has a bucket type or not.
```ruby
# File lib/riak/map_reduce.rb, line 255
def robject_input(obj, key_data = '')
  bucket = obj.bucket
  if bucket.needs_type?
    return [maybe_escape(bucket.name), maybe_escape(obj.key), key_data, maybe_escape(bucket.type.name)]
  end
  [maybe_escape(obj.bucket.name), maybe_escape(obj.key)]
end
```