class Roby::Promise
An extension to {Concurrent::Promise} that is aware of the mixed thread/event loop nature of Roby
Use {ExecutionEngine#promise} to create one
{#on_success} and {#rescue} gain an in_engine argument, which decides whether the given block should be executed by the underlying execution engine's or not. It is true by default. Note that {#then} is not overriden
This promise implementation has no graph capabilities. The execution must be a pipeline, and a whole pipeline is represented by a single Promise
. State predicates such as fulfilled?
or rejected?
are valid for the whole pipeline. There is no way to handle errors for only parts of the pipeline.
Constants
- PipelineElement
Representation of one element in the pipeline
Attributes
A description text for debugging purposes
The pipeline that will be executed if an error happens in {#pipeline}
@return [Array<PipelineElement>]
The execution engine we execute on
The pipeline itself
@return [Array<PipelineElement>]
The Promise
object from concurrent-ruby that handles the nominal part of the execution
Public Class Methods
# File lib/roby/promise.rb, line 35 def initialize(execution_engine, executor: execution_engine.thread_pool, description: "promise", &block) @execution_engine = execution_engine execution_engine.waiting_work << self @description = description @pipeline = Array.new @error_pipeline = Array.new @promise = Concurrent::Promise.new(executor: executor, &method(:run_pipeline)) @current_element = Concurrent::AtomicReference.new if block self.then(&block) end end
# File lib/roby/promise.rb, line 293 def self.null(value = nil) Null.new(value) end
Public Instance Methods
Register a block that will be called on this promise's termination
@yieldparam [Time] time the termination time @yieldparam [Object,nil] result the promise result if it finished
execution successfully, or nil if an exception was raised
@yieldparam [Object,nil] reason the exception that terminated this
promise if it failed, or nil if it finished successfully
# File lib/roby/promise.rb, line 289 def add_observer(&block) promise.add_observer(&block) end
Queue a block at the beginning of the pipeline
# File lib/roby/promise.rb, line 184 def before(description: "#{self.description}.before", in_engine: true, &block) pipeline.unshift PipelineElement.new(description, in_engine, block) self end
# File lib/roby/promise.rb, line 236 def complete? promise.complete? end
The description element being currently executed
@return [String,nil]
# File lib/roby/promise.rb, line 62 def current_element @current_element.get end
Whether this promise does have elements
# File lib/roby/promise.rb, line 55 def empty? @pipeline.empty? end
# File lib/roby/promise.rb, line 223 def execute promise.execute self end
# File lib/roby/promise.rb, line 219 def fail(exception = StandardError) promise.fail(exception) end
# File lib/roby/promise.rb, line 240 def fulfilled? promise.fulfilled? end
Whether self already has an error handler
Unlike {Concurrent::Promise}, {Roby::Promise} objects can only have one error handler
# File lib/roby/promise.rb, line 179 def has_error_handler? !error_pipeline.empty? end
Whether this is a null promise
# File lib/roby/promise.rb, line 50 def null? false end
Schedule execution of a block if self or one of its parents failed
@param [String] description a textual description useful for debugging @param [Boolean] in_engine whether the block should be executed within
the underlying {ExecutionEngine}, a.k.a. in the main thread, or scheduled in a separate thread.
@yieldparam [Object] reason the exception that caused the failure,
usually an exception that was raised by one of the promise blocks.
# File lib/roby/promise.rb, line 208 def on_error(description: "#{self.description}.on_error", in_engine: true, &block) error_pipeline << PipelineElement.new(description, in_engine, block) self end
Schedule execution of a block on the success of self
@param [String] description a textual description useful for debugging @param [Boolean] in_engine whether the block should be executed within
the underlying {ExecutionEngine}, a.k.a. in the main thread, or scheduled in a separate thread.
# File lib/roby/promise.rb, line 195 def on_success(description: "#{self.description}.on_success[#{pipeline.size}]", in_engine: true, &block) pipeline << PipelineElement.new(description, in_engine, block) self end
# File lib/roby/promise.rb, line 232 def pending? promise.pending? end
# File lib/roby/promise.rb, line 146 def pretty_print(pp) description = self.description pp.text "Roby::Promise(#{description}" if current_element = self.current_element pp.text ", currently: #{current_element})" else pp.text ")" end pipeline.each do |element| pp.nest(2) do pp.text "." pp.breakable if element.run_in_engine pp.text "on_success(#{element.description})" else pp.text "then(#{element.description})" end end end error_pipeline.each do |element| pp.nest(2) do pp.text "." pp.breakable pp.text "on_error(#{element.description}, in_engine: #{element.run_in_engine})" end end end
Returns the exception that caused the promise to be rejected
# File lib/roby/promise.rb, line 271 def reason if failure = promise.reason failure.actual_exception end end
# File lib/roby/promise.rb, line 244 def rejected? promise.rejected? end
@api private
Helper method for {#run_pipeline_elements}, to run a sequence of elements in a pipeline that have the same run_in_engine?
# File lib/roby/promise.rb, line 130 def run_one_pipeline_segment(pipeline, state, in_engine, propagate_state: true) while (element = pipeline.first) && !(in_engine ^ element.run_in_engine) pipeline.shift @current_element.set(element.description) new_state = execution_engine.log_timepoint_group "#{element.description} in_engine=#{element.run_in_engine}" do element.callback.call(state) end state = new_state if propagate_state end state end
@api private
Internal implementation of the pipeline. This holds a thread until it is finished - there's no point in giving the thread back between the steps in the pipeline, given how the promises are used in Roby
(to avoid freezing due to blocking calls)
# File lib/roby/promise.rb, line 75 def run_pipeline(*state) Thread.current.name = "run_promises" execution_engine.log_timepoint_group "#{description}" do begin run_pipeline_elements(self.pipeline, state) rescue Exception => exception run_pipeline_elements(self.error_pipeline, exception, propagate_state: false) raise Failure.new(exception) end end ensure @current_element.set(nil) end
@api private
Run one of {#pipeline} or {#error_pipeline}
# File lib/roby/promise.rb, line 93 def run_pipeline_elements(pipeline, state, propagate_state: true) pipeline = pipeline.dup while !pipeline.empty? state = run_one_pipeline_segment(pipeline, state, false, propagate_state: propagate_state) if !pipeline.empty? state = execution_engine.log_timepoint_group "#{description}:execute_in_engine" do execution_engine.execute(type: :propagation) do run_one_pipeline_segment(pipeline, state, true, propagate_state: propagate_state) end end end end state end
The promise's execution state
# File lib/roby/promise.rb, line 278 def state promise.state end
Alias for {#on_success}, but defaulting to execution as a separate thread
# File lib/roby/promise.rb, line 215 def then(description: "#{self.description}.then[#{pipeline.size}]", &block) on_success(description: description, in_engine: false, &block) end
# File lib/roby/promise.rb, line 142 def to_s "#<Roby::Promise #{description}>" end
# File lib/roby/promise.rb, line 228 def unscheduled? promise.unscheduled? end
# File lib/roby/promise.rb, line 252 def value(timeout = nil) if promise.complete? promise.value(timeout) else raise NotComplete, "cannot call #value on a non-complete promise" end end
# File lib/roby/promise.rb, line 260 def value!(timeout = nil) if promise.complete? promise.value!(timeout) else raise NotComplete, "cannot call #value on a non-complete promise" end rescue Failure => e raise e.actual_exception end
# File lib/roby/promise.rb, line 248 def wait promise.wait end