module Bramble::Reduce

Public Instance Methods

perform(handle, implementation)

Queue up one job per stored key; each job calls `.reduce` for that key of `handle`.

# File lib/bramble/reduce.rb, line 8
# Fan out the reduce phase for `handle`.
#
# Inside the `Bramble::State.running?` block it records how many reduce
# calls are expected and then enqueues one `Bramble::ReduceJob` per raw key.
# NOTE(review): presumably `running?` only yields while `handle` is still
# running -- confirm in Bramble::State.
#
# handle         - identifier of the job whose keys are reduced
# implementation - object responding to `.name`; only its name is enqueued
def perform(handle, implementation)
  Bramble::State.running?(handle) do
    pending_keys = storage.map_keys_get(keys_key(handle))

    # Record the expected number of reduce calls before any job is queued
    total_key = reduce_total_count_key(handle)
    storage.set(total_key, pending_keys.length)

    # One ReduceJob per raw key
    pending_keys.each { |pending_key| Bramble::ReduceJob.perform_later(handle, implementation.name, pending_key) }
  end
end
perform_reduce(handle, implementation, raw_key)

Perform `.reduce` on `raw_key`, handling errors and saving the result

# File lib/bramble/reduce.rb, line 21
# Run `implementation.reduce` for a single `raw_key` of `handle`.
#
# When `handle` is not running, the reduce state is cleared and nothing else
# happens. Otherwise the stored values and the key are deserialized, `.reduce`
# is invoked inside `Bramble::ErrorHandling.rescuing`, the serialized result
# is stored, and the finished counter is incremented. Once
# `Bramble::State.percent_reduced` reaches 1 the finish timestamp is written.
#
# handle         - identifier of the job being reduced
# implementation - object responding to `.reduce(key, values)`
# raw_key        - serialized key whose values are reduced
def perform_reduce(handle, implementation, raw_key)
  # Not running: drop any reduce state and stop here
  return Bramble::State.clear_reduce(handle) unless Bramble::State.running?(handle)

  values = Bramble::Serialize.load(storage.map_result_get(data_key(handle, raw_key)))
  key = Bramble::Serialize.load(raw_key)

  Bramble::ErrorHandling.rescuing(implementation) do
    # User-defined .reduce; anything it raises is left to ErrorHandling
    outcome = implementation.reduce(key, values)
    # State is checked again before the result is persisted
    Bramble::State.running?(handle) do
      storage.reduce_result_set(result_key(handle), raw_key, Bramble::Serialize.dump(outcome))
    end
  end

  # Count this key as done (this runs after the rescuing block returns),
  # and stamp the finish time once every key is accounted for
  Bramble::State.running?(handle) do
    storage.increment(reduce_finished_count_key(handle))
    storage.set(finished_at_key(handle), Time.now.to_i) if Bramble::State.percent_reduced(handle) >= 1
  end
end
storage()
# File lib/bramble/reduce.rb, line 53
# Storage backend used by the reduce helpers: the `Bramble::Storage` constant
# itself (no instance is created here).
def storage
  ::Bramble::Storage
end