class BeetleETL::AsyncStepRunner

Public Class Methods

new(config, steps) click to toggle source
Calls superclass method
# File lib/beetle_etl/step_runner/async_step_runner.rb, line 4
def initialize(config, steps)
  super(config, steps)

  @queue = Queue.new
  @started = Set.new
end

Public Instance Methods

run() click to toggle source
# File lib/beetle_etl/step_runner/async_step_runner.rb, line 11
def run
  results = {}

  until all_steps_complete?
    runnables.each do |step|
      run_step_async(step)
      @started.add(step.name)
    end

    step_data = @queue.pop
    add_result!(results, step_data)

    @completed.add(step_data[:step_name])
  end

  results
end

Private Instance Methods

run_step_async(step) click to toggle source
# File lib/beetle_etl/step_runner/async_step_runner.rb, line 31
def run_step_async(step)
  Thread.new do
    @queue.push run_step(step)
  end.abort_on_exception = true
end
runnables() click to toggle source
# File lib/beetle_etl/step_runner/async_step_runner.rb, line 37
def runnables
  resolvables = @dependency_resolver.resolvables(@completed)
  resolvables.reject { |r| @started.include? r.name }
end