class Cult::Paramap
Attributes
block[R]
concurrent[R]
enum[R]
exception_strategy[R]
exceptions[R]
iter[R]
job_queue[R]
rebind[R]
results[R]
Public Class Methods
new(enum, rebind: {}, concurrent: nil, exception_strategy:, &block)
click to toggle source
# File lib/cult/paramap.rb, line 118
# Stores the enumerable, the per-job block and the run options, and starts
# with empty result, exception and job-queue arrays.
#
# @param enum the collection to work through; an external iterator is
#   obtained from it with #to_enum
# @param rebind forwarded to every Job created by #add_job
# @param concurrent job-queue limit; when nil/false, #max_concurrent is used
# @param exception_strategy consulted by #handle_exception
#   (:raise or :collect)
# @param block forwarded to every Job created by #add_job
def initialize(enum, rebind: {}, concurrent: nil, exception_strategy:, &block)
  @enum = enum
  @iter = @enum.to_enum
  @rebind = rebind
  @block = block
  @exception_strategy = exception_strategy
  # max_concurrent reads #enum, so @enum has to be assigned before this line.
  @concurrent = concurrent || max_concurrent
  @job_queue = []
  @results = []
  @exceptions = []
end
Public Instance Methods
add_job(value)
click to toggle source
# File lib/cult/paramap.rb, line 164
# Builds a Job for +value+, identified by the next job index and given the
# stored rebind option and block, and appends it to the job queue.
#
# @param value the item handed to the new Job
def add_job(value)
  job = Job.new(new_job_index, value, rebind: rebind, &block)
  job_queue.push(job)
end
handle_exception(job)
click to toggle source
# File lib/cult/paramap.rb, line 139
# Deals with a failed job according to the configured exception strategy:
# :raise re-raises the job's exception, :collect appends it to #exceptions.
#
# @param job the job whose #exception is handled
# @raise [RuntimeError] when the strategy is neither :raise nor :collect
def handle_exception(job)
  strategy = exception_strategy

  if :raise === strategy
    raise job.exception
  elsif :collect === strategy
    exceptions.push(job.exception)
  else
    fail "Bad exception_strategy: #{strategy}"
  end
end
handle_response(job)
click to toggle source
# File lib/cult/paramap.rb, line 154
# Routes a finished job: successful jobs go to #handle_result, all others
# to #handle_exception.
#
# @param job the finished job
def handle_response(job)
  return handle_result(job) if job.success?

  handle_exception(job)
end
handle_result(job)
click to toggle source
# File lib/cult/paramap.rb, line 150
# Stores the job's result in #results at the position given by the job's
# ident.
#
# @param job the successful job
def handle_result(job)
  slot = job.ident
  value = job.result
  results[slot] = value
end
job_by_pid(pid)
click to toggle source
# File lib/cult/paramap.rb, line 168
# Looks up the queued job whose pid equals +pid+.
#
# @param pid the process id to search for
# @return the first matching job in the queue, or nil when none matches
def job_by_pid(pid)
  job_queue.find do |candidate|
    candidate.pid == pid
  end
end
job_queue_full?()
click to toggle source
# File lib/cult/paramap.rb, line 184
# True when the number of queued jobs equals the concurrency limit.
def job_queue_full?
  queued = job_queue.size
  queued == concurrent
end
max_concurrent()
click to toggle source
# File lib/cult/paramap.rb, line 130
# Default concurrency limit, derived from Cult.concurrency.
#
# Any value other than :max is returned unchanged. For :max the size of
# the enumerable is used when it is a known Integer; otherwise the limit
# falls back to 200.
#
# The Integer check matters because #size is not always usable as a limit:
# Enumerator#size returns nil when the size cannot be computed lazily and
# Float::INFINITY for endless enumerators. Since #job_queue_full? compares
# the queue size to the limit with ==, such a value would never match and
# the queue would grow without bound.
#
# @return the concurrency limit
def max_concurrent
  case (limit = Cult.concurrency)
  when :max
    size = enum.size if enum.respond_to?(:size)
    size.is_a?(Integer) ? size : 200
  else
    limit
  end
end
more_tasks?()
click to toggle source
# File lib/cult/paramap.rb, line 188
# Reports whether the iterator still has an item to hand out.
#
# Peeks at the iterator without advancing it; a StopIteration from the
# peek means the items are exhausted.
#
# @return [Boolean]
def more_tasks?
  begin
    iter.peek
  rescue StopIteration
    return false
  end
  true
end
new_job_index()
click to toggle source
# File lib/cult/paramap.rb, line 158
# Hands out consecutive job indexes starting at 0: returns the current
# counter value and then advances the counter by one.
#
# @return [Integer] the index for the next job
def new_job_index
  @job_index ||= 0
  current = @job_index
  @job_index += 1
  current
end
next_task()
click to toggle source
# File lib/cult/paramap.rb, line 195
# Advances the iterator and returns the item it yields.
def next_task
  upcoming = iter.next
  upcoming
end
process_finished_job(job)
click to toggle source
# File lib/cult/paramap.rb, line 172
# Retires a finished job: it is removed from the job queue first and its
# outcome is then passed to #handle_response.
#
# @param job the job that has finished
def process_finished_job(job)
  # Dequeue before handling, so the job is already out of the queue if
  # handling raises.
  job_queue.delete(job)

  handle_response(job)
end
queue_next_task()
click to toggle source
# File lib/cult/paramap.rb, line 199
# Takes the next item from the iterator and queues a job for it.
def queue_next_task
  task = next_task
  add_job(task)
end
report_exceptions(results)
click to toggle source
# File lib/cult/paramap.rb, line 177
# Gives +results+ a singleton #exceptions method that returns this
# object's exceptions.
#
# @param results the object that receives the singleton method
def report_exceptions(results)
  # Captured in a local: inside the singleton method, self is +results+.
  collected = exceptions
  results.define_singleton_method(:exceptions) { collected }
end
run()
click to toggle source
# File lib/cult/paramap.rb, line 209
# Drives the whole computation: repeatedly fills the job queue up to the
# concurrency limit, waits for a job to finish, and stops once the queue
# is empty and no items remain.
#
# @return the results, extended with a singleton #exceptions method by
#   #report_exceptions
def run
  loop do
    # The queue is checked before peeking, so the iterator is not touched
    # while the queue is full.
    queue_next_task while !job_queue_full? && more_tasks?

    all_done = job_queue.empty? && !more_tasks?
    break if all_done

    wait_for_next_job_to_finish
  end

  final = results
  report_exceptions(final)
  final
end
wait_for_next_job_to_finish()
click to toggle source
# File lib/cult/paramap.rb, line 203
# Blocks in Process.waitpid until a child process exits, then processes
# the queued job with that pid. A pid that matches no queued job is
# ignored.
def wait_for_next_job_to_finish
  finished = job_by_pid(Process.waitpid)
  process_finished_job(finished) if finished
end