class Dynflow::Executors::Sidekiq::Core
Constants
- TELEMETRY_UPDATE_INTERVAL
Attributes
logger[R]
Public Class Methods
new(world, *_args)
click to toggle source
Calls superclass method
Dynflow::Executors::Abstract::Core::new
# File lib/dynflow/executors/sidekiq/core.rb, line 28 def initialize(world, *_args) @world = world @logger = world.logger wait_for_orchestrator_lock super schedule_update_telemetry begin_startup! end
Public Instance Methods
begin_startup!()
click to toggle source
# File lib/dynflow/executors/sidekiq/core.rb, line 81 def begin_startup! WorkerJobs::DrainMarker.perform_async(@world.id) @recovery = true end
execution_status(execution_plan_id = nil)
click to toggle source
TODO: needs thoughs on how to implement it
# File lib/dynflow/executors/sidekiq/core.rb, line 49 def execution_status(execution_plan_id = nil) {} end
feed_pool(work_items)
click to toggle source
# File lib/dynflow/executors/sidekiq/core.rb, line 53 def feed_pool(work_items) work_items.each do |new_work| WorkerJobs::PerformWork.set(queue: suggest_queue(new_work)).perform_async(new_work) end end
heartbeat()
click to toggle source
Calls superclass method
Dynflow::Executors::Abstract::Core#heartbeat
# File lib/dynflow/executors/sidekiq/core.rb, line 37 def heartbeat super reacquire_orchestrator_lock end
start_termination(*args)
click to toggle source
Calls superclass method
Dynflow::Executors::Abstract::Core#start_termination
# File lib/dynflow/executors/sidekiq/core.rb, line 42 def start_termination(*args) super release_orchestrator_lock finish_termination end
startup_complete()
click to toggle source
# File lib/dynflow/executors/sidekiq/core.rb, line 86 def startup_complete logger.info('Performing validity checks') @world.perform_validity_checks logger.info('Finished performing validity checks') if @world.delayed_executor && !@world.delayed_executor.started? @world.delayed_executor.start end @recovery = false end
update_telemetry()
click to toggle source
# File lib/dynflow/executors/sidekiq/core.rb, line 59 def update_telemetry sidekiq_queues = ::Sidekiq::Stats.new.queues @queues_options.keys.each do |queue| queue_size = sidekiq_queues[queue.to_s] if queue_size Dynflow::Telemetry.with_instance { |t| t.set_gauge(:dynflow_queue_size, queue_size, telemetry_options(queue)) } end end schedule_update_telemetry end
work_finished(work, delayed_events = nil)
click to toggle source
Calls superclass method
Dynflow::Executors::Abstract::Core#work_finished
# File lib/dynflow/executors/sidekiq/core.rb, line 70 def work_finished(work, delayed_events = nil) # If the work item is sent in reply to a request from the current orchestrator, proceed if work.sender_orchestrator_id == @world.id super else # If we're in recovery, we can drop the work as the execution plan will be resumed during validity checks performed when leaving recovery # If we're not in recovery and receive an event from another orchestrator, it means it survived the queue draining. handle_unknown_work_item(work) unless @recovery end end
Private Instance Methods
fallback_queue()
click to toggle source
# File lib/dynflow/executors/sidekiq/core.rb, line 98 def fallback_queue :default end
handle_unknown_work_item(work)
click to toggle source
We take a look if an execution lock is already being held by an orchestrator (it should be the current one). If no lock is held we try to resume the execution plan if possible
# File lib/dynflow/executors/sidekiq/core.rb, line 112 def handle_unknown_work_item(work) # We are past recovery now, if we receive an event here, the execution plan will be most likely paused # We can either try to rescue it or turn it over to stopped execution_lock = @world.coordinator.find_locks(class: Coordinator::ExecutionLock.name, id: "execution-plan:#{work.execution_plan_id}").first if execution_lock.nil? plan = @world.persistence.load_execution_plan(work.execution_plan_id) should_resume = !plan.error? || plan.prepare_for_rescue == :running @world.execute(plan.id) if should_resume end end
schedule_update_telemetry()
click to toggle source
# File lib/dynflow/executors/sidekiq/core.rb, line 102 def schedule_update_telemetry @world.clock.ping(reference, TELEMETRY_UPDATE_INTERVAL, [:update_telemetry]) end
telemetry_options(queue)
click to toggle source
# File lib/dynflow/executors/sidekiq/core.rb, line 106 def telemetry_options(queue) { queue: queue.to_s, world: @world.id } end