class Cult::Paramap

Attributes

block[R]
concurrent[R]
enum[R]
exception_strategy[R]
exceptions[R]
iter[R]
job_queue[R]
rebind[R]
results[R]

Public Class Methods

new(enum, rebind: {}, concurrent: nil, exception_strategy:, &block) click to toggle source
# File lib/cult/paramap.rb, line 118
def initialize(enum, rebind: {}, concurrent: nil, exception_strategy:, &block)
  @enum = enum
  @rebind = rebind
  @iter = @enum.to_enum
  @concurrent = concurrent || max_concurrent
  @exception_strategy = exception_strategy
  @block = block
  @exceptions, @results = [], []
  @job_queue = []

end

Public Instance Methods

add_job(value) click to toggle source
# File lib/cult/paramap.rb, line 164
def add_job(value)
  job_queue.push(Job.new(new_job_index, value, rebind: rebind, &block))
end
handle_exception(job) click to toggle source
# File lib/cult/paramap.rb, line 139
def handle_exception(job)
  case exception_strategy
    when :raise
      raise job.exception
    when :collect
      exceptions.push(job.exception)
    else
      fail "Bad exception_strategy: #{exception_strategy}"
  end
end
handle_response(job) click to toggle source
# File lib/cult/paramap.rb, line 154
def handle_response(job)
  job.success? ? handle_result(job) : handle_exception(job)
end
handle_result(job) click to toggle source
# File lib/cult/paramap.rb, line 150
def handle_result(job)
  results[job.ident] = job.result
end
job_by_pid(pid) click to toggle source
# File lib/cult/paramap.rb, line 168
def job_by_pid(pid)
  job_queue.find { |job| job.pid == pid }
end
job_queue_full?() click to toggle source
# File lib/cult/paramap.rb, line 184
def job_queue_full?
  job_queue.size == concurrent
end
max_concurrent() click to toggle source
# File lib/cult/paramap.rb, line 130
def max_concurrent
  case (r = Cult.concurrency)
    when :max
      enum.respond_to?(:size) ? enum.size : 200
    else
      r
  end
end
more_tasks?() click to toggle source
# File lib/cult/paramap.rb, line 188
def more_tasks?
  iter.peek
  true
rescue StopIteration
  false
end
new_job_index() click to toggle source
# File lib/cult/paramap.rb, line 158
def new_job_index
  (@job_index ||= 0).tap do
    @job_index += 1
  end
end
next_task() click to toggle source
# File lib/cult/paramap.rb, line 195
def next_task
  iter.next
end
process_finished_job(job) click to toggle source
# File lib/cult/paramap.rb, line 172
def process_finished_job(job)
  job_queue.delete(job)
  handle_response(job)
end
queue_next_task() click to toggle source
# File lib/cult/paramap.rb, line 199
def queue_next_task
  add_job(next_task)
end
report_exceptions(results) click to toggle source
# File lib/cult/paramap.rb, line 177
def report_exceptions(results)
  self_exceptions = self.exceptions
  results.define_singleton_method(:exceptions) do
    self_exceptions
  end
end
run() click to toggle source
# File lib/cult/paramap.rb, line 209
def run
  loop do
    queue_next_task until job_queue_full? || !more_tasks?
    break if job_queue.empty? && ! more_tasks?
    wait_for_next_job_to_finish
  end

  report_exceptions(self.results)
  self.results
end
wait_for_next_job_to_finish() click to toggle source
# File lib/cult/paramap.rb, line 203
def wait_for_next_job_to_finish
  if (job = job_by_pid(Process.waitpid))
    process_finished_job(job)
  end
end