class ExceptionalSynchrony::ParallelSync
Public Class Methods
new(em, downstream = nil)
click to toggle source
em is the EventMachine proxy. downstream is an optional queue; if provided, it will run the jobs for us. It must create the Fiber as well.
# File lib/exceptional_synchrony/parallel_sync.rb, line 15
# Sets up a new instance with no queued jobs and no finished job indexes.
#
# em         - stored as @em.
# downstream - optional; stored as @downstream (nil when omitted).
def initialize(em, downstream = nil)
  @em, @downstream = em, downstream
  @jobs, @finished = [], Set.new
end
parallel(em, *args) { |parallel_sync| ... }
click to toggle source
# File lib/exceptional_synchrony/parallel_sync.rb, line 7
# Convenience entry point: builds an instance from the given arguments,
# yields it to the caller's block (so jobs can be added), then runs all jobs.
#
# Returns whatever run_all! returns.
def self.parallel(em, *args)
  new(em, *args).tap { |parallel_sync| yield parallel_sync }.run_all!
end
Public Instance Methods
add(proc = nil, &block)
click to toggle source
# File lib/exceptional_synchrony/parallel_sync.rb, line 22
# Queues a job by appending it to @jobs.
#
# The job may be passed either as a positional callable or as a block; when
# both are supplied, the positional argument takes precedence.
#
# Returns the @jobs array (the result of Array#<<).
# Raises ArgumentError when neither a callable nor a block is given, so the
# mistake surfaces at the call site instead of a nil entry being queued.
def add(proc = nil, &block)
  job = proc || block
  raise ArgumentError, "a proc or a block is required" unless job

  @jobs << job
end
run_all!()
click to toggle source
Runs all the jobs that have been added. Returns the hash of responses, keyed by each job's ordinal number (the order in which it was added), in ascending order.
# File lib/exceptional_synchrony/parallel_sync.rb, line 29 def run_all! original_fiber = Fiber.current @responses = (0...@jobs.size).build_hash { |key| [key, nil] } # initialize in sorted order so we don't have to sort later @jobs.each_with_index do |job, index| run_and_finish = lambda do |*args| @responses[index] = CallbackExceptions.return_exception(*args, &job) @finished.add(index) check_progress(original_fiber) end if @downstream if job.respond_to?(:encapsulate) cancel_proc = -> do @responses[index] = :cancelled @finished.add(index) end @downstream.add(job.encapsulate(cancel: cancel_proc, &run_and_finish)) else @downstream.add(&run_and_finish) end else Fiber.new(&run_and_finish).resume end end unless finished? @yielded = true Fiber.yield end raise_any_exceptions(@responses) @responses end
Private Instance Methods
check_progress(original_fiber)
click to toggle source
# File lib/exceptional_synchrony/parallel_sync.rb, line 67
# Resumes original_fiber, but only when all of the following hold (checked
# in this order): every job is finished, the fiber is still alive, the
# fiber is not the one currently running, and @yielded has been set.
#
# Returns the value of Fiber#resume when the fiber is resumed, nil otherwise.
def check_progress(original_fiber)
  return unless finished?
  return unless original_fiber.alive?
  return if original_fiber == Fiber.current
  return unless @yielded

  original_fiber.resume
end
finished?()
click to toggle source
# File lib/exceptional_synchrony/parallel_sync.rb, line 73
# True once as many job indexes have been recorded in @finished as there are
# jobs in @jobs.
def finished?
  @jobs.length == @finished.length
end
raise_any_exceptions(responses)
click to toggle source
# File lib/exceptional_synchrony/parallel_sync.rb, line 77
# Raises if any value in the responses hash is an Exception.
#
# With a single exception, that exact object is raised. With several, the
# first one (in hash order) is the "master": an exception of the same class
# is raised whose message is the master's message followed by a separator
# line and "Class: message" for each remaining exception.
#
# The combined message is built as a new string and attached through
# Exception#exception rather than appended to master_exception.message in
# place: in-place appending raises FrozenError for frozen messages and is
# silently lost when #message returns a fresh string on each call.
#
# responses - Hash whose values are job results.
#
# Returns nil when no value is an Exception.
def raise_any_exceptions(responses)
  exceptions = responses.values.select { |response| response.is_a?(Exception) }
  return if exceptions.empty?

  master_exception, *remaining_exceptions = exceptions
  raise master_exception if remaining_exceptions.empty?

  separator = "\n====================================\n"
  details = remaining_exceptions.map { |ex| "#{ex.class}: #{ex}" }
  combined_message = [master_exception.message, *details].join(separator)

  # Exception#exception(string) returns a copy of the receiver with the new message.
  raise master_exception.exception(combined_message)
end