class Riak::MapReduce

Class for invoking map-reduce jobs using the HTTP interface.

Attributes

inputs[RW]

@return [Array<[bucket,key]>,String,Hash<:bucket,:key_filters>] The
bucket/key pairs for input to the job, or the bucket (all keys), or a
hash containing the bucket and its key-filters (or, when set by index,
a secondary-index query).

@see add
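To make the three input shapes concrete, the sketch below writes each one out as a plain Ruby value and serializes it with the standard `json` library. The bucket, key, and filter values are made up for illustration, and the sketch assumes `maybe_escape` leaves such plain names unchanged.

```ruby
require 'json'

# Hypothetical bucket/key names, for illustration only.

# 1. A list of [bucket, key] pairs; a third element carries optional key data.
pairs = [["users", "alice"], ["users", "bob", "extra"]]

# 2. A bare bucket name: the job runs over every key in the bucket.
whole_bucket = "users"

# 3. A bucket plus key-filters, stored under :key_filters by MapReduce#add.
filtered = { :bucket => "users", :key_filters => [["starts_with", "a"]] }

[pairs, whole_bucket, filtered].each do |inputs|
  puts({ "inputs" => inputs }.to_json)
end
```

Symbol keys such as `:bucket` are emitted as JSON strings, so all three shapes reach Riak as ordinary JSON values.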

query[RW]

@return [Array<Phase>] The map and reduce phases that will be executed
@see map
@see reduce
@see link

Public Class Methods

new(client) { |self| ... }

Creates a new map-reduce job.
@param [Client] client the Riak::Client interface
@yield [self] the new job, so its inputs and phases can be configured inside the block

# File lib/riak/map_reduce.rb, line 36
def initialize(client)
  @client, @inputs, @query = client, [], []
  yield self if block_given?
end
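The constructor yields the new job to an optional block, which lets a job be set up in a single expression. The sketch below isolates that `yield self if block_given?` pattern in a hypothetical `SketchJob` class so it runs without a Riak client; `:fake_client` is a placeholder, not a real client object.

```ruby
# SketchJob is a hypothetical stand-in for Riak::MapReduce; it only mirrors
# the "yield self if block_given?" initialization pattern.
class SketchJob
  attr_reader :client, :inputs, :query

  def initialize(client)
    @client, @inputs, @query = client, [], []
    yield self if block_given?   # let the caller configure the new job
  end

  def add(bucket, key)
    @inputs << [bucket, key]
    self                         # return self so calls can be chained
  end
end

job = SketchJob.new(:fake_client) do |j|
  j.add("users", "alice").add("users", "bob")
end
p job.inputs  # => [["users", "alice"], ["users", "bob"]]
```

Because the block receives the object before `new` returns, the job is fully configured by the time it is assigned.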

Public Instance Methods

<<(*params)
Alias for: add
add(*params)

Add or replace inputs for the job.

@overload add(bucket)

Run the job across all keys in the bucket. This will replace any other inputs previously added.
@param [String, Bucket] bucket the bucket to run the job on

@overload add(bucket,key)

Add a bucket/key pair to the job.
@param [String,Bucket] bucket the bucket of the object
@param [String] key the key of the object

@overload add(object)

Add an object to the job (by its bucket and key).
@param [RObject] object the object to add to the inputs

@overload add(bucket, key, keydata)

Add a bucket/key pair to the job, together with extra key data.
@param [String,Bucket] bucket the bucket of the object
@param [String] key the key of the object
@param [String] keydata extra data to pass along with the object to the job

@overload add(bucket, filters)

Run the job across all keys in the bucket, with the given
key-filters. This will replace any other inputs previously
added. (Requires Riak 0.14)
@param [String,Bucket] bucket the bucket to filter keys from
@param [Array<Array>] filters a list of key-filters to apply
                              to the key list

@return [MapReduce] self

# File lib/riak/map_reduce.rb, line 64
def add(*params)
  params = params.dup
  params = params.first if Array === params.first
  case params.size
  when 1
    p = params.first
    case p
    when Bucket
      @inputs = bucket_input(p)
    when RObject
      @inputs << robject_input(p)
    when String
      warn(t('full_bucket_mapred', :backtrace => caller.join("\n    "))) unless Riak.disable_list_keys_warnings
      @inputs = maybe_escape(p)
    end
  when 2..3
    bucket = params.shift

    if Array === params.first
      if bucket.is_a? Bucket
        # bucket_input already emits the list-keys warning
        bucket = bucket_input(bucket)
      else
        warn(t('full_bucket_mapred', :backtrace => caller.join("\n    "))) unless Riak.disable_list_keys_warnings
        bucket = maybe_escape(bucket)
      end

      @inputs = {:bucket => bucket, :key_filters => params.first }
    else
      key = params.shift
      key_data = params.shift || ''
      @inputs << key_input(key, bucket, key_data)
    end
  end
  self
end
Also aliased as: <<, include
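The dispatch in `add` is driven by argument count and by whether the second argument is an Array. The hypothetical `sketch_add` below re-creates just that decision logic for plain String buckets, so the replace-versus-append behaviour can be tried without Riak. It omits `Bucket`/`RObject` handling, escaping, and warnings, and returns the new inputs instead of mutating state.

```ruby
# Hypothetical, simplified re-creation of MapReduce#add's argument dispatch.
# It handles plain String buckets only: no Bucket/RObject objects, no
# escaping, no warnings. It returns the new inputs instead of mutating state,
# and assumes `inputs` is still an Array when a bucket/key pair is appended.
def sketch_add(inputs, *params)
  params = params.first if params.first.is_a?(Array)   # add([bucket, key, ...])
  case params.size
  when 1
    params.first                                        # whole bucket: replaces inputs
  when 2..3
    bucket, second, key_data = params
    if second.is_a?(Array)
      { :bucket => bucket, :key_filters => second }     # key-filters: replace inputs
    else
      entry = [bucket, second]
      entry << key_data unless key_data.nil? || key_data.empty?
      inputs + [entry]                                  # bucket/key pair: appended
    end
  else
    inputs                                              # unsupported arity: unchanged
  end
end

inputs = []
inputs = sketch_add(inputs, "users", "alice")
inputs = sketch_add(inputs, ["users", "bob", "vip"])
p inputs                         # => [["users", "alice"], ["users", "bob", "vip"]]
p sketch_add(inputs, "users", [["starts_with", "a"]])
p sketch_add(inputs, "users")    # => "users"
```

Note the asymmetry: bucket/key pairs accumulate, while a whole bucket or a bucket with key-filters discards whatever inputs were there before.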
filter(bucket, &block)

Adds a bucket and key-filters built by the given block. Equivalent to add with a list of filters.
@param [String] bucket the bucket to apply key-filters to
@yield [] builder block, evaluated via instance_eval in a FilterBuilder
@return [MapReduce] self
@see MapReduce#add

# File lib/riak/map_reduce.rb, line 108
def filter(bucket, &block)
  add(bucket, FilterBuilder.new(&block).to_a)
end
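`filter` relies on an `instance_eval` builder: inside the block, bare method calls are sent to the builder, which turns them into key-filter arrays. The class below is a hypothetical, simplified stand-in for `FilterBuilder` that shows the technique; it records any method name at all, and the filter names used are only examples.

```ruby
# SketchFilterBuilder is a hypothetical, simplified stand-in for Riak's
# FilterBuilder: any method called inside the block is recorded as
# [filter_name, *args]. It accepts any method name without validation.
class SketchFilterBuilder
  def initialize(&block)
    @filters = []
    instance_eval(&block) if block   # run the block with the builder as self
  end

  def method_missing(name, *args)
    @filters << [name.to_s, *args]   # record the call as a key-filter step
    self
  end

  def to_a
    @filters
  end
end

builder = SketchFilterBuilder.new do
  tokenize "-", 1
  starts_with "2011"
end
p builder.to_a   # => [["tokenize", "-", 1], ["starts_with", "2011"]]
```

The resulting array has the same shape as the `filters` argument of `add(bucket, filters)`, which is why `filter` can simply forward to `add`.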
include(*params)
Alias for: add
index(bucket, index, query)

(Secondary Indexes) Use a secondary index query to start a map/reduce job.
@param [String, Bucket] bucket the bucket whose index to query
@param [String] index the index to query
@param [String, Integer, Range] query the value of the index, or
  a range of values (of Strings or Integers)

@return [MapReduce] self

# File lib/riak/map_reduce.rb, line 130
def index(bucket, index, query)
  if bucket.is_a? Bucket
    bucket = bucket.needs_type? ? [maybe_escape(bucket.type.name), maybe_escape(bucket.name)] : maybe_escape(bucket.name)
  else
    bucket = maybe_escape(bucket)
  end

  case query
  when String, Integer  # Integer replaces Fixnum, which Ruby 3.2 removed
    @inputs = {:bucket => bucket, :index => index, :key => query}
  when Range
    raise ArgumentError, t('invalid_index_query', :value => query.inspect) unless String === query.begin || Integer === query.begin
    @inputs = {:bucket => bucket, :index => index, :start => query.begin, :end => query.end}
  else
    raise ArgumentError, t('invalid_index_query', :value => query.inspect)
  end
  self
end
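The `index` method maps an exact value to a `:key` entry and a Range to `:start`/`:end` entries. The hypothetical `sketch_index_input` below reproduces that mapping for a plain String bucket, without escaping or bucket types; `"age_int"` is just an example index name.

```ruby
# Hypothetical helper mirroring how MapReduce#index builds its inputs hash for
# a plain String bucket (no escaping, no bucket types).
def sketch_index_input(bucket, index, query)
  case query
  when String, Integer
    { :bucket => bucket, :index => index, :key => query }          # exact match
  when Range
    unless query.begin.is_a?(String) || query.begin.is_a?(Integer)
      raise ArgumentError, "invalid index query: #{query.inspect}"
    end
    { :bucket => bucket, :index => index, :start => query.begin, :end => query.end }
  else
    raise ArgumentError, "invalid index query: #{query.inspect}"
  end
end

p sketch_index_input("users", "age_int", 30)
p sketch_index_input("users", "age_int", 18..30)
```

Since only `begin` and `end` are recorded, an exclusive range such as `18...30` produces the same inputs as `18..30`.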
map(*params)

Add a map phase to the job.

@overload map(function)

@param [String, Array] function a JavaScript function that represents the phase, or an Erlang [module, function] pair

@overload map(function?, options)

@param [String, Array] function a JavaScript function that represents the phase, or an Erlang [module, function] pair
@param [Hash] options extra options for the phase (see {Phase#initialize})

@return [MapReduce] self
@see Phase#initialize

# File lib/riak/map_reduce.rb, line 157
def map(*params)
  options = params.extract_options!
  @query << Phase.new({:type => :map, :function => params.shift}.merge(options))
  self
end
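The `function` argument takes one of two forms: JavaScript source as a String, or an Erlang `[module, function]` pair. The hypothetical `sketch_phase` below shows how those two forms can be distinguished and rendered as phase objects in Riak's HTTP MapReduce JSON. It is not the library's `Phase#as_json` and ignores any other forms `Phase` may accept; `riak_kv_mapreduce:map_object_value` is used as an example Erlang function.

```ruby
require 'json'

# Hypothetical sketch: turn a phase type and function argument into a JSON
# phase object. Not the library's Phase#as_json; only the two function forms
# documented for map/reduce are handled.
def sketch_phase(type, function, keep: false)
  spec =
    if function.is_a?(Array)                 # Erlang [module, function] pair
      { "language" => "erlang", "module" => function[0], "function" => function[1] }
    else                                     # anonymous JavaScript source
      { "language" => "javascript", "source" => function }
    end
  { type.to_s => spec.merge("keep" => keep) }
end

js  = sketch_phase(:map, "function(v) { return [v.key]; }")
erl = sketch_phase(:map, ["riak_kv_mapreduce", "map_object_value"], keep: true)
puts js.to_json
puts erl.to_json
```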
reduce(*params)

Add a reduce phase to the job.

@overload reduce(function)

@param [String, Array] function a JavaScript function that represents the phase, or an Erlang [module, function] pair

@overload reduce(function?, options)

@param [String, Array] function a JavaScript function that represents the phase, or an Erlang [module, function] pair
@param [Hash] options extra options for the phase (see {Phase#initialize})

@return [MapReduce] self
@see Phase#initialize

# File lib/riak/map_reduce.rb, line 171
def reduce(*params)
  options = params.extract_options!
  @query << Phase.new({:type => :reduce, :function => params.shift}.merge(options))
  self
end
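Both `map` and `reduce` use `params.extract_options!` to split a trailing options Hash from the optional function argument. `extract_options!` is an ActiveSupport-style Array extension; the sketch below substitutes a plain-Ruby approximation (it checks only `is_a?(Hash)`) and a hypothetical `sketch_reduce` that follows the same pattern. `reduce_sum` is used as an example Erlang function.

```ruby
# Plain-Ruby approximation of Array#extract_options!: removes and returns a
# trailing Hash, or returns {} when the last element is not a Hash.
def sketch_extract_options!(args)
  args.last.is_a?(Hash) ? args.pop : {}
end

# Hypothetical reduce-phase builder using the same pattern as MapReduce#reduce.
def sketch_reduce(*params)
  options = sketch_extract_options!(params)
  { :type => :reduce, :function => params.shift }.merge(options)
end

p sketch_reduce(["riak_kv_mapreduce", "reduce_sum"])
p sketch_reduce(["riak_kv_mapreduce", "reduce_sum"], :keep => true)
p sketch_reduce(:keep => true)   # no function given: :function is nil
```

Because the options are merged last, an options Hash that contains `:type` or `:function` would override the values set by the method itself.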
run(&block)

Executes this map-reduce job.

@overload run

Return the entire collection of results.
@return [Array<Array>] similar to link-walking, each element is
  an array of results from a phase where "keep" is true. If there
  is only one "keep" phase, only the results from that phase will
  be returned.

@overload run { |phase, data| ... }

Stream the results through the given block without accumulating.
@yield [phase, data] A block to stream results through
@yieldparam [Integer] phase the phase from which the results were
       generated
@yieldparam [Array] data a list of results from the phase
@return [nil] nothing

@raise [MapReduceError] if the request fails with a server error whose body is JSON; any other FailedRequest is re-raised unchanged

# File lib/riak/map_reduce.rb, line 229
def run(&block)
  @client.mapred(self, &block)
rescue FailedRequest => fr
  if fr.server_error? && fr.is_json?
    raise MapReduceError.new(fr.body)
  else
    raise fr
  end
end
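The two `run` overloads differ only in whether a block is given. The hypothetical `sketch_run` below simulates both: the `chunks` array stands in for `[phase, data]` results arriving from Riak, streaming simply yields each chunk, and accumulation groups data per phase and unwraps the result when there is only one phase. Ordering accumulated results by phase index is an assumption of this sketch.

```ruby
# Hypothetical simulation of MapReduce#run's two calling styles. The chunks
# array stands in for [phase, data] results arriving from Riak; in this sketch
# accumulated results are ordered by phase index.
def sketch_run(chunks)
  if block_given?
    chunks.each { |phase, data| yield phase, data }   # stream, keep nothing
    return nil
  end

  by_phase = Hash.new { |h, k| h[k] = [] }
  chunks.each { |phase, data| by_phase[phase].concat(data) }
  results = by_phase.keys.sort.map { |phase| by_phase[phase] }
  results.size == 1 ? results.first : results         # one keep phase: unwrap
end

chunks = [[1, [10, 20]], [1, [30]]]
p sketch_run(chunks)   # => [10, 20, 30]
sketch_run(chunks) { |phase, data| puts "phase #{phase}: #{data.inspect}" }
```

Streaming is the better fit for large result sets, since nothing is held in memory beyond the current chunk.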
timeout(value)

Sets the timeout for the map-reduce job.
@param [Integer] value the job timeout, in milliseconds
@return [MapReduce] self

# File lib/riak/map_reduce.rb, line 201
def timeout(value)
  @timeout = value
  return self
end
Also aliased as: timeout=
timeout=(value)
Alias for: timeout
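Aliasing `timeout=` to `timeout` lets the same setter be used with either call syntax. The hypothetical `SketchTimeoutJob` below reproduces the alias and shows one consequence: an assignment expression evaluates to the assigned value rather than to the job, so only the `timeout(...)` form can be chained.

```ruby
# Hypothetical stand-in showing the timeout / timeout= alias pattern.
class SketchTimeoutJob
  def timeout(value)
    @timeout = value
    self                       # returning self allows chaining
  end
  alias :timeout= :timeout     # lets callers write job.timeout = 5000

  def to_h
    @timeout ? { "timeout" => @timeout.to_i } : {}
  end
end

job = SketchTimeoutJob.new
job.timeout = 5000             # assignment syntax: evaluates to 5000, not job
p job.to_h
p SketchTimeoutJob.new.timeout(60_000).to_h   # chained form
```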
to_json(*a)

Convert the job to JSON for submission over the HTTP interface.
@return [String] the JSON representation

# File lib/riak/map_reduce.rb, line 209
def to_json(*a)
  hash = {"inputs" => inputs, "query" => query.map(&:as_json)}
  hash['timeout'] = @timeout.to_i if @timeout
  hash.to_json(*a)
end
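The submitted document always carries `inputs` and `query`, and carries `timeout` only when one was set. The hypothetical `sketch_job_json` below assembles the same three keys from plain values, with a ready-made phase Hash standing in for `Phase#as_json`; bucket, key, and function names are illustrative.

```ruby
require 'json'

# Hypothetical sketch of the document MapReduce#to_json assembles. Phase
# objects are replaced by ready-made hashes; all names are illustrative.
def sketch_job_json(inputs, phases, timeout = nil)
  hash = { "inputs" => inputs, "query" => phases }
  hash["timeout"] = timeout.to_i if timeout   # only present when a timeout was set
  hash.to_json
end

phases = [{ "map" => { "language" => "erlang", "module" => "riak_kv_mapreduce",
                       "function" => "map_object_value", "keep" => true } }]
puts sketch_job_json([["users", "alice"]], phases)
puts sketch_job_json("users", phases, 10_000)
```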

Private Instance Methods

bucket_input(bucket)

Processes a {Bucket} or {BucketTyped::Bucket} into a whole-bucket {MapReduce} input.

# File lib/riak/map_reduce.rb, line 243
def bucket_input(bucket)
  warn(t('full_bucket_mapred', :backtrace => caller.join("\n    "))) unless Riak.disable_list_keys_warnings

  if bucket.needs_type?
    return [maybe_escape(bucket.type.name), maybe_escape(bucket.name)]
  end

  maybe_escape(bucket.name)
end
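A whole-bucket input is a bare bucket name for an untyped bucket, and a `[type, bucket]` pair for a typed one. The sketch below models only what `bucket_input` reads, using hypothetical `Struct` stand-ins; the `needs_type?` rule here (a type other than nil or `"default"`) is an assumption for illustration, and escaping is omitted.

```ruby
# Hypothetical stand-ins for Riak's Bucket / bucket type objects; only the
# members bucket_input reads are modeled, and escaping is omitted.
SketchType   = Struct.new(:name)
SketchBucket = Struct.new(:name, :type) do
  # Assumption: a bucket needs its type unless the type is nil or "default".
  def needs_type?
    !type.nil? && type.name != "default"
  end
end

def sketch_bucket_input(bucket)
  return [bucket.type.name, bucket.name] if bucket.needs_type?   # [type, bucket]
  bucket.name                                                     # bare name
end

p sketch_bucket_input(SketchBucket.new("users", nil))                    # => "users"
p sketch_bucket_input(SketchBucket.new("users", SketchType.new("maps")))  # => ["maps", "users"]
```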
key_input(key, bucket, key_data = '')

Processes a key into a single-object {MapReduce} input, handling a bucket argument that is a {String}, a {Bucket}, or a {BucketTyped::Bucket}.

# File lib/riak/map_reduce.rb, line 272
def key_input(key, bucket, key_data = '')
  kd = []
  kd << key_data unless key_data.blank?

  if bucket.is_a? String
    return [
            maybe_escape(bucket),
            maybe_escape(key)
           ] + kd
  elsif bucket.needs_type?
    return [
            maybe_escape(bucket.name),
            maybe_escape(key),
            key_data,
            maybe_escape(bucket.type.name)
           ]
  else
    return [
            maybe_escape(bucket.name),
            maybe_escape(key)
           ] + kd
  end
end
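The single-object layout differs by bucket kind. Untyped inputs are `[bucket, key]` plus key data only when it is non-blank, while typed inputs always have four elements, `[bucket, key, key_data, type]`, even when the key data is empty. The hypothetical `sketch_key_input` below reproduces these layouts with plain strings (no escaping), using a `bucket_type` parameter in place of a typed bucket object.

```ruby
# Hypothetical sketch of key_input's output layout; escaping is omitted and
# bucket_type stands in for a typed bucket's type name (nil when untyped).
def sketch_key_input(bucket, key, key_data = "", bucket_type = nil)
  if bucket_type
    # Typed buckets always produce four elements: key_data (possibly empty)
    # sits at index 2 and the type name comes last.
    [bucket, key, key_data, bucket_type]
  else
    entry = [bucket, key]
    entry << key_data unless key_data.nil? || key_data.empty?
    entry
  end
end

p sketch_key_input("users", "alice")               # => ["users", "alice"]
p sketch_key_input("users", "alice", "vip")        # => ["users", "alice", "vip"]
p sketch_key_input("users", "alice", "", "maps")   # => ["users", "alice", "", "maps"]
```

Note that the type comes last in a single-object input but first in a whole-bucket input from `bucket_input`.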
robject_input(obj, key_data = '')

Processes an {RObject} into a single-object {MapReduce} input, whether or not its bucket has a type.

# File lib/riak/map_reduce.rb, line 255
def robject_input(obj, key_data = '')
  bucket = obj.bucket
  if bucket.needs_type?
    return [
            maybe_escape(bucket.name),
            maybe_escape(obj.key),
            key_data,
            maybe_escape(bucket.type.name)
           ]
  end

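  # Append key_data only when present, matching key_input's untyped layout
  [maybe_escape(bucket.name), maybe_escape(obj.key)] + (key_data.blank? ? [] : [key_data])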
end