class Riak::Multi
Coordinates a parallel operation for multiple keys.
Attributes
@return [Riak::Client] the associated client
@return [Array<Bucket, String>] fetch_list an {Array} of {Bucket} and {String} keys
@return [Hash<fetch_list_entry, RObject>] result_hash a {Hash} of {Bucket} and {String} key pairs to {RObject} instances
@return [Integer] The number of threads to use
Public Class Methods
Create a Riak Multi operation.
@param [Client] client the {Riak::Client} that will perform the operation
@param [Array<Bucket, String>] keys an {Array} of {Bucket} and {String} keys to work on
@raise [ArgumentError] when a non-positive-Integer count is given for threads
    # File lib/riak/multi.rb, line 35
    def initialize(client, keys)
      raise ArgumentError, t('client_type', :client => client.inspect) unless client.is_a? Riak::Client
      raise ArgumentError, t('array_type', :array => keys.inspect) unless keys.is_a? Array

      self.thread_count = client.multi_threads
      validate_keys keys
      @client = client
      @keys = keys.uniq
      self.result_hash = {}
      @finished = false
    end
Perform a Riak Multi operation.
@param [Client] client the {Riak::Client} that will perform the operation
@param [Array<Bucket, String>] keys an {Array} of {Bucket} and {String} keys to work with
@return [Hash<key, RObject>] result_hash a {Hash} of {Bucket} and {String} key pairs to {RObject} instances
    # File lib/riak/multi.rb, line 25
    def self.perform(client, keys)
      multi = new client, keys
      multi.perform
      multi.results
    end
Public Instance Methods
Reports whether every worker thread has exited. Returns nil if perform has not been called yet.
    # File lib/riak/multi.rb, line 76
    def finished?
      @finished ||= @threads && @threads.none?(&:alive?)
    end
Starts the parallelized operation
    # File lib/riak/multi.rb, line 48
    def perform
      queue = keys.dup
      queue_mutex = Mutex.new
      result_mutex = Mutex.new

      @threads = 1.upto(thread_count).map do |_node|
        Thread.new do
          loop do
            pair = queue_mutex.synchronize do
              queue.shift
            end

            break if pair.nil?

            found = work(*pair)
            result_mutex.synchronize do
              result_hash[pair] = found
            end
          end
        end
      end
    end
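The loop in perform is a small worker pool: each thread repeatedly takes one pair off a shared queue under one mutex, does the work outside any lock, and stores the outcome under a second mutex. The sketch below reproduces that pattern in isolation, without the Riak client. The helper name `parallel_lookup` and the sample pairs are hypothetical and exist only for illustration; they are not part of the library.

```ruby
# Hypothetical helper (not part of Riak::Multi): runs the given block once per
# item on `thread_count` threads and returns a Hash of item => block result.
def parallel_lookup(items, thread_count)
  queue = items.dup          # private copy; the workers consume it destructively
  queue_mutex = Mutex.new    # guards queue.shift
  result_mutex = Mutex.new   # guards writes to results
  results = {}

  threads = 1.upto(thread_count).map do
    Thread.new do
      loop do
        item = queue_mutex.synchronize { queue.shift }
        break if item.nil?   # queue drained, so this worker exits

        found = yield(*item) # the slow work happens outside both locks
        result_mutex.synchronize { results[item] = found }
      end
    end
  end

  threads.each(&:join)       # block until every worker has exited
  results
end

pairs = [['users', 'alice'], ['users', 'bob'], ['orders', '42']]
lookup = parallel_lookup(pairs, 2) { |bucket, key| "#{bucket}/#{key}" }
lookup[['users', 'alice']] # => "users/alice"
```

Because a worker stops at the first nil it shifts off the queue, this pattern assumes the queue never contains nil entries.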
Waits for the operation to finish, then returns the result hash.
    # File lib/riak/multi.rb, line 71
    def results
      wait_for_finish
      result_hash
    end
Blocks until every worker thread has finished.
    # File lib/riak/multi.rb, line 81
    def wait_for_finish
      return if finished?
      @threads.each(&:join)
      @finished = true
    end
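The interplay between finished? and wait_for_finish can be tried out without a Riak cluster. The sketch below copies only those two method bodies into a hypothetical `FinishTracker` class (the class, its `start` method, and the `gate` queue are assumptions made for illustration) and uses a Queue to hold the worker threads open so each state can be observed deterministically.

```ruby
# Hypothetical stand-in that mirrors only the finished?/wait_for_finish logic.
class FinishTracker
  def initialize
    @finished = false
    @threads = nil
  end

  # Stands in for perform, which is where the real class assigns @threads.
  def start(threads)
    @threads = threads
  end

  def finished?
    @finished ||= @threads && @threads.none?(&:alive?)
  end

  def wait_for_finish
    return if finished?
    @threads.each(&:join)
    @finished = true
  end
end

gate = Queue.new                 # workers block on this until released
tracker = FinishTracker.new
tracker.finished?                # => nil, no threads have been started
tracker.start(2.times.map { Thread.new { gate.pop } })
tracker.finished?                # => false, both workers are blocked on the gate
2.times { gate << :go }          # release the workers
tracker.wait_for_finish
tracker.finished?                # => true
```

In this sketch, calling `wait_for_finish` before `start` raises NoMethodError because `@threads` is still nil. The listed source has the same shape, which suggests that results should only be requested after perform has been called.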
Private Instance Methods
Raises ArgumentError for the first entry that is not a {Bucket} and {String} pair.
    # File lib/riak/multi.rb, line 93
    def validate_keys(keys)
      erroneous = keys.detect do |bucket, key|
        !bucket.is_a?(Bucket) || !key.is_a?(String)
      end
      return unless erroneous
      raise ArgumentError, t('fetch_list_type', problem: erroneous) # TODO: should be keys_type
    end
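The check in validate_keys relies on block-parameter destructuring: each two-element entry is split into `bucket` and `key`, and detect returns the first entry that fails either type test. A minimal sketch of that check follows, assuming a hypothetical `FakeBucket` struct in place of the real bucket class and a hypothetical helper name `first_invalid_pair`.

```ruby
# Hypothetical stand-in for the bucket class, defined only so the sketch runs alone.
FakeBucket = Struct.new(:name)

# Hypothetical helper: returns the first entry that is not a
# [FakeBucket, String] pair, or nil when every entry passes.
def first_invalid_pair(keys)
  keys.detect do |bucket, key|
    !bucket.is_a?(FakeBucket) || !key.is_a?(String)
  end
end

users = FakeBucket.new('users')
good = [[users, 'alice'], [users, 'bob']]
bad  = [[users, 'alice'], ['users', 'bob'], [users, 42]]

first_invalid_pair(good) # => nil
first_invalid_pair(bad)  # => ["users", "bob"]
```

Only the first offending entry is reported, so `[users, 42]` goes unmentioned here. Note also that detect returns the offending element itself, so a literal nil entry would come back as nil and read as "nothing found".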
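Processes a single bucket and key pair. The base implementation raises NotImplementedError, so subclasses are expected to override it.
    # File lib/riak/multi.rb, line 89
    def work(_bucket, _key)
      raise NotImplementedError
    end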
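Since work raises NotImplementedError in the base class, the class reads as a template: perform supplies the control flow and a subclass supplies the per-pair behavior. The sketch below illustrates that arrangement with hypothetical stand-ins. `SketchMulti` is a simplified base that calls work sequentially rather than on threads, and `SketchExist` with its in-memory `STORE` is invented for the example; neither is part of the library.

```ruby
# Hypothetical, simplified stand-in for the base class. It calls `work`
# sequentially so the sketch stays focused on the override.
class SketchMulti
  def initialize(keys)
    @keys = keys.uniq
  end

  # Returns a Hash of pair => whatever `work` returned for that pair.
  def perform
    @keys.each_with_object({}) do |pair, results|
      results[pair] = work(*pair)
    end
  end

  private

  def work(_bucket, _key)
    raise NotImplementedError
  end
end

# Hypothetical subclass: reports whether each pair exists in an in-memory store.
class SketchExist < SketchMulti
  STORE = { ['users', 'alice'] => 'Alice' }.freeze

  private

  def work(bucket, key)
    STORE.key?([bucket, key])
  end
end

SketchExist.new([['users', 'alice'], ['users', 'bob']]).perform
# => {["users", "alice"]=>true, ["users", "bob"]=>false}
```

In Ruby, NotImplementedError descends from ScriptError rather than StandardError, so a bare `rescue` does not catch it; code that wants to handle the unimplemented case has to name the class explicitly.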