class BeetleETL::AsyncStepRunner
Public Class Methods
new(config, steps)
click to toggle source
Calls superclass method
# File lib/beetle_etl/step_runner/async_step_runner.rb, line 4 def initialize(config, steps) super(config, steps) @queue = Queue.new @started = Set.new end
Public Instance Methods
run()
click to toggle source
# File lib/beetle_etl/step_runner/async_step_runner.rb, line 11 def run results = {} until all_steps_complete? runnables.each do |step| run_step_async(step) @started.add(step.name) end step_data = @queue.pop add_result!(results, step_data) @completed.add(step_data[:step_name]) end results end
Private Instance Methods
run_step_async(step)
click to toggle source
# File lib/beetle_etl/step_runner/async_step_runner.rb, line 31 def run_step_async(step) Thread.new do @queue.push run_step(step) end.abort_on_exception = true end
runnables()
click to toggle source
# File lib/beetle_etl/step_runner/async_step_runner.rb, line 37 def runnables resolvables = @dependency_resolver.resolvables(@completed) resolvables.reject { |r| @started.include? r.name } end