module Bramble::Map

Public Instance Methods

perform(handle, implementation, values) click to toggle source

For each of `values`, queue up a job to call the map function

# File lib/bramble/map.rb, line 8
def perform(handle, implementation, values)
  Bramble::State.running?(handle) do
    storage.set(map_total_count_key(handle), values.length)
    values.each do |value|
      Bramble::MapJob.perform_later(handle, implementation.name, Bramble::Serialize.dump(value))
    end
  end
end
perform_map(handle, implementation, value) click to toggle source

Call `.map` on `value`, storing the result for `.reduce` and handling any error.

# File lib/bramble/map.rb, line 18
def perform_map(handle, implementation, value)
  Bramble::State.running?(handle) do
    impl_keys_key = keys_key(handle)

    Bramble::ErrorHandling.rescuing(implementation) do
      # Execute the provided map function
      implementation.map(value) do |map_key, map_val|
        Bramble::State.running?(handle) do
          raw_key = Bramble::Serialize.dump(map_key)
          raw_value = Bramble::Serialize.dump(map_val)
          # Push the result to be reduced
          storage.map_keys_push(impl_keys_key, raw_key)
          storage.map_result_push(data_key(handle, raw_key), raw_value)
        end
      end
    end

    # Mark this item as mapped (even if there was an error)
    Bramble::State.running?(handle) do
      finished = storage.increment(map_finished_count_key(handle))
      total = storage.get(map_total_count_key(handle)).to_i
      if finished == total
        Bramble::Reduce.perform(handle, implementation)
      end
    end
  end
end
storage() click to toggle source
# File lib/bramble/map.rb, line 50
def storage
  Bramble::Storage
end