class Roby::Promise

An extension to {Concurrent::Promise} that is aware of the mixed thread/event loop nature of Roby.

Use {ExecutionEngine#promise} to create one.

{#on_success} and {#on_error} take an in_engine argument, which decides whether the given block is executed by the underlying execution engine or in a separate thread. It is true by default. {#then} does not take this argument: it always schedules its block outside the engine.

This promise implementation has no graph capabilities. The execution must be a pipeline, and a whole pipeline is represented by a single Promise. State predicates such as fulfilled? or rejected? are valid for the whole pipeline. There is no way to handle errors for only parts of the pipeline.
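As a rough illustration of the single-pipeline model described above, the following stand-alone sketch chains blocks inside one object and reports one state for the whole chain. It uses plain Ruby only; `ToyPipeline` and its methods are hypothetical names made up for this example and are not part of Roby or concurrent-ruby.

```ruby
# Hypothetical toy model of a single-pipeline promise; not Roby's implementation.
class ToyPipeline
  attr_reader :state, :value, :reason

  def initialize
    @steps = []
    @state = :pending
  end

  # Append a step; returning self keeps the whole chain in one object.
  def then(&block)
    @steps << block
    self
  end

  # Run every step in order, feeding each result to the next step.
  def execute(initial = nil)
    @value = @steps.inject(initial) { |acc, step| step.call(acc) }
    @state = :fulfilled
    self
  rescue StandardError => e
    # Any failing step rejects the pipeline as a whole.
    @reason = e
    @state = :rejected
    self
  end
end

ok = ToyPipeline.new.then { |v| v + 1 }.then { |v| v * 2 }.execute(1)
ko = ToyPipeline.new.then { |v| v + 1 }.then { |_| raise ArgumentError, "boom" }.execute(1)
puts ok.state, ok.value
puts ko.state, ko.reason.class
```

In this model there is no way to ask whether only the first step succeeded: the state describes the chain as a unit, which is the limitation the paragraph above points out.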

Constants

PipelineElement

Representation of one element in the pipeline

Attributes

description[R]

A description text for debugging purposes

error_pipeline[R]

The pipeline that will be executed if an error happens in {#pipeline}

@return [Array<PipelineElement>]

execution_engine[R]

The execution engine we execute on

pipeline[R]

The pipeline itself

@return [Array<PipelineElement>]

promise[R]

The Promise object from concurrent-ruby that handles the nominal part of the execution

Public Class Methods

new(execution_engine, executor: execution_engine.thread_pool, description: "promise", &block)
# File lib/roby/promise.rb, line 35
def initialize(execution_engine, executor: execution_engine.thread_pool, description: "promise", &block)
    @execution_engine = execution_engine
    execution_engine.waiting_work << self
    @description = description

    @pipeline = Array.new
    @error_pipeline = Array.new
    @promise = Concurrent::Promise.new(executor: executor, &method(:run_pipeline))
    @current_element = Concurrent::AtomicReference.new
    if block
        self.then(&block)
    end
end
null(value = nil)
# File lib/roby/promise.rb, line 293
def self.null(value = nil)
    Null.new(value)
end

Public Instance Methods

add_observer(&block)

Register a block that will be called on this promise's termination

@yieldparam [Time] time the termination time

@yieldparam [Object,nil] result the promise result if it finished execution successfully, or nil if an exception was raised

@yieldparam [Object,nil] reason the exception that terminated this promise if it failed, or nil if it finished successfully

# File lib/roby/promise.rb, line 289
def add_observer(&block)
    promise.add_observer(&block)
end
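
The observer contract documented above (a time, then a result or a reason, never both) can be sketched without any dependency. `ToyObservable` and `terminate` are hypothetical names for this illustration; the real method delegates to the underlying concurrent-ruby promise, which terminates only once.

```ruby
# Hypothetical stand-in that only mimics the observer block signature.
class ToyObservable
  def initialize
    @observers = []
  end

  def add_observer(&block)
    @observers << block
  end

  # Notify observers with (time, result, reason); one of the last two is nil.
  def terminate(result: nil, reason: nil)
    time = Time.now
    @observers.each { |observer| observer.call(time, result, reason) }
  end
end

events = []
subject = ToyObservable.new
subject.add_observer { |time, result, reason| events << [time.class, result, reason&.message] }
subject.terminate(result: 42)                          # success: reason is nil
subject.terminate(reason: RuntimeError.new("failed"))  # failure: result is nil
```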
before(description: "#{self.description}.before", in_engine: true, &block)

Queue a block at the beginning of the pipeline

# File lib/roby/promise.rb, line 184
def before(description: "#{self.description}.before", in_engine: true, &block)
    pipeline.unshift PipelineElement.new(description, in_engine, block)
    self
end
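
The effect of `unshift` on ordering can be seen on a plain array. In this minimal sketch, `ToyElement` is a hypothetical struct whose fields are assumed from how `PipelineElement` is used in the listings (description, run_in_engine, callback).

```ruby
# Hypothetical struct mirroring the three values passed to PipelineElement.new.
ToyElement = Struct.new(:description, :run_in_engine, :callback)

identity = ->(value) { value }
toy_pipeline = []
toy_pipeline << ToyElement.new("p.then[0]", false, identity)
toy_pipeline << ToyElement.new("p.on_success[1]", true, identity)

# What #before does: insert at the front, so this element runs first.
toy_pipeline.unshift ToyElement.new("p.before", true, identity)

order = toy_pipeline.map(&:description)
puts order.inspect
```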
complete?()
# File lib/roby/promise.rb, line 236
def complete?
    promise.complete?
end
current_element()

The description of the element currently being executed

@return [String,nil]

# File lib/roby/promise.rb, line 62
def current_element
    @current_element.get
end
empty?()

Whether this promise's pipeline is empty, i.e. has no elements

# File lib/roby/promise.rb, line 55
def empty?
    @pipeline.empty?
end
execute()
# File lib/roby/promise.rb, line 223
def execute
    promise.execute
    self
end
fail(exception = StandardError)
# File lib/roby/promise.rb, line 219
def fail(exception = StandardError)
    promise.fail(exception)
end
fulfilled?()
# File lib/roby/promise.rb, line 240
def fulfilled?
    promise.fulfilled?
end
has_error_handler?()

Whether self already has an error handler

Unlike {Concurrent::Promise}, {Roby::Promise} objects can only have one error handler

# File lib/roby/promise.rb, line 179
def has_error_handler?
    !error_pipeline.empty?
end
null?()

Whether this is a null promise

# File lib/roby/promise.rb, line 50
def null?
    false
end
on_error(description: "#{self.description}.on_error", in_engine: true, &block)

Schedule execution of a block if an element of the pipeline failed

@param [String] description a textual description useful for debugging

@param [Boolean] in_engine whether the block should be executed within the underlying {ExecutionEngine}, a.k.a. in the main thread, or scheduled in a separate thread.

@yieldparam [Object] reason the exception that caused the failure, usually an exception that was raised by one of the promise blocks.

# File lib/roby/promise.rb, line 208
def on_error(description: "#{self.description}.on_error", in_engine: true, &block)
    error_pipeline << PipelineElement.new(description, in_engine, block)
    self
end
on_success(description: "#{self.description}.on_success[#{pipeline.size}]", in_engine: true, &block)

Schedule execution of a block on the success of self

@param [String] description a textual description useful for debugging

@param [Boolean] in_engine whether the block should be executed within the underlying {ExecutionEngine}, a.k.a. in the main thread, or scheduled in a separate thread.

# File lib/roby/promise.rb, line 195
def on_success(description: "#{self.description}.on_success[#{pipeline.size}]", in_engine: true, &block)
    pipeline << PipelineElement.new(description, in_engine, block)
    self
end
pending?()
# File lib/roby/promise.rb, line 232
def pending?
    promise.pending?
end
pretty_print(pp)
# File lib/roby/promise.rb, line 146
def pretty_print(pp)
    description = self.description
    pp.text "Roby::Promise(#{description}"
    if current_element = self.current_element
        pp.text ", currently: #{current_element})"
    else
        pp.text ")"
    end

    pipeline.each do |element|
        pp.nest(2) do
            pp.text "."
            pp.breakable
            if element.run_in_engine
                pp.text "on_success(#{element.description})"
            else
                pp.text "then(#{element.description})"
            end
        end
    end
    error_pipeline.each do |element|
        pp.nest(2) do
            pp.text "."
            pp.breakable
            pp.text "on_error(#{element.description}, in_engine: #{element.run_in_engine})"
        end
    end
end
reason()

Returns the exception that caused the promise to be rejected

# File lib/roby/promise.rb, line 271
def reason
    if failure = promise.reason
        failure.actual_exception
    end
end
rejected?()
# File lib/roby/promise.rb, line 244
def rejected?
    promise.rejected?
end
run_one_pipeline_segment(pipeline, state, in_engine, propagate_state: true)

@api private

Helper method for {#run_pipeline_elements}, to run a sequence of consecutive pipeline elements that share the same run_in_engine value

# File lib/roby/promise.rb, line 130
def run_one_pipeline_segment(pipeline, state, in_engine, propagate_state: true)
    while (element = pipeline.first) && !(in_engine ^ element.run_in_engine)
        pipeline.shift
        @current_element.set(element.description)
        new_state = execution_engine.log_timepoint_group "#{element.description} in_engine=#{element.run_in_engine}" do
            element.callback.call(state)
        end
        state = new_state if propagate_state
    end
    state
end
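
The loop condition `!(in_engine ^ element.run_in_engine)` is true exactly when both booleans are equal, so the loop consumes the leading run of elements with the requested flag and stops at the first mismatch. A minimal sketch of that segmentation alone, with the hypothetical names `ToyStep` and `take_segment` and no callbacks involved:

```ruby
# Hypothetical element carrying only what the segmentation needs.
ToyStep = Struct.new(:description, :run_in_engine)

# Remove and return the leading steps whose flag equals in_engine.
def take_segment(steps, in_engine)
  segment = []
  # XOR of two booleans is false when they are equal.
  while (step = steps.first) && !(in_engine ^ step.run_in_engine)
    segment << steps.shift
  end
  segment
end

steps = [
  ToyStep.new("a", false), ToyStep.new("b", false),
  ToyStep.new("c", true), ToyStep.new("d", false)
]
first_segment = take_segment(steps, false).map(&:description)
second_segment = take_segment(steps, true).map(&:description)
remaining = steps.map(&:description)
```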
run_pipeline(*state)

@api private

Internal implementation of the pipeline. This holds a thread until it is finished - there's no point in giving the thread back between the steps in the pipeline, given how the promises are used in Roby (to avoid freezing due to blocking calls)

# File lib/roby/promise.rb, line 75
def run_pipeline(*state)
    Thread.current.name = "run_promises"

    execution_engine.log_timepoint_group "#{description}" do
        begin
            run_pipeline_elements(self.pipeline, state)
        rescue Exception => exception
            run_pipeline_elements(self.error_pipeline, exception, propagate_state: false)
            raise Failure.new(exception)
        end
    end
ensure
    @current_element.set(nil)
end
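
The error path in the listing above does three things: it hands the original exception to every error handler, ignores what the handlers return, and re-raises the exception wrapped in a failure object. The following stand-alone sketch reproduces only that flow. `ToyFailure` and `run_with_handlers` are hypothetical names, and the sketch rescues `StandardError` where the listing rescues `Exception`.

```ruby
# Hypothetical wrapper playing the role Failure plays in the listing.
class ToyFailure < StandardError
  attr_reader :actual_exception

  def initialize(actual_exception)
    super(actual_exception.message)
    @actual_exception = actual_exception
  end
end

def run_with_handlers(steps, handlers, state)
  steps.inject(state) { |acc, step| step.call(acc) }
rescue StandardError => e
  # Every handler receives the original exception; return values are ignored.
  handlers.each { |handler| handler.call(e) }
  raise ToyFailure.new(e)
end

seen = []
handlers = [->(e) { seen << e.message; :ignored }, ->(e) { seen << e.class }]
begin
  run_with_handlers([->(_) { raise ArgumentError, "bad input" }], handlers, nil)
rescue ToyFailure => failure
  original = failure.actual_exception
end
```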
run_pipeline_elements(pipeline, state, propagate_state: true)

@api private

Run one of {#pipeline} or {#error_pipeline}

# File lib/roby/promise.rb, line 93
def run_pipeline_elements(pipeline, state, propagate_state: true)
    pipeline = pipeline.dup
    while !pipeline.empty?
        state = run_one_pipeline_segment(pipeline, state, false, propagate_state: propagate_state)
        if !pipeline.empty?
            state = execution_engine.log_timepoint_group "#{description}:execute_in_engine" do
                execution_engine.execute(type: :propagation) do
                    run_one_pipeline_segment(pipeline, state, true, propagate_state: propagate_state)
                end
            end
        end
    end
    state
end
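
The outer loop alternates between a segment run in the calling thread and a segment run inside the engine until the copied pipeline is exhausted. The sketch below mimics that alternation without an engine: it only records where each step would run. `ToyStage`, `run_segment` and `run_all` are hypothetical names, and the `:worker`/`:engine` tags stand in for the actual thread switch done through `execution_engine.execute`.

```ruby
ToyStage = Struct.new(:description, :run_in_engine, :callback)

# Run the leading stages that match in_engine, threading the state through.
def run_segment(stages, state, in_engine, trace)
  while (stage = stages.first) && stage.run_in_engine == in_engine
    stages.shift
    trace << [stage.description, in_engine ? :engine : :worker]
    state = stage.callback.call(state)
  end
  state
end

def run_all(stages, state)
  stages = stages.dup # leave the caller's array untouched
  trace = []
  until stages.empty?
    state = run_segment(stages, state, false, trace)
    state = run_segment(stages, state, true, trace) unless stages.empty?
  end
  [state, trace]
end

stages = [
  ToyStage.new("read", false, ->(v) { v + 1 }),
  ToyStage.new("apply", true, ->(v) { v * 10 }),
  ToyStage.new("log", false, ->(v) { v - 5 })
]
result, trace = run_all(stages, 1)
```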
state()

The promise's execution state

# File lib/roby/promise.rb, line 278
def state
    promise.state
end
then(description: "#{self.description}.then[#{pipeline.size}]", &block)

Alias for {#on_success}, but defaulting to execution in a separate thread

# File lib/roby/promise.rb, line 215
def then(description: "#{self.description}.then[#{pipeline.size}]", &block)
    on_success(description: description, in_engine: false, &block)
end
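
The only difference between the two methods is the flag stored with the block. A minimal sketch, assuming a hypothetical `ToyChain` class that copies the two signatures but uses a fixed "toy" prefix instead of the promise description:

```ruby
class ToyChain
  ToyEntry = Struct.new(:description, :run_in_engine, :callback)
  attr_reader :pipeline

  def initialize
    @pipeline = []
  end

  # Defaults to running inside the engine.
  def on_success(description: "toy.on_success[#{pipeline.size}]", in_engine: true, &block)
    pipeline << ToyEntry.new(description, in_engine, block)
    self
  end

  # Same queue, but the flag is forced to false.
  def then(description: "toy.then[#{pipeline.size}]", &block)
    on_success(description: description, in_engine: false, &block)
  end
end

chain = ToyChain.new.then { |v| v }.on_success { |v| v }
summary = chain.pipeline.map { |entry| [entry.description, entry.run_in_engine] }
```

The default descriptions embed the pipeline size at the time of the call, which is why the second entry is numbered 1.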
to_s()
# File lib/roby/promise.rb, line 142
def to_s
    "#<Roby::Promise #{description}>"
end
unscheduled?()
# File lib/roby/promise.rb, line 228
def unscheduled?
    promise.unscheduled?
end
value(timeout = nil)
# File lib/roby/promise.rb, line 252
def value(timeout = nil)
    if promise.complete?
        promise.value(timeout)
    else
        raise NotComplete, "cannot call #value on a non-complete promise"
    end
end
value!(timeout = nil)
# File lib/roby/promise.rb, line 260
def value!(timeout = nil)
    if promise.complete?
        promise.value!(timeout)
    else
        raise NotComplete, "cannot call #value on a non-complete promise"
    end
rescue Failure => e
    raise e.actual_exception
end
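
Two behaviours are visible in the `value` and `value!` listings: both refuse to wait on a promise that is not complete, and `value!` re-raises the original exception instead of the wrapper. The sketch below models both with hypothetical classes (`ToyNotComplete`, `ToyWrapped`, `ToyOutcome`); it does not use Roby or concurrent-ruby.

```ruby
class ToyNotComplete < StandardError; end

# Hypothetical wrapper that keeps a reference to the original exception.
class ToyWrapped < StandardError
  attr_reader :actual_exception

  def initialize(actual_exception)
    super(actual_exception.message)
    @actual_exception = actual_exception
  end
end

class ToyOutcome
  def initialize
    @complete = false
  end

  def reject(exception)
    @failure = ToyWrapped.new(exception)
    @complete = true
  end

  # Refuse to wait when incomplete; unwrap the failure before re-raising.
  def value!
    raise ToyNotComplete, "not complete yet" unless @complete
    raise @failure if @failure
    @value
  rescue ToyWrapped => e
    raise e.actual_exception
  end
end

early = begin
  ToyOutcome.new.value!
rescue ToyNotComplete => e
  e.class
end

failed = ToyOutcome.new
failed.reject(KeyError.new("missing"))
unwrapped = begin
  failed.value!
rescue KeyError => e
  e.message
end
```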
wait()
# File lib/roby/promise.rb, line 248
def wait
    promise.wait
end