class Riak::Multi

Coordinates a parallel operation for multiple keys.

Attributes

client[R]

@return [Riak::Client] the associated client

keys[R]

@return [Array<Bucket, String>] fetch_list an {Array} of {Bucket} and {String} keys

result_hash[RW]

@return [Hash<fetch_list_entry, RObject] result_hash a {Hash} of {Bucket} and {String} key pairs to {RObject} instances

thread_count[RW]

@return [Integer] The number of threads to use

Public Class Methods

new(client, keys) click to toggle source

Create a Riak Multi operation. @param [Client] client the {Riak::Client} that will perform the multiget @param [Array<Bucket, String>] keys an {Array} of {Bucket} and {String} keys to work on @raise [ArgumentError] when a non-positive-Integer count is given for threads

# File lib/riak/multi.rb, line 35
def initialize(client, keys)
  raise ArgumentError, t('client_type', :client => client.inspect) unless client.is_a? Riak::Client
  raise ArgumentError, t('array_type', :array => keys.inspect) unless keys.is_a? Array

  self.thread_count = client.multi_threads
  validate_keys keys
  @client = client
  @keys = keys.uniq
  self.result_hash = {}
  @finished = false
end
perform(client, keys) click to toggle source

Perform a Riak Multi operation. @param [Client] client the {Riak::Client} that will perform the operation @param [Array<Bucket, String>] keys an {Array} of {Bucket} and {String} keys to work with @return [Hash<key, RObject] result_hash a {Hash} of {Bucket} and {String} key pairs to {RObject} instances

# File lib/riak/multi.rb, line 25
def self.perform(client, keys)
  multi = new client, keys
  multi.perform
  multi.results
end

Public Instance Methods

finished()
Alias for: finished?
finished?() click to toggle source
# File lib/riak/multi.rb, line 76
def finished?
  @finished ||= @threads && @threads.none?(&:alive?)
end
Also aliased as: finished
perform() click to toggle source

Starts the parallelized operation

# File lib/riak/multi.rb, line 48
def perform
  queue = keys.dup
  queue_mutex = Mutex.new
  result_mutex = Mutex.new

  @threads = 1.upto(thread_count).map do |_node|
    Thread.new do
      loop do
        pair = queue_mutex.synchronize do
          queue.shift
        end

        break if pair.nil?

        found = work(*pair)
        result_mutex.synchronize do
          result_hash[pair] = found
        end
      end
    end
  end
end
results() click to toggle source
# File lib/riak/multi.rb, line 71
def results
  wait_for_finish
  result_hash
end
wait_for_finish() click to toggle source
# File lib/riak/multi.rb, line 81
def wait_for_finish
  return if finished?
  @threads.each(&:join)
  @finished = true
end

Private Instance Methods

validate_keys(keys) click to toggle source
# File lib/riak/multi.rb, line 93
def validate_keys(keys)
  erroneous = keys.detect do |bucket, key|
    !bucket.is_a?(Bucket) || !key.is_a?(String)
  end
  return unless erroneous
  raise ArgumentError, t('fetch_list_type', problem: erroneous) # TODO: should be keys_type
end
work(_bucket, _key) click to toggle source
# File lib/riak/multi.rb, line 89
def work(_bucket, _key)
  raise NotImplementedError
end