class Tumugi::Executor::LocalExecutor
Public Class Methods
new(dag, worker_num: 1, run_all: false)
click to toggle source
# File lib/tumugi/executor/local_executor.rb, line 10 def initialize(dag, worker_num: 1, run_all: false) @dag = dag @main_task = dag.tsort.last @worker_num = worker_num @run_all = run_all @mutex = Mutex.new end
Public Instance Methods
execute()
click to toggle source
# File lib/tumugi/executor/local_executor.rb, line 18 def execute pool = Concurrent::ThreadPoolExecutor.new( min_threads: @worker_num, max_threads: @worker_num ) setup_task_queue(@dag) loop do task = dequeue_task break if task.nil? Concurrent::Future.execute(executor: pool) do if !task.runnable?(Time.now) logger.trace { "task_not_runnable: #{task.id}" } enqueue_task(task) else begin logger.info { "task_start: #{task.id}" } task.trigger!(:start) MuchTimeout.optional_timeout(task_timeout(task), Tumugi::TimeoutError) do task.run end task.trigger!(:complete) logger.info { "task_#{task.state}: #{task.id}, elapsed_time: #{task.elapsed_time}" } task.on_success rescue => e handle_error(task, e) end end end end pool.shutdown pool.wait_for_termination @dag.tsort.all? { |t| t.success? } end
Private Instance Methods
dequeue_task()
click to toggle source
# File lib/tumugi/executor/local_executor.rb, line 70 def dequeue_task loop do task = @mutex.synchronize { logger.trace { "task_queue_dump: #{@queue.map(&:id)}" } unless @queue.empty? @queue.shift } if task.nil? if @main_task.finished? break nil else sleep(0.1) end else logger.trace { "task_queue_dequeue: #{task.id}" } if task.requires_failed? task.trigger!(:requires_fail) logger.info { "task_#{task.state}: #{task.id} has failed requires task, elapsed_time: #{task.elapsed_time}" } elsif task.completed? && !@run_all task.trigger!(:skip) logger.info { "task_#{task.state}: #{task.id} is already completed, elapsed_time: #{task.elapsed_time}" } else break task end end end end
enqueue_task(task)
click to toggle source
# File lib/tumugi/executor/local_executor.rb, line 99 def enqueue_task(task) logger.trace { "task_queue_enqueue: #{task.id}" } @mutex.synchronize { @queue.push(task) } end
handle_error(task, err)
click to toggle source
# File lib/tumugi/executor/local_executor.rb, line 104 def handle_error(task, err) if task.retry task.trigger!(:pend) logger.info { "task_#{task.state}: #{task.id} failed, elapsed_time: #{task.elapsed_time}" } logger.error { "#{err.class}: '#{err.message}' - #{task.tries} tries and wait #{task.retry_interval} seconds until the next try." } enqueue_task(task) task.on_retry else task.trigger!(:fail) logger.info { "task_#{task.state}: #{task.id} failed, elapsed_time: #{task.elapsed_time}" } logger.error { "#{err.class}: '#{err.message}' - #{task.tries} tries and reached max retry count, so task #{task.id} failed." } task.on_failure end logger.error { err.backtrace.join("\n") } end
logger()
click to toggle source
# File lib/tumugi/executor/local_executor.rb, line 120 def logger @logger ||= Tumugi::ScopedLogger.new("tumugi-executor") end
setup_task_queue(dag)
click to toggle source
# File lib/tumugi/executor/local_executor.rb, line 64 def setup_task_queue(dag) @queue = [] dag.tsort.each { |t| enqueue_task(t) } @queue end
task_timeout(task)
click to toggle source
# File lib/tumugi/executor/local_executor.rb, line 58 def task_timeout(task) timeout = task.timeout || Tumugi.config.timeout timeout = nil if !timeout.nil? && timeout == 0 # for backward compatibility timeout end