class Roby::ExecutionEngine
@api private
The core execution algorithm
It is in charge of handling event and exception propagation, as well as running cleanup processes (e.g. garbage collection).
The main method is {#process_events}. When executing a Roby application, it is called periodically by {#event_loop}.
In addition, there is a special “synchronous” propagation mode that is used by {EventGenerator#call} and {EventGenerator#emit}. This mode is used when the event code is not executed within an engine, but from an imperative script, as in unit tests.
Constants
- EXCEPTION_FATAL
Exception kind passed to {#on_exception} handlers for fatal, unhandled exceptions
- EXCEPTION_FREE_EVENT
Exception kind passed to {#on_exception} handlers for free event exceptions
- EXCEPTION_HANDLED
Exception kind passed to {#on_exception} handlers for handled exceptions
- EXCEPTION_NONFATAL
Exception kind passed to {#on_exception} handlers for non-fatal, unhandled exceptions
- INTERRUPT_FORCE_EXIT_DEAD_ZONE
How many seconds must separate two Interrupts before the second one can force the execution engine's loop to quit
- PENDING_PROPAGATION_FORWARD
- PENDING_PROPAGATION_SIGNAL
- PropagationInfo
Gathering of all the errors that happened during an event processing loop and were not handled
- SLEEP_MIN_TIME
Do not sleep or call Thread#pass if there is less than this much time left in the cycle
Attributes
Used during exception propagation to inject new errors in the process
It shall not be accessed directly. Instead, Plan#add_error should be called
The set of errors which have been generated outside of the plan's control. For now, those errors cause the whole controller to shut down.
A set of blocks that are called at each cycle end
The DecisionControl object associated with this engine
The number of this cycle since the beginning
The cycle length in seconds
The starting Time of this cycle
The set of pending delayed events. This is an array of the form
[[time, is_forward, source, target, context], ...]
See add_event_delay for more information
Cached graph object for {TaskStructure::Dependency}
This is here for performance reasons, to avoid resolving the same graph over and over
Poll blocks that have been disabled because they raised an exception
@return [Array<PollBlockDefinition>]
The set of events that have been emitted within the last call to {#process_events} (i.e. the last execution of the event loop)
@return [Array<Event>]
The underlying {DRoby::EventLogger}
It is usually the same as the {#plan}'s. Pass a {DRoby::NullEventLogger} at construction time to disable logging of execution events.
The topological ordering of events w.r.t. the Precedence relation. This gets updated on-demand when the event relations change.
The event => index hash which gives the propagation priority for each event
The blocks that are currently listening to exceptions
@return [Array<#call>]
A set of proc objects which are to be called when the execution engine quits.
Cached graph object for {EventStructure::Forward}
This is here for performance reasons, to avoid resolving the same graph over and over
Thread-safe queue to push work to the execution engine
Do not access directly, use {#once} instead
@return [Queue] blocks that should be executed at the beginning of the
next execution cycle. It is the only thread safe way to queue work to be executed by the engine
The Plan this engine is acting on
Cached graph object for {EventStructure::Precedence}
This is here for performance reasons, to avoid resolving the same graph over and over
A set of blocks which are called every cycle
A numeric ID giving the count of the current propagation cycle
The set of source events for the current propagation action. This is a mix of EventGenerator and Event objects.
The scheduler is the object which handles non-generic parts of the propagation cycle. For now, its initial_events method is called at the beginning of each propagation cycle and can call or emit a set of events.
Cached graph object for {EventStructure::Signal}
This is here for performance reasons, to avoid resolving the same graph over and over
The execution thread if there is one running
A thread pool on which async work should be executed
@see {#promise}
@return [Concurrent::CachedThreadPool]
A list of threaded objects waiting for the control thread
Objects registered here will be notified, by calling {#fail} on them, when the control thread quits. In addition, {#join_all_waiting_work} will wait for all pending jobs to finish.
Note that all {Concurrent::Obligation} subclasses fit the bill
@return [Array<#fail,#complete?>]
Public Class Methods
Returns a Time object which represents the absolute point in time referenced by timespec in the context of delaying a propagation between source and target.
See validate_timespec for more information
# File lib/roby/execution_engine.rb, line 931
def self.make_delay(timeref, source, target, timespec)
    if delay = timespec[:delay] then timeref + delay
    elsif at = timespec[:at] then at
    else raise ArgumentError, "invalid timespec #{timespec}"
    end
end
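The two timespec forms can be tried outside of Roby with a small standalone sketch. It mirrors the branching of the listing above; `resolve_delay` is a hypothetical name used for illustration only, and the unused source and target arguments are dropped.

```ruby
# Standalone sketch, not part of Roby: resolves a timespec hash into an
# absolute Time, following the same two forms as the listing above.
def resolve_delay(timeref, timespec)
  if (delay = timespec[:delay])
    timeref + delay # relative form: offset from the reference time
  elsif (at = timespec[:at])
    at # absolute form: used as-is
  else
    raise ArgumentError, "invalid timespec #{timespec}"
  end
end

puts resolve_delay(Time.at(1_000), { delay: 5 }).to_i
puts resolve_delay(Time.at(1_000), { at: Time.at(2_000) }).to_i
```

A hash with neither key raises ArgumentError, as in the original method.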
Create an execution engine acting on plan, using control as the decision control object
@param [ExecutablePlan] plan the plan on which this engine acts
@param [DecisionControl] control the policy object, i.e. the object that embeds policies in cases where multiple reactions would be possible
@param [DRoby::EventLogger] event_logger the logger that should be used to trace execution events. It is by default the same as the {#plan}'s. Pass a {DRoby::NullEventLogger} instance to disable event logging for this engine.
# File lib/roby/execution_engine.rb, line 80
def initialize(plan, control: Roby::DecisionControl.new, event_logger: plan.event_logger)
    @plan = plan
    @event_logger = event_logger

    @use_oob_gc = ExecutionEngine.use_oob_gc?

    @control = control
    @scheduler = Schedulers::Null.new(plan)
    reset_thread_pool
    @thread = Thread.current

    @propagation = nil
    @propagation_id = 0
    @propagation_exceptions = nil
    @application_exceptions = nil
    @delayed_events = []
    @event_ordering = Array.new
    @event_priorities = Hash.new
    @propagation_handlers = []
    @external_events_handlers = []
    @at_cycle_end_handlers = Array.new
    @process_every = Array.new
    @waiting_work = Concurrent::Array.new
    @emitted_events = Array.new
    @disabled_handlers = Set.new
    @exception_listeners = Array.new
    @worker_threads_mtx = Mutex.new
    @worker_threads = Array.new
    @once_blocks = Queue.new
    @pending_exceptions = Hash.new

    each_cycle(&ExecutionEngine.method(:call_every))

    @quit = 0
    @allow_propagation = true
    @cycle_index = 0
    @cycle_start = Time.now
    @cycle_length = 0.1
    @last_stop_count = 0
    @finalizers = []
    @gc_warning = true

    refresh_relations

    self.display_exceptions = true
end
Validates timespec as a delay specification. A valid delay specification is either nil or a hash, in which case two forms are possible:
at: absolute_time
delay: number
# File lib/roby/execution_engine.rb, line 920
def self.validate_timespec(timespec)
    if timespec
        timespec = validate_options timespec, [:delay, :at]
    end
end
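The rule stated above (nil, or a hash limited to the :at and :delay keys) can be sketched without Roby as follows. `check_timespec` and `ALLOWED_TIMESPEC_KEYS` are hypothetical names, and the behaviour of Roby's own validate_options helper is assumed, not reproduced.

```ruby
# Standalone sketch, not part of Roby: accepts nil or a hash whose keys
# are limited to :delay and :at, and rejects anything else.
ALLOWED_TIMESPEC_KEYS = %i[delay at].freeze

def check_timespec(timespec)
  return nil if timespec.nil?

  unknown = timespec.keys - ALLOWED_TIMESPEC_KEYS
  raise ArgumentError, "unknown timespec keys: #{unknown.inspect}" unless unknown.empty?

  timespec
end

p check_timespec(nil)
p check_timespec({ delay: 2 })
```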
Public Instance Methods
Register a LocalizedError for future propagation
This method must be called in an error-gathering context (i.e. {#gather_errors}).
@param [#to_execution_exception] e the exception
@raise [NotPropagationContext] raised if called outside {#gather_errors}
# File lib/roby/execution_engine.rb, line 582
def add_error(e, propagate_through: nil)
    plan_exception = e.to_execution_exception
    if @propagation_exceptions
        @propagation_exceptions << [plan_exception, propagate_through]
    else
        Roby.log_exception_with_backtrace(e, self, :fatal)
        raise NotPropagationContext, "#add_error called outside an error-gathering context (#add_error)"
    end
end
Adds a propagation step to be performed when the current time is greater than time. The propagation step is a signal if is_forward is false and a forward otherwise.
This method should not be called directly. Use add_event_propagation with the appropriate timespec argument.
See also delayed_events and execute_delayed_events
# File lib/roby/execution_engine.rb, line 495
def add_event_delay(time, is_forward, source, target, context)
    delayed_events << [time, is_forward, source, target, context]
end
Adds a propagation to the next propagation step: it registers a propagation step to be performed between source and target with the given context. If is_forward is true, the propagation will be a forwarding, otherwise it is a signal.
If timespec is not nil, it defines a delay to be applied before calling the target event.
# File lib/roby/execution_engine.rb, line 700
def add_event_propagation(is_forward, sources, target, context, timespec)
    if target.plan != plan
        raise Roby::EventNotExecutable.new(target), "#{target} not in executed plan"
    end

    target.pending(sources.find_all { |ev| ev.kind_of?(Event) })

    @propagation_step_id += 1
    target_info = (@propagation[target] ||= [@propagation_step_id, [], []])
    step = target_info[is_forward ? PENDING_PROPAGATION_FORWARD : PENDING_PROPAGATION_SIGNAL]
    if sources.empty?
        step << nil << context << timespec
    else
        sources.each do |ev|
            step << ev << context << timespec
        end
    end
end
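The bookkeeping in this listing stores each pending propagation as a flat run of (source, context, timespec) triples under its target. The standalone sketch below reproduces that layout with plain symbols. `PendingPropagations`, `FORWARD_SLOT` and `SIGNAL_SLOT` are hypothetical names, and the slot indices 1 and 2 are an assumption inferred from the `[step_id, [], []]` layout, since the values of the PENDING_PROPAGATION_* constants are not shown on this page.

```ruby
# Standalone sketch, not part of Roby.
FORWARD_SLOT = 1 # assumed index of the forward list
SIGNAL_SLOT = 2  # assumed index of the signal list

class PendingPropagations
  attr_reader :entries

  def initialize
    @entries = {} # target => [step_id, forwards, signals]
    @step_id = 0
  end

  def add(is_forward, sources, target, context, timespec = nil)
    @step_id += 1
    info = (@entries[target] ||= [@step_id, [], []])
    step = info[is_forward ? FORWARD_SLOT : SIGNAL_SLOT]
    # A propagation without any source is recorded with a nil source.
    (sources.empty? ? [nil] : sources).each do |source|
      step << source << context << timespec
    end
    self
  end

  # Regroups the flat list into [source, context, timespec] triples.
  def forwards_to(target)
    info = @entries[target]
    info ? info[FORWARD_SLOT].each_slice(3).to_a : []
  end
end

pending = PendingPropagations.new
pending.add(true, %i[a b], :target, 42)
p pending.forwards_to(:target)
```

Reading the list back with `each_slice(3)` is the same access pattern the engine uses when it checks for a pending forward.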
Register a set of fatal exceptions to ensure that they will be inhibited in the next exception propagation cycles
# File lib/roby/execution_engine.rb, line 1827
def add_exceptions_for_inhibition(fatal_errors)
    fatal_errors.each do |exception, involved_tasks|
        involved_tasks.each do |t|
            (@pending_exceptions[t] ||= Set.new) << [exception.exception.class, exception.origin]
        end
    end
end
Registers the given error and a description of its source in the list of application/framework errors
It must be called within an exception-gathering context, that is either within {#process_events}, or within {#gather_framework_errors}.
These errors will terminate the event loop.
@param [Exception] error
@param [Object] source
# File lib/roby/execution_engine.rb, line 653
def add_framework_error(error, source)
    if @application_exceptions
        @application_exceptions << [error, source]
    else
        Roby.log_exception_with_backtrace(error, self, :fatal)
        raise NotPropagationContext, "#add_framework_error called outside an exception-gathering context"
    end
end
Adds a block to be called at the end of each execution cycle
@return [Object] an object that identifies the block so that it can be removed with {#remove_at_cycle_end}
@yieldparam [Plan] plan the plan on which this engine runs
# File lib/roby/execution_engine.rb, line 2038
def at_cycle_end(description: 'at_cycle_end', **options, &block)
    handler = PollBlockDefinition.new(description, block, **options)
    at_cycle_end_handlers << handler
    handler.object_id
end
Helper that calls the propagation handlers in propagation_handlers (which are expected to be instances of PollBlockDefinition) and handles the errors according to each handler's policy
# File lib/roby/execution_engine.rb, line 740
def call_poll_blocks(blocks, late = false)
    blocks.delete_if do |handler|
        if handler.disabled? || (handler.late? ^ late)
            next
        end

        log_timepoint_group handler.description do
            if !handler.call(self, plan)
                handler.disabled = true
            end
        end
        handler.once?
    end
end
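The filtering rules of this helper (skip disabled handlers and handlers of the other "late" phase, disable a handler whose call returns false, drop one-shot handlers after they ran) can be exercised with a standalone sketch. `PollBlock` and `run_poll_blocks` are hypothetical stand-ins; in particular, having `call` return false when the block raises is an assumption about PollBlockDefinition, whose error policy is not shown on this page.

```ruby
# Standalone sketch, not part of Roby.
PollBlock = Struct.new(:description, :block, :late, :once, :disabled, keyword_init: true) do
  # Assumed contract: true on success, false if the block raised.
  def call
    block.call
    true
  rescue StandardError
    false
  end
end

def run_poll_blocks(blocks, late = false)
  blocks.delete_if do |handler|
    # Skipped handlers stay registered (next yields nil, so no deletion).
    next if handler.disabled || (handler.late ^ late)

    handler.disabled = true unless handler.call
    handler.once # one-shot handlers are removed once they have run
  end
end

log = []
blocks = [
  PollBlock.new(description: 'one-shot', block: -> { log << :once }, late: false, once: true, disabled: false),
  PollBlock.new(description: 'failing', block: -> { raise 'boom' }, late: false, once: false, disabled: false)
]
run_poll_blocks(blocks)
puts blocks.map(&:description).inspect
```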
# File lib/roby/execution_engine.rb, line 776
def call_propagation_handlers
    process_once_blocks
    if scheduler.enabled?
        gather_framework_errors('scheduler') do
            scheduler.initial_events
            log_timepoint 'scheduler'
        end
    end
    call_poll_blocks(self.class.propagation_handlers, false)
    call_poll_blocks(self.propagation_handlers, false)

    if !has_queued_events?
        call_poll_blocks(self.class.propagation_handlers, true)
        call_poll_blocks(self.propagation_handlers, true)
    end
end
Sets up the plan for clearing: it discards all missions and undefines all permanent tasks and events.
Returns nil if the plan is cleared, and the set of remaining tasks otherwise. Note that quarantined tasks are not counted as remaining, as it is not possible for the execution engine to stop them.
# File lib/roby/execution_engine.rb, line 2180
def clear
    plan.mission_tasks.dup.each { |t| plan.unmark_mission_task(t) }
    plan.permanent_tasks.dup.each { |t| plan.unmark_permanent_task(t) }
    plan.permanent_events.dup.each { |t| plan.unmark_permanent_event(t) }
    plan.force_gc.merge( plan.tasks )

    quaranteened_subplan = plan.compute_useful_tasks(plan.quarantined_tasks)
    remaining = plan.tasks - quaranteened_subplan

    @pending_exceptions.clear
    if remaining.empty?
        # Have to call #garbage_collect one more to make
        # sure that unneeded events are removed as well
        garbage_collect
        # Done cleaning the tasks, clear the remains
        plan.transactions.each do |trsc|
            trsc.discard_transaction if trsc.self_owned?
        end
        plan.clear
        emitted_events.clear
        return
    end
    remaining
end
# File lib/roby/execution_engine.rb, line 1378
def clear_application_exceptions
    if !@application_exceptions
        raise RecursivePropagationContext, "unbalanced call to #clear_application_exceptions"
    end

    result, @application_exceptions = @application_exceptions, nil
    result
end
Compute the set of fatal errors in the current execution state
@param [Array] events_errors the set of errors gathered during event
propagation
@return [PropagationInfo]
# File lib/roby/execution_engine.rb, line 1398
def compute_errors(events_errors)
    # Generate exceptions from task structure
    structure_errors = plan.check_structure
    log_timepoint 'structure_check'

    # Propagate the errors. Note that the plan repairs are taken into
    # account in ExecutionEngine.propagate_exceptions directly. We keep
    # event and structure errors separate since in the first case there
    # is not two-stage handling (all errors that have not been handled
    # are fatal), and in the second case we call #check_structure
    # again to errors that are remaining after the call to the exception
    # handlers
    events_errors, free_events_errors, events_handled = propagate_exceptions(events_errors)
    _, structure_handled = propagate_exceptions(structure_errors)
    log_timepoint 'exception_propagation'

    # Get the remaining problems in the plan structure, and act on it
    structure_errors, structure_inhibited = remove_inhibited_exceptions(plan.check_structure)

    # Partition them by fatal/nonfatal
    fatal_errors, nonfatal_errors = Array.new, Array.new
    (structure_errors + events_errors).each do |e, involved_tasks|
        if e.fatal?
            fatal_errors << [e, involved_tasks]
        else
            nonfatal_errors << [e, involved_tasks]
        end
    end
    kill_tasks = compute_kill_tasks_for_unhandled_fatal_errors(fatal_errors).to_set
    handled_errors = structure_handled + events_handled

    debug "#{fatal_errors.size} fatal errors found and #{free_events_errors.size} errors involving free events"
    debug "the fatal errors involve #{kill_tasks.size} non-finalized tasks"
    return PropagationInfo.new(Set.new, Set.new, kill_tasks, fatal_errors, nonfatal_errors, free_events_errors, handled_errors, structure_inhibited)
end
Compute the set of non-finalized tasks involved in unhandled fatal exceptions
# File lib/roby/execution_engine.rb, line 856
def compute_kill_tasks_for_unhandled_fatal_errors(fatal_errors)
    kill_tasks = fatal_errors.inject(Set.new) do |tasks, (exception, affected_tasks)|
        tasks.merge(affected_tasks)
    end
    # Tasks might have been finalized during exception handling, filter
    # those out
    kill_tasks.find_all(&:plan)
end
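As a rough standalone illustration of this merge-then-filter step, the sketch below uses a hypothetical `FakeTask` struct whose `plan` member is nil once the task has been finalized. Neither `FakeTask` nor `tasks_to_kill` exists in Roby.

```ruby
require 'set'

# Standalone sketch, not part of Roby.
FakeTask = Struct.new(:name, :plan)

def tasks_to_kill(fatal_errors)
  involved = fatal_errors.each_with_object(Set.new) do |(_exception, affected_tasks), tasks|
    tasks.merge(affected_tasks) # the Set removes duplicates across errors
  end
  # Finalized tasks no longer belong to a plan and are filtered out.
  involved.find_all(&:plan)
end

running = FakeTask.new('running', :plan)
finalized = FakeTask.new('finalized', nil)
puts tasks_to_kill([[:error1, [running, finalized]], [:error2, [running]]]).map(&:name).inspect
```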
Called at each cycle end
# File lib/roby/execution_engine.rb, line 2396
def cycle_end(stats, raise_framework_errors: Roby.app.abort_on_application_exception?)
    gather_framework_errors("#cycle_end", raise_caught_exceptions: raise_framework_errors) do
        call_poll_blocks(at_cycle_end_handlers)
    end
end
Schedules block to be called once after delay seconds passed, in the propagation context
# File lib/roby/execution_engine.rb, line 1366
def delayed(delay, description: 'delayed block', **options, &block)
    handler = PollBlockDefinition.new(description, block, once: true, **options)
    once do
        process_every << [handler, cycle_start, delay]
    end
    handler.id
end
Controls whether this engine should indiscriminately display all fatal exceptions
This is on by default
# File lib/roby/execution_engine.rb, line 2549
def display_exceptions=(flag)
    if flag
        @exception_display_handler ||= on_exception do |kind, error, tasks|
            level = if kind == EXCEPTION_HANDLED then :debug
                    else :warn
                    end

            send(level) do
                send(level, "encountered a #{kind} exception")
                Roby.log_exception_with_backtrace(error.exception, self, level)
                if kind == EXCEPTION_HANDLED
                    send(level, "the exception was handled by")
                else
                    send(level, "the exception involved")
                end
                tasks.each do |t|
                    send(level, " #{t}")
                end
                break
            end
        end
    else
        remove_exception_listener(@exception_display_handler)
        @exception_display_handler = nil
    end
end
Whether this engine should indiscriminately display all fatal exceptions
# File lib/roby/execution_engine.rb, line 2578
def display_exceptions?
    !!@exception_display_handler
end
Compute errors in plan and handle the results
# File lib/roby/execution_engine.rb, line 841
def error_handling_phase(events_errors)
    # Do the exception handling phase
    errors = compute_errors(events_errors)
    notify_about_error_handling_results(errors)

    # nonfatal errors are only notified. Fatal errors (kill_tasks) are
    # handled in the propagation loop during garbage collection. Only
    # the free events errors have to be handled here.
    errors.free_events_errors.each do |exception, generators|
        generators.each { |g| g.unreachable!(exception.exception) }
    end
    return errors
end
The main event loop. It returns when the execution engine is asked to quit. In general, this does not need to be called directly: use run to start the event loop in a separate thread.
# File lib/roby/execution_engine.rb, line 2227
def event_loop
    last_stop_count = 0
    last_quit_warning = Time.now
    @cycle_start = Time.now
    @cycle_index = 0

    force_exit_deadline = nil
    last_process_times = Process.times
    last_dump_time = plan.event_logger.dump_time

    loop do
        begin
            if profile_gc?
                GC::Profiler.enable
            end

            if quitting?
                if forced_exit?
                    return
                end

                begin
                    remaining = clear
                    return if !remaining

                    if (last_stop_count != remaining.size) || (Time.now - last_quit_warning) > 10
                        if last_stop_count == 0
                            info "Roby quitting ..."
                        end

                        issue_quit_progression_warning(remaining)
                        last_quit_warning = Time.now
                        last_stop_count = remaining.size
                    end
                rescue Exception => e
                    warn "Execution thread failed to clean up"
                    Roby.log_exception_with_backtrace(e, self, :warn, filter: false)
                    return
                end
            end

            log_timepoint_group_start "cycle"

            while Time.now > cycle_start + cycle_length
                @cycle_start += cycle_length
                @cycle_index += 1
            end
            stats = Hash.new
            stats[:start] = [cycle_start.tv_sec, cycle_start.tv_usec]
            stats[:actual_start] = Time.now - cycle_start
            stats[:cycle_index] = cycle_index

            log_timepoint_group 'process_events' do
                process_events
            end

            remaining_cycle_time = cycle_length - (Time.now - cycle_start)

            if use_oob_gc?
                stats[:pre_oob_gc] = GC.stat
                GC::OOB.run
            end

            # Sleep if there is enough time for it
            if remaining_cycle_time > SLEEP_MIN_TIME
                sleep(remaining_cycle_time)
            end
            log_timepoint 'sleep'

            cycle_end(stats)

            # Log cycle statistics
            process_times = Process.times
            dump_time = plan.event_logger.dump_time
            stats[:log_queue_size] = plan.log_queue_size
            stats[:plan_task_count] = plan.num_tasks
            stats[:plan_event_count] = plan.num_free_events
            stats[:gc] = GC.stat
            stats[:utime] = process_times.utime - last_process_times.utime
            stats[:stime] = process_times.stime - last_process_times.stime
            stats[:dump_time] = dump_time - last_dump_time
            stats[:state] = Roby::State
            stats[:end] = Time.now - cycle_start
            if profile_gc?
                stats[:gc_profile_data] = GC::Profiler.raw_data
                stats[:gc_total_time] = GC::Profiler.total_time
            else
                stats[:gc_profile_data] = nil
                stats[:gc_total_time] = 0
            end
            log_flush_cycle :cycle_end, stats

            last_dump_time = dump_time
            last_process_times = process_times
            stats = Hash.new

            @cycle_start += cycle_length
            @cycle_index += 1

            if profile_gc?
                GC::Profiler.disable
            end
        rescue Exception => e
            if e.kind_of?(Interrupt)
                if quitting?
                    if force_exit_deadline && (force_exit_deadline - Time.now) < 0
                        fatal "Quitting without cleaning up"
                        force_quit
                    else
                        fatal "Still #{Integer(force_exit_deadline - Time.now)}s before interruption will quit without cleaning up"
                    end
                else
                    fatal "Received interruption request"
                    fatal "Interrupt again in #{INTERRUPT_FORCE_EXIT_DEAD_ZONE}s to quit without cleaning up"
                    quit
                    force_exit_deadline = Time.now + INTERRUPT_FORCE_EXIT_DEAD_ZONE
                end
            elsif !quitting?
                quit
                fatal "Execution thread quitting because of unhandled exception"
                Roby.log_exception_with_backtrace(e, self, :fatal)
            else
                fatal "Execution thread FORCEFULLY quitting because of unhandled exception"
                Roby.log_exception_with_backtrace(e, self, :fatal)
                raise
            end
        ensure
            log_timepoint_group_end "cycle"
        end
    end
ensure
    if !plan.tasks.empty?
        warn "the following tasks are still present in the plan:"
        plan.tasks.each do |t|
            warn " #{t}"
        end
    end
end
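Two pieces of timing arithmetic in this loop are easy to check in isolation: the catch-up of cycle_start when a cycle overran, and the decision to sleep only when enough time remains. The sketch below isolates them as pure functions over plain numbers. `catch_up_cycle` and `sleep_duration` are hypothetical helpers, and `min_sleep` stands in for SLEEP_MIN_TIME, whose value is not given on this page.

```ruby
# Standalone sketch, not part of Roby.

# Advances the cycle bookkeeping until `now` falls inside the current
# cycle. Returns the updated [cycle_start, cycle_index] pair.
def catch_up_cycle(now, cycle_start, cycle_length, cycle_index)
  while now > cycle_start + cycle_length
    cycle_start += cycle_length
    cycle_index += 1
  end
  [cycle_start, cycle_index]
end

# Time left in the current cycle, or nil when it does not exceed
# `min_sleep` (in which case the loop would not sleep at all).
def sleep_duration(now, cycle_start, cycle_length, min_sleep)
  remaining = cycle_length - (now - cycle_start)
  remaining > min_sleep ? remaining : nil
end

# Times are in integer milliseconds to keep the arithmetic exact.
p catch_up_cycle(350, 0, 100, 0)
p sleep_duration(340, 300, 100, 10)
```

With a 100 ms cycle, a loop that wakes up at t = 350 ms has skipped three cycle boundaries, so the cycle index advances by three and the current cycle is taken to start at 300 ms.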
Calls its block in a gather_propagation context and propagates events that have been called and/or emitted by the block
If a block is given, it is called with the initial set of events: the events we should consider as already emitted in the following propagation. seeds is a list of procs which should be called to initiate the propagation (i.e. build an initial set of events)
# File lib/roby/execution_engine.rb, line 826
def event_propagation_phase(initial_events, propagation_info)
    @propagation_id += 1

    gather_errors do
        next_steps = initial_events
        while !next_steps.empty?
            while !next_steps.empty?
                next_steps = event_propagation_step(next_steps, propagation_info)
            end
            next_steps = gather_propagation { call_propagation_handlers }
        end
    end
end
Propagate one step
current_step describes all pending emissions and calls.
This method calls ExecutionEngine.next_event to get the description of the next event to call. If there are signals going to this event, they are processed and the forwardings will be treated in the next step.
The method returns the next set of pending emissions and calls, adding the forwardings and signals that the propagation of the considered event has added.
# File lib/roby/execution_engine.rb, line 1053
def event_propagation_step(current_step, propagation_info)
    signalled, step_id, forward_info, call_info = next_event(current_step)

    next_step = nil
    if !call_info.empty?
        source_events, source_generators, context = prepare_propagation(signalled, false, call_info)
        if source_events
            log(:generator_propagate_events, false, source_events, signalled)

            if signalled.self_owned?
                next_step = gather_propagation(current_step) do
                    propagation_context(source_events | source_generators) do
                        begin
                            propagation_info.add_generator_call(signalled)
                            signalled.call_without_propagation(context)
                        rescue Roby::LocalizedError => e
                            if signalled.command_emitted?
                                add_error(e)
                            else
                                signalled.emit_failed(e)
                            end
                        rescue Exception => e
                            if signalled.command_emitted?
                                add_error(Roby::CommandFailed.new(e, signalled))
                            else
                                signalled.emit_failed(Roby::CommandFailed.new(e, signalled))
                            end
                        end
                    end
                end
            end
        end

        if forward_info
            next_step ||= Hash.new
            target_info = (next_step[signalled] ||= [@propagation_step_id += 1, [], []])
            target_info[PENDING_PROPAGATION_FORWARD].concat(forward_info)
        end

    elsif !forward_info.empty?
        source_events, source_generators, context = prepare_propagation(signalled, true, forward_info)
        if source_events
            log(:generator_propagate_events, true, source_events, signalled)

            # If the destination event is not owned, but if the peer is not
            # connected, the event is our responsibility now.
            if signalled.self_owned? || !signalled.owners.any? { |peer| peer != plan.local_owner && peer.connected? }
                next_step = gather_propagation(current_step) do
                    propagation_context(source_events | source_generators) do
                        begin
                            if event = signalled.emit_without_propagation(context)
                                propagation_info.add_event_emission(event)
                                emitted_events << event
                            end
                        rescue Roby::LocalizedError => e
                            Roby.warn "Internal Error: #emit_without_propagation emitted a LocalizedError exception. This is unsupported and will become a fatal error in the future. You should usually replace raise with engine.add_error"
                            Roby.display_exception(Roby.logger.io(:warn), e, false)
                            add_error(e)
                        rescue Exception => e
                            Roby.warn "Internal Error: #emit_without_propagation emitted an exception. This is unsupported and will become a fatal error in the future. You should create a proper localized error and replace raise with engine.add_error"
                            Roby.display_exception(Roby.logger.io(:warn), e, false)
                            add_error(Roby::EmissionFailed.new(e, signalled))
                        end
                    end
                end
            end
        end
    end

    current_step.merge!(next_step) if next_step
    current_step
end
Call block every duration seconds. Note that duration is rounded up to the cycle size (time between calls is *at least* duration)
The returned value is the periodic handler ID. It can be passed to remove_periodic_handler to undefine it.
# File lib/roby/execution_engine.rb, line 2059
def every(duration, description: 'periodic handler', **options, &block)
    handler = PollBlockDefinition.new(description, block, **options)

    once do
        if handler.call(self, plan)
            process_every << [handler, cycle_start, duration]
        end
    end
    handler.id
end
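The "rounded up to the cycle size" remark can be made concrete with a small simulation. It assumes that a periodic handler is checked once per cycle and fires on the first cycle where at least duration has elapsed since its last call; the code that actually consumes process_every is not shown on this page, so this is an illustration of the stated guarantee and not Roby's implementation. `firing_times` is a hypothetical helper.

```ruby
# Standalone sketch, not part of Roby.
# Simulates `cycles` engine cycles of `cycle_length` time units and
# returns the times at which a handler with period `duration` fires.
def firing_times(cycle_length, duration, cycles)
  last_call = 0
  (1..cycles).each_with_object([]) do |index, fired|
    now = index * cycle_length
    next unless now - last_call >= duration

    fired << now
    last_call = now
  end
end

# 100 ms cycles, 250 ms period, in integer milliseconds.
p firing_times(100, 250, 7)
```

Under this assumption, a 250 ms period on a 100 ms cycle yields calls spaced 300 ms apart, which is at least the requested duration.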
Block until the given block is executed by the execution thread, at the beginning of the event loop, in propagation context. If the block raises, the exception is raised back in the calling thread.
# File lib/roby/execution_engine.rb, line 2405
def execute(catch: [], type: :external_events)
    if inside_control?
        return yield
    end

    capture_catch = lambda do |symbol, *other|
        caught = catch(symbol) do
            if other.empty?
                return [:ret, yield]
            else
                return capture_catch(block, *other)
            end
        end
        [:throw, [symbol, caught]]
    end

    ivar = Concurrent::IVar.new
    once(sync: ivar, type: type) do
        begin
            if !catch.empty?
                result = capture_catch.call(*catch) { yield }
                ivar.set(result)
            else
                ivar.set([:ret, yield])
            end
        rescue ::Exception => e # rubocop:disable Lint/RescueException
            ivar.set([:raise, e])
        end
    end

    mode, value = ivar.value!
    case mode
    when :ret
        return value
    when :throw
        throw *value
    else
        raise value
    end
end
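The hand-off described here (queue a block for the engine thread, block the caller, then return the value or re-raise the exception in the caller) can be sketched with the standard library alone. The example below swaps Concurrent::IVar for a plain Queue used as a one-shot reply channel and leaves out the catch/throw handling. `MiniEngine` and its methods are hypothetical names, not Roby's API.

```ruby
# Standalone sketch, not part of Roby.
class MiniEngine
  def initialize
    @once_blocks = Queue.new # thread-safe work queue
  end

  # Called from any thread: queues the block and waits for its outcome.
  def execute(&block)
    reply = Queue.new
    @once_blocks << lambda do
      begin
        reply << [:ret, block.call]
      rescue Exception => e # rubocop:disable Lint/RescueException
        reply << [:raise, e]
      end
    end
    mode, value = reply.pop
    raise value if mode == :raise

    value
  end

  # Called from the engine thread: waits for one queued block and runs it.
  def step
    @once_blocks.pop.call
  end
end

engine = MiniEngine.new
engine_thread = Thread.new { engine.step }
puts engine.execute { 21 * 2 }
engine_thread.join
```

The block always runs on the thread that calls `step`, so the caller only ever sees the result or the exception.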
Adds the events in delayed_events whose time has passed into the propagation. This must be called in propagation context.
See add_event_delay and delayed_events
# File lib/roby/execution_engine.rb, line 503
def execute_delayed_events
    reftime = Time.now
    delayed_events.delete_if do |time, forward, source, signalled, context|
        if time <= reftime
            add_event_propagation(forward, [source], signalled, context, nil)
            true
        end
    end
end
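A standalone sketch of the delayed-event queue, using the [time, is_forward, source, target, context] entry layout documented for delayed_events. `DelayedQueue` is a hypothetical class; it returns the due entries where the engine would hand them to add_event_propagation.

```ruby
# Standalone sketch, not part of Roby.
class DelayedQueue
  def initialize
    @delayed = [] # [[time, is_forward, source, target, context], ...]
  end

  def add(time, is_forward, source, target, context)
    @delayed << [time, is_forward, source, target, context]
  end

  # Removes and returns the entries whose time has passed.
  def pop_due(now = Time.now)
    due, @delayed = @delayed.partition { |time, *| time <= now }
    due
  end

  def size
    @delayed.size
  end
end

queue = DelayedQueue.new
queue.add(Time.at(10), true, :source, :early_target, nil)
queue.add(Time.at(30), false, :source, :late_target, nil)
p queue.pop_due(Time.at(20)).map { |entry| entry[3] }
```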
Called by plan when an event has been finalized
# File lib/roby/execution_engine.rb, line 524
def finalized_event(event)
    if @propagation
        @propagation.delete(event)
    end
    event.unreachable!("finalized", plan)
    # since the event is already finalized,
end
Called by plan when a task has been finalized
# File lib/roby/execution_engine.rb, line 519
def finalized_task(task)
    @pending_exceptions.delete(task)
end
Force quitting, without cleaning up
# File lib/roby/execution_engine.rb, line 2388
def force_quit; @quit = 2 end
True if the control thread has been asked to quit without cleaning up (see {#force_quit})
# File lib/roby/execution_engine.rb, line 2384
def forced_exit?; @quit > 1 end
Kills and removes all unneeded tasks. force_on is a set of tasks whose garbage-collection must be performed, even though those tasks are actually useful for the system. This is used to properly kill tasks for which errors have been detected.
@return [Boolean] true if events have been called (thus requiring some propagation) and false otherwise
# File lib/roby/execution_engine.rb, line 1861
def garbage_collect(force_on = nil)
    if force_on && !force_on.empty?
        info "GC: adding #{force_on.size} tasks in the force_gc set"
        mismatching_plan = force_on.find_all do |t|
            if t.plan == self.plan
                plan.force_gc << t
                false
            else
                true
            end
        end
        if !mismatching_plan.empty?
            raise ArgumentError, "#{mismatching_plan.map { |t| "#{t}(plan=#{t.plan})" }.join(", ")} have been given to #{self}.garbage_collect, but they are not tasks in #{plan}"
        end
    end

    unmark_finished_missions_and_permanent_tasks

    # The set of tasks for which we queued stop! at this cycle
    # #finishing? is false until the next event propagation cycle
    finishing = Set.new
    did_something = true
    while did_something
        did_something = false

        tasks = plan.unneeded_tasks | plan.force_gc
        local_tasks = plan.local_tasks & tasks
        remote_tasks = tasks - local_tasks

        # Remote tasks are simply removed, regardless of other concerns
        for t in remote_tasks
            debug { "GC: removing the remote task #{t}" }
            plan.garbage_task(t)
        end

        break if local_tasks.empty?

        debug do
            debug "#{local_tasks.size} tasks are unneeded in this plan"
            local_tasks.each do |t|
                debug " #{t} mission=#{plan.mission_task?(t)} permanent=#{plan.permanent_task?(t)}"
            end
            break
        end

        if local_tasks.all? { |t| t.pending? || t.finished? }
            local_tasks.each do |t|
                debug { "GC: #{t} is not running, removed" }
                if plan.garbage_task(t)
                    did_something = true
                end
            end
            break
        end

        # Mark all root local_tasks as garbage.
        roots = local_tasks.dup
        plan.each_task_relation_graph do |g|
            next if !g.root_relation? || g.weak?
            roots.delete_if do |t|
                g.each_in_neighbour(t).any? { |p| !p.finished? }
            end
            break if roots.empty?
        end

        (roots.to_set - finishing).each do |local_task|
            if local_task.pending?
                info "GC: removing pending task #{local_task}"
                if plan.garbage_task(local_task)
                    did_something = true
                end
            elsif local_task.failed_to_start?
                info "GC: removing task that failed to start #{local_task}"
                if plan.garbage_task(local_task)
                    did_something = true
                end
            elsif local_task.starting?
                # wait for task to be started before killing it
                debug { "GC: #{local_task} is starting" }
            elsif local_task.finished?
                debug { "GC: #{local_task} is not running, removed" }
                if plan.garbage_task(local_task)
                    did_something = true
                end
            elsif !local_task.finishing?
                if local_task.quarantined?
                    warn "GC: #{local_task} is running but in quarantine"
                elsif local_task.event(:stop).controlable?
                    debug { "GC: attempting to stop #{local_task}" }
                    if !local_task.respond_to?(:stop!)
                        warn "something fishy: #{local_task}/stop is controlable but there is no #stop! method, putting in quarantine"
                        plan.quarantine_task(local_task)
                    else
                        finishing << local_task
                    end
                else
                    warn "GC: #{local_task} cannot be stopped, putting in quarantine"
                    plan.quarantine_task(local_task)
                end
            elsif local_task.finishing?
                debug do
                    debug "GC: waiting for #{local_task} to finish"
                    local_task.history.each do |ev|
                        debug "GC: #{ev}"
                    end
                    break
                end
            else
                warn "GC: ignored #{local_task}"
            end
        end
    end

    finishing.each do |task|
        task.stop!
    end

    plan.unneeded_events.each do |event|
        plan.garbage_event(event)
    end

    !finishing.empty?
end
Executes the given block while gathering errors, and returns the errors that have been declared with add_error
@return [Array<ExecutionException>]
# File lib/roby/execution_engine.rb, line 801
def gather_errors
    if @propagation_exceptions
        raise InternalError, "recursive call to #gather_errors"
    end

    # The ensure clause must NOT apply to the recursive check above.
    # Otherwise, we end up resetting @propagation_exceptions to nil,
    # which wreaks havoc
    begin
        @propagation_exceptions = []
        yield
        @propagation_exceptions
    ensure
        @propagation_exceptions = nil
    end
end
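The non-reentrant gathering pattern used here can be reproduced in a few lines: the recursion check sits outside the begin/ensure, so a rejected nested call does not reset the outer context. `ErrorCollector` and its nested exception classes are hypothetical names used for illustration.

```ruby
# Standalone sketch, not part of Roby.
class ErrorCollector
  class RecursiveGathering < RuntimeError; end
  class NotGathering < RuntimeError; end

  def initialize
    @errors = nil # nil means "no gathering context is open"
  end

  def gathering?
    !@errors.nil?
  end

  def add_error(error)
    raise NotGathering, 'add_error called outside gather_errors' unless @errors

    @errors << error
  end

  def gather_errors
    raise RecursiveGathering, 'recursive call to gather_errors' if @errors

    # The ensure clause only covers the part that opened the context.
    begin
      @errors = []
      yield
      @errors
    ensure
      @errors = nil
    end
  end
end

collector = ErrorCollector.new
p collector.gather_errors { collector.add_error(:first) }
```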
Gather the events that come out of this plan manager
# File lib/roby/execution_engine.rb, line 769
def gather_external_events
    process_once_blocks
    gather_framework_errors('delayed events') { execute_delayed_events }
    call_poll_blocks(self.class.external_events_handlers)
    call_poll_blocks(self.external_events_handlers)
end
Yields to the block and registers any raised exception using {#add_framework_error}
If the method is called within an exception-gathering context (either {#process_events} or {#gather_framework_errors} itself), nothing else is done. Otherwise, {#process_pending_application_exceptions} is called to re-raise any caught exception.
# File lib/roby/execution_engine.rb, line 599
def gather_framework_errors(source, raise_caught_exceptions: true)
    if @application_exceptions
        recursive_error_gathering_context = true
    else
        @application_exceptions = []
    end

    yield

    if !recursive_error_gathering_context && !raise_caught_exceptions
        clear_application_exceptions
    end
rescue Exception => e
    add_framework_error(e, source)
    if !recursive_error_gathering_context && !raise_caught_exceptions
        clear_application_exceptions
    end
ensure
    if !recursive_error_gathering_context && raise_caught_exceptions
        process_pending_application_exceptions
    end
end
Sets up a propagation context, yielding the block in it. During this propagation stage, all calls to emit and call are stored in an internal hash of the form:
target => [forward_sources, signal_sources]
where the two _sources are arrays of the form
[[source, context], ...]
The method returns the resulting hash. Use in_propagation_context? to know if the current engine is in a propagation context, and add_event_propagation to add a new entry to this set.
# File lib/roby/execution_engine.rb, line 548 def gather_propagation(initial_set = Hash.new) raise InternalError, "nested call to #gather_propagation" if in_propagation_context? old_allow_propagation, @allow_propagation = @allow_propagation, true # The ensure clause must NOT apply to the recursive check above. # Otherwise, we end up resetting @propagation_exceptions to nil, # which wreaks havoc begin @propagation = initial_set @propagation_sources = nil @propagation_step_id = 0 before = @propagation propagation_context([]) do yield end result, @propagation = @propagation, nil return result ensure @propagation = nil @allow_propagation = old_allow_propagation end end
# File lib/roby/execution_engine.rb, line 440 def gathering? Roby.warn_deprecated "#gathering? is deprecated, use #in_propagation_context? instead" in_propagation_context? end
# File lib/roby/execution_engine.rb, line 793 def gathering_errors? !!@propagation_exceptions end
Tests whether there is an exception registered by {#add_fatal_exceptions_for_inhibition} for a given error and object
@param [ExecutionException] e
@param [Task,Plan] object the handling object
# File lib/roby/execution_engine.rb, line 1821 def has_pending_exception_matching?(e, object) @pending_exceptions[object] && @pending_exceptions[object].include?([e.exception.class, e.origin]) end
Whether a forward matching this signature is currently pending
# File lib/roby/execution_engine.rb, line 720
def has_pending_forward?(from, to, expected_context)
    if pending = @propagation[to]
        pending[PENDING_PROPAGATION_FORWARD].each_slice(3).any? do |event, context, timespec|
            (from === event.generator) && (expected_context === context)
        end
    end
end
Whether a signal matching this signature is currently pending
# File lib/roby/execution_engine.rb, line 729
def has_pending_signal?(from, to, expected_context)
    if pending = @propagation[to]
        pending[PENDING_PROPAGATION_SIGNAL].each_slice(3).any? do |event, context, timespec|
            (from === event.generator) && (expected_context === context)
        end
    end
end
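Both predicates walk a flat array in groups of three and compare with `===`, which lets callers pass either exact values or matchers such as classes. The sketch below reproduces that lookup on hand-built data. The names `PendingEvent`, `FORWARD_INDEX`, `SIGNAL_INDEX` and `pending_forward?` are hypothetical, the index values are assumptions rather than the engine's constants, and the helper returns `false` where the engine's method returns `nil`.

```ruby
# Assumed positions of the forward and signal arrays (illustration only)
FORWARD_INDEX = 0
SIGNAL_INDEX = 1

# Minimal stand-in for an event that knows its generator
PendingEvent = Struct.new(:generator)

# target => [forwards, signals]; each is a flat list of
# event, context, timespec triples
PENDING = {
  stop: [
    [PendingEvent.new(:failed), [42], nil,
     PendingEvent.new(:aborted), ["reason"], nil],
    []
  ]
}.freeze

def pending_forward?(propagation, from, to, expected_context)
  pending = propagation[to]
  return false unless pending

  pending[FORWARD_INDEX].each_slice(3).any? do |event, context, _timespec|
    (from === event.generator) && (expected_context === context)
  end
end

puts pending_forward?(PENDING, :failed, :stop, [42])   # exact context
puts pending_forward?(PENDING, :aborted, :stop, Array) # class used as matcher
puts pending_forward?(PENDING, :failed, :stop, [1])    # context mismatch
```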
# File lib/roby/execution_engine.rb, line 674 def has_propagation_for?(target) @propagation && @propagation.has_key?(target) end
Returns true if some events are queued
# File lib/roby/execution_engine.rb, line 533 def has_queued_events? !@propagation.empty? end
Whether this EE has asynchronous work waiting to be processed
# File lib/roby/execution_engine.rb, line 1435 def has_waiting_work? # Filter out unscheduled promises (promises on which #execute was # not called). If they are unscheduled, we're not waiting on them waiting_work.any? { |w| !w.unscheduled? } end
True if we are within a propagation context (i.e. within event processing)
# File lib/roby/execution_engine.rb, line 447 def in_propagation_context? !!@propagation end
Query whether the given exception is inhibited in this plan
# File lib/roby/execution_engine.rb, line 1346 def inhibited_exception?(exception) unhandled, _ = remove_inhibited_exceptions([exception.to_execution_exception]) unhandled.empty? end
True if the current thread is the execution thread of this engine
See outside_control? for a discussion of the use of inside_control? and outside_control? when testing the threading context
# File lib/roby/execution_engine.rb, line 2096
def inside_control?
    t = thread
    !t || t == Thread.current
end
# File lib/roby/execution_engine.rb, line 2209 def issue_quit_progression_warning(remaining) info "Waiting for #{remaining.size} tasks to finish (#{plan.num_tasks} tasks still in plan) and #{waiting_work.size} async work jobs" remaining.each do |task| info " #{task}" end quarantined = remaining.find_all { |t| t.quarantined? } if quarantined.size != 0 info "#{quarantined.size} tasks in quarantine" end end
Waits for all obligations in {#waiting_work} to finish
# File lib/roby/execution_engine.rb, line 389 def join_all_waiting_work(timeout: nil) return [], PropagationInfo.new if waiting_work.empty? deadline = if timeout Time.now + timeout end finished = Array.new propagation_info = PropagationInfo.new begin framework_errors = gather_framework_errors("#join_all_waiting_work", raise_caught_exceptions: false) do next_steps = nil event_errors = gather_errors do next_steps = gather_propagation do finished.concat(process_waiting_work) blocks = Array.new while !once_blocks.empty? blocks << once_blocks.pop.last end call_poll_blocks(blocks) end end this_propagation = propagate_events_and_errors(next_steps, event_errors, garbage_collect_pass: false) propagation_info.merge(this_propagation) end propagation_info.add_framework_errors(framework_errors) Thread.pass has_scheduled_promises = has_waiting_work? if deadline && (Time.now > deadline) && has_scheduled_promises raise JoinAllWaitingWorkTimeout.new(waiting_work) end end while has_waiting_work? return finished, propagation_info end
Kill all tasks that are currently running in the plan
# File lib/roby/execution_engine.rb, line 2488 def killall scheduler_enabled = scheduler.enabled? plan.permanent_tasks.clear plan.permanent_events.clear plan.mission_tasks.clear scheduler.enabled = false quit start_new_cycle process_events cycle_end(Hash.new) plan.transactions.each do |trsc| trsc.discard_transaction! end start_new_cycle Thread.pass process_events cycle_end(Hash.new) ensure scheduler.enabled = scheduler_enabled end
Determines the event in current_step which should be signalled now. Removes it from the set and returns the event and the associated propagation information.
See gather_propagation for the format of the returned propagation_info
# File lib/roby/execution_engine.rb, line 954 def next_event(pending) # this variable is 2 if selected_event is being forwarded, 1 if it # is both forwarded and signalled and 0 if it is only signalled priority, step_id, selected_event = nil for propagation_step in pending target_event = propagation_step[0] target_step_id, forwards, signals = *propagation_step[1] target_priority = if forwards.empty? && signals.empty? then 2 elsif forwards.empty? then 0 else 1 end do_select = if selected_event if precedence_graph.reachable?(selected_event, target_event) false elsif precedence_graph.reachable?(target_event, selected_event) true elsif priority < target_priority true elsif priority == target_priority # If they are of the same priority, handle # earlier events first step_id > target_step_id else false end else true end if do_select selected_event = target_event priority = target_priority step_id = target_step_id end end [selected_event, *pending.delete(selected_event)] end
Issue the warning message and log notifications related to tasks being killed because of unhandled fatal exceptions
# File lib/roby/execution_engine.rb, line 867 def notify_about_error_handling_results(errors) kill_tasks, fatal_errors, nonfatal_errors, free_events_errors, handled_errors = errors.kill_tasks, errors.fatal_errors, errors.nonfatal_errors, errors.free_events_errors, errors.handled_errors if !nonfatal_errors.empty? if display_exceptions? warn "#{nonfatal_errors.size} unhandled non-fatal exceptions" end nonfatal_errors.each do |exception, tasks| notify_exception(EXCEPTION_NONFATAL, exception, tasks) end end if !handled_errors.empty? if display_exceptions? warn "#{handled_errors.size} handled errors" end handled_errors.each do |exception, tasks| notify_exception(EXCEPTION_HANDLED, exception, tasks) end end if !free_events_errors.empty? if display_exceptions? warn "#{free_events_errors.size} free event exceptions" end free_events_errors.each do |exception, events| notify_exception(EXCEPTION_FREE_EVENT, exception, events) end end if !fatal_errors.empty? if display_exceptions? warn "#{fatal_errors.size} unhandled fatal exceptions, involving #{kill_tasks.size} tasks that will be forcefully killed" end fatal_errors.each do |exception, tasks| notify_exception(EXCEPTION_FATAL, exception, tasks) end if display_exceptions? kill_tasks.each do |task| log_pp :warn, task end end end end
Call to notify the listeners registered with {#on_exception} of the occurrence of an exception
# File lib/roby/execution_engine.rb, line 2592 def notify_exception(kind, error, involved_objects) log(:exception_notification, plan.droby_id, kind, error, involved_objects) exception_listeners.each do |listener| listener.call(self, kind, error, involved_objects) end end
Registers a callback that will be called when exceptions are propagated in the plan
@yieldparam [Symbol] kind one of {EXCEPTION_NONFATAL}, {EXCEPTION_FATAL}, {EXCEPTION_FREE_EVENT} or {EXCEPTION_HANDLED}
@yieldparam [Roby::ExecutionException] error the exception
@yieldparam [Array<Roby::Task>] tasks the tasks that are involved in this exception
@return [Object] an ID that can be used as argument to {#remove_exception_listener}
# File lib/roby/execution_engine.rb, line 2539 def on_exception(description: 'exception listener', on_error: :disable, &block) handler = PollBlockDefinition.new(description, block, on_error: on_error) exception_listeners << handler handler end
Schedules block to be called at the beginning of the next execution cycle, in propagation context.
@param [#fail] sync a synchronization object that is used to communicate between the once block and the calling thread. The main use of this parameter is to make sure that #fail is called if the execution engine quits
@param (see PropagationHandlerMethods#create_propagation_handler)
# File lib/roby/execution_engine.rb, line 1359 def once(sync: nil, description: 'once block', type: :external_events, **options, &block) waiting_work << sync if sync once_blocks << create_propagation_handler(description: description, type: type, once: true, **options, &block) end
True if the current thread is not the execution thread of this engine, or if there is no control thread. When you check the current thread context, always use a negated form. Do not do
if Roby.inside_control? ERROR end
Do instead
if !Roby.outside_control? ERROR end
Since the first form will fail if there is no control thread, while the second form will work. Use the first form only if you require that there actually IS a control thread.
# File lib/roby/execution_engine.rb, line 2118
def outside_control?
    t = thread
    !t || t != Thread.current
end
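The two predicates are not strict opposites: both return true when no execution thread is set. The sketch below copies the two method bodies into a hypothetical `ThreadCheck` class, whose settable `thread` attribute stands in for the engine's thread, and prints the results in the three possible situations.

```ruby
# Hypothetical holder for the two predicates; `thread` is settable here
# only so that the three cases can be demonstrated
class ThreadCheck
  attr_accessor :thread

  def inside_control?
    t = thread
    !t || t == Thread.current
  end

  def outside_control?
    t = thread
    !t || t != Thread.current
  end
end

check = ThreadCheck.new

# 1. No execution thread: both predicates are true
puts [check.inside_control?, check.outside_control?].inspect

# 2. Called from the execution thread itself
check.thread = Thread.current
puts [check.inside_control?, check.outside_control?].inspect

# 3. Called from another thread
other = Thread.new { [check.inside_control?, check.outside_control?] }.value
puts other.inspect
```

Since `inside_control?` and `!outside_control?` differ only in the first case, the choice between them decides how code behaves when no engine thread is running.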
Parses the propagation information info in the context of a forwarding if is_forward is true and a signalling otherwise. target is the target event.
The method adds the appropriate delayed events using add_event_delay, and returns either nil if no propagation is to be performed, or the propagation source events, generators and context.
The format of info is the same as the hash values described in gather_propagation.
# File lib/roby/execution_engine.rb, line 1007 def prepare_propagation(target, is_forward, info) timeref = Time.now source_events, source_generators, context = Set.new, Set.new, [] delayed = true info.each_slice(3) do |src, ctxt, time| if time && (delay = ExecutionEngine.make_delay(timeref, src, target, time)) add_event_delay(delay, is_forward, src, target, ctxt) next end delayed = false # Merge identical signals. Needed because two different event handlers # can both call #emit, and two signals are set up if src if src.respond_to?(:generator) source_events << src source_generators << src.generator else source_generators << src end end if ctxt context.concat ctxt end end unless delayed [source_events, source_generators, context] end end
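The merging step of this method can be tried in isolation. The sketch below is a simplified illustration that leaves out delay handling entirely: `SourceEvent` and `merge_sources` are hypothetical names, and plain symbols stand in for generators.

```ruby
require "set"

# Minimal stand-in for an emitted event that knows its generator
SourceEvent = Struct.new(:generator)

# Merges a flat list of source, context, timespec triples into the sets
# of source events and generators, plus the concatenated context.
# Delays (the third element of each triple) are ignored in this sketch.
def merge_sources(info)
  source_events = Set.new
  source_generators = Set.new
  context = []

  info.each_slice(3) do |src, ctxt, _time|
    if src.respond_to?(:generator)
      source_events << src
      source_generators << src.generator
    elsif src
      source_generators << src
    end
    context.concat(ctxt) if ctxt
  end

  [source_events, source_generators, context]
end

event = SourceEvent.new(:gen_a)
# The same event queued twice, plus a bare generator without context
info = [event, [1], nil, event, [2], nil, :gen_b, nil, nil]
events, generators, context = merge_sources(info)
puts events.size
puts generators.to_a.inspect
puts context.inspect
```

Using sets means that two handlers queuing the same source produce a single entry, while their contexts are still concatenated.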
The inside part of the event loop
It gathers initial events and errors and propagates them
@return [PropagationInfo] what happened during the propagation
@raise RecursivePropagationContext if called recursively
# File lib/roby/execution_engine.rb, line 1647
def process_events(raise_framework_errors: Roby.app.abort_on_application_exception?, garbage_collect_pass: true, &caller_block)
    if @application_exceptions
        raise RecursivePropagationContext, "recursive call to process_events"
    end
    passed_recursive_check = true # to avoid having a almost-method-global ensure block
    @application_exceptions = []
    @emitted_events = Array.new

    @thread_pool.send :synchronize do
        @thread_pool.send(:ns_prune_pool)
    end

    # Gather new events and propagate them
    events_errors = nil
    next_steps = gather_propagation do
        events_errors = gather_errors do
            if caller_block
                yield
                caller_block = nil
            end

            if !quitting? || !garbage_collect([])
                process_waiting_work
                log_timepoint 'workers'
                gather_external_events
                log_timepoint 'external_events'
                call_propagation_handlers
                log_timepoint 'propagation_handlers'
            end
        end
    end

    propagation_info = propagate_events_and_errors(next_steps, events_errors, garbage_collect_pass: garbage_collect_pass)
    if Roby.app.abort_on_exception? && !propagation_info.fatal_errors.empty?
        raise Aborting.new(propagation_info.each_fatal_error.map(&:exception))
    end
    propagation_info.framework_errors.concat(@application_exceptions)
    propagation_info
ensure
    if passed_recursive_check
        process_pending_application_exceptions(raise_framework_errors: raise_framework_errors)
    end
end
Tests are using a special mode for propagation, in which everything is resolved when emit or call is called, including error handling. This mode is implemented using this method
When errors occur in this mode, the exceptions are raised directly. This is useful in tests as, this way, we are sure that the exception will not get overlooked
If multiple errors are raised in a single call (this is possible due to Roby's error handling mechanisms), the method will raise SynchronousEventProcessingMultipleErrors to wrap all the exceptions into one.
# File lib/roby/execution_engine.rb, line 1704 def process_events_synchronous(seeds = Hash.new, initial_errors = Array.new, enable_scheduler: false, raise_errors: true) Roby.warn_deprecated "#process_events_synchronous is deprecated, use the expect_execution harness instead" if @application_exceptions raise RecursivePropagationContext, "recursive call to process_events" end passed_recursive_check = true # to avoid having a almost-method-global ensure block @application_exceptions = [] # Save early for the benefit of the 'ensure' block current_scheduler_enabled = scheduler.enabled? if (!seeds.empty? || !initial_errors.empty?) && block_given? raise ArgumentError, "cannot provide both seeds/inital errors and a block" elsif block_given? seeds = gather_propagation do initial_errors = gather_errors do yield end end end scheduler.enabled = enable_scheduler propagation_info = propagate_events_and_errors(seeds, initial_errors, garbage_collect_pass: false) if !propagation_info.kill_tasks.empty? gc_initial_errors = nil gc_seeds = gather_propagation do gc_initial_errors = gather_errors do garbage_collect(propagation_info.kill_tasks) end end gc_errors = propagate_events_and_errors(gc_seeds, gc_initial_errors, garbage_collect_pass: false) propagation_info.merge(gc_errors) end if raise_errors propagation_info = propagation_info.exceptions if propagation_info.size == 1 raise propagation_info.first elsif !propagation_info.empty? raise SynchronousEventProcessingMultipleErrors.new(propagation_info.map(&:exception)) end else propagation_info end rescue SynchronousEventProcessingMultipleErrors => e raise SynchronousEventProcessingMultipleErrors.new(e.errors + clear_application_exceptions) rescue Exception => e if passed_recursive_check application_exceptions = clear_application_exceptions if !application_exceptions.empty? 
raise SynchronousEventProcessingMultipleErrors.new(application_exceptions.map(&:first) + [e]) else raise e end else raise e end ensure if passed_recursive_check && @application_exceptions process_pending_application_exceptions end scheduler.enabled = current_scheduler_enabled end
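The "raise the single error directly, wrap several errors into one" policy can be sketched independently of Roby. In this illustration, `MultipleErrors`, `raise_collected` and `describe_failure` are hypothetical names. `MultipleErrors` plays the role of SynchronousEventProcessingMultipleErrors and, like it, exposes the wrapped exceptions through `errors`.

```ruby
# Hypothetical wrapper exception carrying several errors at once
class MultipleErrors < RuntimeError
  attr_reader :errors

  def initialize(errors)
    @errors = errors
    super("#{errors.size} errors: #{errors.map(&:message).join(', ')}")
  end
end

# Raises nothing for an empty list, the error itself for a single
# error, and a wrapper exception otherwise
def raise_collected(errors)
  case errors.size
  when 0 then nil
  when 1 then raise errors.first
  else raise MultipleErrors.new(errors)
  end
end

def describe_failure(errors)
  raise_collected(errors)
  "no error"
rescue MultipleErrors => e
  "multiple: #{e.errors.size}"
rescue StandardError => e
  "single: #{e.message}"
end

puts describe_failure([])
puts describe_failure([ArgumentError.new("bad")])
puts describe_failure([ArgumentError.new("bad"), RuntimeError.new("worse")])
```

A test that expects one specific exception can then rescue it directly, and only has to deal with the wrapper when several errors occur in the same call.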
Dispatch {#once_blocks} to the other handler sets for further processing
# File lib/roby/execution_engine.rb, line 757
def process_once_blocks
    while !once_blocks.empty?
        type, block = once_blocks.pop
        if type == :external_events
            external_events_handlers << block
        else
            propagation_handlers << block
        end
    end
end
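Because the blocks are taken with `pop`, they are dispatched starting from the most recently queued one. The sketch below shows this on plain arrays. `dispatch_once_blocks` is a hypothetical helper, and symbols stand in for the handler objects.

```ruby
# Splits [type, block] pairs between two handler lists, consuming the
# queue from its end as Array#pop does
def dispatch_once_blocks(once_blocks)
  external_events_handlers = []
  propagation_handlers = []

  until once_blocks.empty?
    type, block = once_blocks.pop
    if type == :external_events
      external_events_handlers << block
    else
      propagation_handlers << block
    end
  end

  [external_events_handlers, propagation_handlers]
end

queue = [[:external_events, :a], [:propagation, :b], [:external_events, :c]]
external, propagation = dispatch_once_blocks(queue)
puts external.inspect    # :c was queued last and is dispatched first
puts propagation.inspect
puts queue.empty?
```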
# File lib/roby/execution_engine.rb, line 622 def process_pending_application_exceptions(application_errors = clear_application_exceptions, raise_framework_errors: Roby.app.abort_on_application_exception?) # We don't aggregate exceptions, so report them all and raise one if display_exceptions? application_errors.each do |error, source| if !error.kind_of?(Interrupt) fatal "Application error in #{source}" Roby.log_exception_with_backtrace(error, self, :fatal) end end end error, source = application_errors.find do |error, _| raise_framework_errors || error.kind_of?(SignalException) end if error raise error, "in #{source}: #{error.message}", error.backtrace end end
Process asynchronous work registered in {#waiting_work} to clear completed work and/or handle errors that were not handled by the async object itself (e.g. a {Promise} without a {Promise#on_error} handler)
# File lib/roby/execution_engine.rb, line 1444
def process_waiting_work
    finished, not_finished = waiting_work.partition do |work|
        work.complete?
    end

    finished.find_all do |work|
        work.rejected? && (work.respond_to?(:has_error_handler?) && !work.has_error_handler?)
    end.each do |work|
        e = work.reason
        e.set_backtrace(e.backtrace + caller)
        add_framework_error(e, work.to_s)
    end

    @waiting_work = not_finished
    finished
end
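The method boils down to a partition followed by a filter. The sketch below reproduces these two steps on a hypothetical `Work` struct. It is a simplification: every `Work` object is assumed to expose `has_error_handler?`, whereas the engine first checks that the object responds to it.

```ruby
# Hypothetical stand-in for a promise-like object
Work = Struct.new(:name, :state, :error_handler) do
  def complete?
    state != :pending
  end

  def rejected?
    state == :rejected
  end

  def has_error_handler?
    error_handler
  end
end

# Returns the finished work, the work still pending, and the finished
# work whose failure nobody handled
def sort_waiting_work(waiting_work)
  finished, not_finished = waiting_work.partition(&:complete?)
  unhandled = finished.find_all do |work|
    work.rejected? && !work.has_error_handler?
  end
  [finished, not_finished, unhandled]
end

work = [
  Work.new("a", :fulfilled, false),
  Work.new("b", :pending, false),
  Work.new("c", :rejected, true),
  Work.new("d", :rejected, false)
]
finished, not_finished, unhandled = sort_waiting_work(work)
puts finished.map(&:name).inspect
puts not_finished.map(&:name).inspect
puts unhandled.map(&:name).inspect
```

In the engine, the entries of the last group are the ones reported through add_framework_error.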
Create a promise to execute the given block in a separate thread
Note that the returned value is a {Roby::Promise}. This means that callbacks added with on_success or rescue will be executed in the execution engine thread by default.
# File lib/roby/execution_engine.rb, line 2604 def promise(description: nil, executor: thread_pool, &block) Promise.new(self, executor: executor, description: description, &block) end
Propagate an initial set of event propagations and errors
@param [Array] next_steps the next propagations, as returned by {#gather_propagation}
@param [Array] initial_errors a set of errors that should be propagated
@param [Boolean] garbage_collect_pass whether the garbage collection pass should be performed or not. It is used in the tests' codepath for {EventGenerator#call} and {EventGenerator#emit}.
@return [PropagationInfo] what happened during the propagation
# File lib/roby/execution_engine.rb, line 1784 def propagate_events_and_errors(next_steps, initial_errors, garbage_collect_pass: true) propagation_info = PropagationInfo.new events_errors = initial_errors.dup begin log_timepoint_group 'event_propagation_phase' do events_errors.concat(event_propagation_phase(next_steps, propagation_info)) end next_steps = gather_propagation do exception_propagation_errors, error_phase_results = nil log_timepoint_group 'error_handling_phase' do exception_propagation_errors = gather_errors do error_phase_results = error_handling_phase(events_errors) end end add_exceptions_for_inhibition(error_phase_results.each_fatal_error) propagation_info.merge(error_phase_results) garbage_collection_errors = gather_errors do plan.generate_induced_errors(error_phase_results) if garbage_collect_pass garbage_collect(error_phase_results.kill_tasks) else [] end end events_errors = (exception_propagation_errors + garbage_collection_errors) log_timepoint 'garbage_collect' end end while !next_steps.empty? || !events_errors.empty? propagation_info end
The core exception propagation algorithm
@param [Array<(ExecutionException,Array<Task>)>] exceptions the set of exceptions to propagate, as well as the parents towards which we should propagate them (if empty, all parents)
@yieldparam [ExecutionException] exception the exception that is being propagated
@yieldparam [Task,Plan] handling_object the object we want to test whether it handles the exception or not
@yieldreturn [Boolean] true if the exception is handled, false otherwise
@return [Array<(ExecutionException,Array<Task>)>] the set of unhandled exceptions, as a mapping from an exception description to the set of tasks that are affected by it
# File lib/roby/execution_engine.rb, line 1181 def propagate_exception_in_plan(exceptions) propagation_graph = dependency_graph.reverse # Propagate the exceptions in the hierarchy handled_unhandled = Array.new exceptions.each do |exception, parents| origin = exception.origin if parents filtered_parents = parents.find_all { |t| t.depends_on?(origin) } if filtered_parents != parents warn "some parents specified for #{exception.exception}(#{exception.exception.class}) are actually not parents of #{origin}, they got filtered out" (parents - filtered_parents).each do |task| warn " #{task}" end if filtered_parents.empty? parents = propagation_graph.out_neighbours(origin) else parents = filtered_parents end end else parents = propagation_graph.out_neighbours(origin) end debug do debug "propagating exception " log_pp :debug, exception if !parents.empty? debug " constrained to parents" log_nest(2) do parents.each do |p| log_pp :debug, p end end end break end visitor = ExceptionPropagationVisitor.new(propagation_graph, exception, origin, parents) do |e, task| yield(e, task) end visitor.visit unhandled = visitor.unhandled_exceptions.inject { |a, b| a.merge(b) } handled = visitor.handled_exceptions.inject { |a, b| a.merge(b) } handled_unhandled << [handled, unhandled] end exceptions_handled_by = Array.new unhandled_exceptions = Array.new handled_unhandled.each do |handled, e| if e if e.handled = yield(e, plan) if handled handled_by = (handled.propagation_leafs.to_set << plan) exceptions_handled_by << [handled.merge(e), handled_by] else handled = e exceptions_handled_by << [e, [plan].to_set] end else affected_tasks = e.trace.vertices.to_set if handled affected_tasks -= handled.trace.vertices exceptions_handled_by << [handled, handled.propagation_leafs.to_set] end unhandled_exceptions << [e, affected_tasks] end else exceptions_handled_by << [handled, handled.propagation_leafs.to_set] end end debug do debug "#{unhandled_exceptions.size} unhandled exceptions remain" log_nest(2) do 
unhandled_exceptions.each do |e, affected_tasks| log_pp :debug, e debug "Affects #{affected_tasks.size} tasks" log_nest(2) do affected_tasks.each do |t| log_pp :debug, t end end end end break end return unhandled_exceptions, exceptions_handled_by end
Exception propagation phase, checking if tasks and/or the main plan are handling the exceptions
@param [Array<(ExecutionException,Array<Task>)>] exceptions the set of exceptions to propagate, as well as the parents towards which we should propagate them (if empty, all parents)
@return (see propagate_exception_in_plan)
# File lib/roby/execution_engine.rb, line 1281 def propagate_exceptions(exceptions) if exceptions.empty? return Array.new, Array.new, Array.new end # Remove all exception that are not associated with a task exceptions, free_events_exceptions = exceptions.partition do |e, _| e.origin end # Normalize the free events exceptions free_events_exceptions = free_events_exceptions.map do |e, _| if e.exception.failed_generator.plan [e, Set[e.exception.failed_generator]] end end.compact debug "Filtering inhibited exceptions" exceptions = log_nest(2) do non_inhibited, _ = remove_inhibited_exceptions(exceptions) # Reset the trace for the real propagation non_inhibited.map do |e, _| _, propagate_through = exceptions.find { |original_e, _| original_e.exception == e.exception } e.reset_trace [e, propagate_through] end end debug "Propagating #{exceptions.size} non-inhibited exceptions" log_nest(2) do # Note that the first half of the method filtered the free # events exceptions out of 'exceptions' unhandled, handled = propagate_exception_in_plan(exceptions) do |e, object| object.handle_exception(e) end return unhandled, free_events_exceptions, handled end end
Sets the source_event and source_generator variables according to source. source is the from argument of add_event_propagation
# File lib/roby/execution_engine.rb, line 664 def propagation_context(sources) current_sources = @propagation_sources raise InternalError, "not in a gathering context in #propagation_context" unless in_propagation_context? @propagation_sources = sources yield ensure @propagation_sources = current_sources end
The set of events extracted from sources
# File lib/roby/execution_engine.rb, line 457 def propagation_source_events result = Set.new for ev in @propagation_sources if ev.respond_to?(:generator) result << ev end end result end
The set of generators extracted from sources
# File lib/roby/execution_engine.rb, line 468 def propagation_source_generators result = Set.new for ev in @propagation_sources result << if ev.respond_to?(:generator) ev.generator else ev end end result end
Queue a forwarding to be propagated
# File lib/roby/execution_engine.rb, line 684 def queue_forward(sources, target, context, timespec) add_event_propagation(true, sources, target, context, timespec) end
Queue a signal to be propagated
# File lib/roby/execution_engine.rb, line 679 def queue_signal(sources, target, context, timespec) add_event_propagation(false, sources, target, context, timespec) end
Make control quit properly
# File lib/roby/execution_engine.rb, line 2386 def quit; @quit = 1 end
True if the control thread is currently quitting
# File lib/roby/execution_engine.rb, line 2382 def quitting?; @quit > 0 end
Refresh the value of cached relations
Some often-used relations are cached at {#initialize}, such as {#dependency_graph} and {#precedence_graph}. Call this when the actual graph objects have changed on the plan
# File lib/roby/execution_engine.rb, line 134 def refresh_relations @dependency_graph = plan.task_relation_graph_for(TaskStructure::Dependency) @precedence_graph = plan.event_relation_graph_for(EventStructure::Precedence) @signal_graph = plan.event_relation_graph_for(EventStructure::Signal) @forward_graph = plan.event_relation_graph_for(EventStructure::Forwarding) end
Removes a handler added by {#at_cycle_end}
@param [Object] handler_id the value returned by {#at_cycle_end}
# File lib/roby/execution_engine.rb, line 2047 def remove_at_cycle_end(handler_id) at_cycle_end_handlers.delete_if { |h| h.object_id == handler_id } end
Removes an exception listener registered with {#on_exception}
@param [Object] handler the value returned by {#on_exception}
@return [void]
# File lib/roby/execution_engine.rb, line 2586 def remove_exception_listener(handler) exception_listeners.delete(handler) end
Process the given exceptions to remove the ones that are currently filtered by the plan repairs
The returned exceptions are propagated, i.e. their trace method contains all the tasks that are affected by the absence of a handling mechanism
@param [(ExecutionException,Array<Roby::Task>)] exceptions pairs of exceptions as well as the "root tasks", i.e. the parents of origin.task towards which they should be propagated
@return [Array<ExecutionException>] the unhandled exceptions
# File lib/roby/execution_engine.rb, line 1331 def remove_inhibited_exceptions(exceptions) exceptions = exceptions.find_all do |execution_exception, _| execution_exception.origin.plan end propagate_exception_in_plan(exceptions) do |e, object| if has_pending_exception_matching?(e, object) true elsif object.respond_to?(:handles_error?) object.handles_error?(e) end end end
Calls superclass method Roby::ExecutionEngine::PropagationHandlerMethods#remove_propagation_handler
# File lib/roby/execution_engine.rb, line 362 def remove_propagation_handler(id) disabled_handlers.delete_if { |p| p.id == id } super nil end
Make a quit EE ready for reuse
# File lib/roby/execution_engine.rb, line 2391 def reset @quit = 0 end
# File lib/roby/execution_engine.rb, line 2480 def reset_thread_pool if @thread_pool @thread_pool.shutdown end @thread_pool = Concurrent::CachedThreadPool.new(idletime: 10) end
Main event loop. Valid options are
- cycle: the cycle duration in seconds (default: 0.1)
# File lib/roby/execution_engine.rb, line 2127 def run(cycle: 0.1) if running? raise AlreadyRunning, "#run has already been called" end self.running = true @allow_propagation = false @waiting_work = Concurrent::Array.new @thread = Thread.current @thread.name = "MAIN" @cycle_length = cycle event_loop ensure self.running = false @thread = nil waiting_work.delete_if do |w| next(true) if w.complete? # rubocop:disable Lint/HandleExceptions begin w.fail ExecutionQuitError Roby.warn "forcefully terminated #{w} on quit" rescue Concurrent::MultipleAssignmentError # Race condition: something completed the promise while # we were trying to make it fail end # rubocop:enable Lint/HandleExceptions true end finalizers.each do |blk| begin blk.call rescue Exception => e Roby.warn "finalizer #{blk} failed" Roby.log_exception_with_backtrace(e, Roby, :warn) end end @quit = 0 @allow_propagation = true end
# File lib/roby/execution_engine.rb, line 433 def scheduler=(scheduler) if !scheduler raise ArgumentError, "cannot set the scheduler to nil. You can disable the current scheduler with .enabled = false instead, or set it to Schedulers::Null.new" end @scheduler = scheduler end
# File lib/roby/execution_engine.rb, line 2475 def shutdown killall thread_pool.shutdown end
Set the cycle_start attribute and increment cycle_index
This is only used for testing purposes
# File lib/roby/execution_engine.rb, line 2372 def start_new_cycle(time = Time.now) @cycle_start = time @cycle_index += 1 end
# File lib/roby/execution_engine.rb, line 1836 def unmark_finished_missions_and_permanent_tasks to_unmark = plan.task_index.by_predicate[:finished?] | plan.task_index.by_predicate[:failed?] finished_missions = (plan.mission_tasks & to_unmark) # Remove all missions that are finished for finished_mission in finished_missions if !finished_mission.being_repaired? plan.unmark_mission_task(finished_mission) end end finished_permanent = (plan.permanent_tasks & to_unmark) for finished_permanent in (plan.permanent_tasks & to_unmark) if !finished_permanent.being_repaired? plan.unmark_permanent_task(finished_permanent) end end end
Called by EventGenerator when an event became unreachable
# File lib/roby/execution_engine.rb, line 514 def unreachable_event(event) delayed_events.delete_if { |_, _, _, signalled, _| signalled == event } end
Blocks until at least one execution cycle has been done
# File lib/roby/execution_engine.rb, line 1991 def wait_one_cycle current_cycle = execute { cycle_index } while current_cycle == execute { cycle_index } raise ExecutionQuitError if !running? sleep(cycle_length) end end
Stops the current thread until the given event is emitted. If the event becomes unreachable, an UnreachableEvent exception is raised.
# File lib/roby/execution_engine.rb, line 2448 def wait_until(ev) if inside_control? raise ThreadMismatch, "cannot use #wait_until in execution threads" end ivar = Concurrent::IVar.new result = nil once(sync: ivar) do if ev.unreachable? ivar.fail(UnreachableEvent.new(ev, ev.unreachability_reason)) else ev.if_unreachable(cancel_at_emission: true) do |reason, event| ivar.fail(UnreachableEvent.new(event, reason)) if !ivar.complete? end ev.once do |ev| ivar.set(result) if !ivar.complete? end begin result = yield if block_given? rescue Exception => e ivar.fail(e) end end end ivar.value! end