class Roby::Interface::Async::Interface

An event-based client for the Roby interface

This class provides an asynchronous (event-based) API to the Roby interface. It allows for job and task discovery and tracking, and is robust to disconnections and reconnections.

It is meant for implementing GUIs that interface with a Roby system. The {#poll} method must be called regularly from a main event loop (e.g. from a Qt timer).

Notification callbacks can be registered with one of the on_ methods (on_job, …). A Qt-oriented declarative approach to binding jobs to a UI can be found in {UIConnector}.
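
As a rough illustration of the polling contract described above, the sketch below drives a stand-in object from a plain loop that plays the role of the GUI timer. `FakePolledInterface` and `run_ticks` are hypothetical names introduced only for this example; they are not part of Roby, and only the documented behaviour of `poll` (true once connected, false otherwise) is modelled.

```ruby
# Hypothetical stand-in for Roby::Interface::Async::Interface, so that the
# sketch runs without a Roby installation. Only #poll is modelled.
class FakePolledInterface
  attr_reader :poll_count

  def initialize(connects_after:)
    @connects_after = connects_after
    @poll_count = 0
  end

  # Mimics the documented contract: true once connected, false otherwise.
  def poll
    @poll_count += 1
    @poll_count >= @connects_after
  end
end

# Plays the role of the GUI timer callback: call #poll once per tick and
# collect the connection state reported by each call.
def run_ticks(interface, ticks:)
  Array.new(ticks) { interface.poll }
end

interface = FakePolledInterface.new(connects_after: 3)
states = run_ticks(interface, ticks: 4)
puts states.inspect # prints [false, false, true, true]
```

In a real GUI, the loop would be replaced by whatever periodic callback the toolkit provides, with one `poll` call per tick.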

Constants

DEFAULT_REMOTE_NAME

Attributes

client[R]

@return [Client,nil] the socket used to communicate with the server, or nil if we have not managed to connect yet

connection_future[R]

The future used to connect to the remote process without blocking the main event loop

connection_method[R]

@return [#call] an object that can create a Client instance

job_monitors[R]

The set of JobMonitor objects currently registered on self

@return [Hash<Integer,Set<JobMonitor>>]
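
The shape of this registry (one set of monitors per job ID) can be sketched in plain Ruby as follows. This is a minimal illustration that mirrors the logic of `add_job_monitor` and `cleanup_dead_monitors` shown further down; `FakeMonitor` is a hypothetical stand-in for JobMonitor, and the local variables are assumptions made for the example.

```ruby
require 'set'

# Hypothetical stand-in for JobMonitor; only the fields used here.
FakeMonitor = Struct.new(:job_id, :finalized) do
  def finalized?
    finalized
  end
end

# Same shape as the attribute: job ID => Set of monitors
job_monitors = {}

# Register a monitor under its job ID, creating the set on first use
register = lambda do |monitor|
  (job_monitors[monitor.job_id] ||= Set.new) << monitor
end

live = FakeMonitor.new(1, false)
dead = FakeMonitor.new(1, true)
other_dead = FakeMonitor.new(2, true)
[live, dead, other_dead].each(&register)

# Drop finalized monitors, then drop job IDs whose set became empty
job_monitors.delete_if do |_job_id, monitors|
  monitors.delete_if(&:finalized?)
  monitors.empty?
end
```

After the cleanup, only job ID 1 remains, with the single live monitor in its set.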

new_job_listeners[R]

The set of NewJobListener objects currently registered on self

@return [Array<NewJobListener>]

remote_name[R]

@return [String] a string that describes the remote host

Public Class Methods

new(remote_name = DEFAULT_REMOTE_NAME, port: Roby::Interface::DEFAULT_PORT, connect: true, &connection_method)
# File lib/roby/interface/async/interface.rb, line 128
def initialize(remote_name = DEFAULT_REMOTE_NAME, port: Roby::Interface::DEFAULT_PORT, connect: true, &connection_method)
    @connection_method = connection_method || lambda {
        Roby::Interface.connect_with_tcp_to(remote_name, port)
    }

    @remote_name = remote_name
    @first_connection_attempt = true
    if connect
        attempt_connection
    end

    @job_monitors = Hash.new
    @new_job_listeners = Array.new
end

Public Instance Methods

actions()

The set of known actions

This is available only after we got a successful connection to the remote side

# File lib/roby/interface/async/interface.rb, line 171
def actions
    client.actions
end
active_job_monitor?(job)
# File lib/roby/interface/async/interface.rb, line 485
def active_job_monitor?(job)
    if set = job_monitors[job.job_id]
        set.include?(job)
    end
end
add_job_monitor(job)
# File lib/roby/interface/async/interface.rb, line 462
def add_job_monitor(job)
    set = (job_monitors[job.job_id] ||= Set.new)
    job_monitors[job.job_id] << job
end
add_new_job_listener(job)
# File lib/roby/interface/async/interface.rb, line 454
def add_new_job_listener(job)
    new_job_listeners << job
end
async_call(path, m, *args, &block)

Schedules an async call on the client

@see Client#async_call

# File lib/roby/interface/async/interface.rb, line 146
def async_call(path, m, *args, &block)
    raise 'client not connected' unless connected?
    client.async_call(path, m, *args, &block)
end
async_call_pending?(call)

Checks whether an async call is still pending

@see Client#async_call_pending?

# File lib/roby/interface/async/interface.rb, line 154
def async_call_pending?(call)
    connected? && client.async_call_pending?(call)
end
attempt_connection()

Start a connection attempt

# File lib/roby/interface/async/interface.rb, line 159
def attempt_connection
    @connection_future = Concurrent::Future.new do
        client = connection_method.call
        [client, client.jobs]
    end
    connection_future.execute
end
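
The idea of starting a connection attempt without blocking the caller can be sketched with the standard library alone. Note the swap: the source uses `Concurrent::Future`, while this example uses a plain `Thread` as a stand-in so that it runs without the concurrent-ruby gem. `connection_method` here is a hypothetical lambda returning a symbol instead of a real client.

```ruby
# Hypothetical connection method; in the source this is the block given to
# the constructor, or a lambda that opens a TCP connection.
connection_method = -> { :fake_client }

# Start the attempt in the background. A Thread stands in for the future
# used by the source.
attempt = Thread.new { connection_method.call }

# An event loop would check for completion on each tick instead of blocking.
# Here we join with a timeout to keep the sketch deterministic:
# Thread#join returns nil if the timeout expired first.
finished = !attempt.join(1).nil?
client = attempt.value if finished
```
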
cleanup_dead_monitors()
# File lib/roby/interface/async/interface.rb, line 467
def cleanup_dead_monitors
    job_monitors.delete_if do |job_id, monitors|
        monitors.delete_if do |job|
            job.finalized?
        end
        monitors.empty?
    end
end
close(reconnect: false)

Close the connection to the Roby interface

@param [Boolean] reconnect if true, attempt to reconnect right away. If false, the caller is responsible for calling {#attempt_connection} before any future call to {#poll}

# File lib/roby/interface/async/interface.rb, line 373
def close(reconnect: false)
    unreachable!
    if reconnect
        attempt_connection
    end
end
connect_to_ui(widget, &block)
# File lib/roby/interface/async/interface.rb, line 491
def connect_to_ui(widget, &block)
    UIConnector.new(self, widget).instance_eval(&block)
end
connected?()
# File lib/roby/interface/async/interface.rb, line 280
def connected?
    !!client
end
connecting?()
# File lib/roby/interface/async/interface.rb, line 276
def connecting?
    connection_future
end
create_batch()
# File lib/roby/interface/async/interface.rb, line 450
def create_batch
    client.create_batch
end
cycle_index()
# File lib/roby/interface/async/interface.rb, line 288
def cycle_index
    client.cycle_index
end
cycle_start_time()
# File lib/roby/interface/async/interface.rb, line 284
def cycle_start_time
    client.cycle_start_time
end
find_all_jobs(action_name, jobs: self.jobs)

Find the jobs that have been created from a given action

The returned monitors are not started. You have to call {JobMonitor#start} explicitly on them before you use them.

@param [String] action_name the action name

@return [Array<JobMonitor>] the matching jobs

# File lib/roby/interface/async/interface.rb, line 416
def find_all_jobs(action_name, jobs: self.jobs)
    jobs.find_all do |job|
        job.task.action_model.name == action_name
    end
end
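
The filter above compares the name of each job's action model with the requested action name. A minimal sketch of that lookup, using hypothetical `Struct` stand-ins (`FakeActionModel`, `FakeTask`, `FakeJobEntry`) in place of the real job, task and action model objects:

```ruby
# Hypothetical stand-ins reproducing only the job.task.action_model.name chain
FakeActionModel = Struct.new(:name)
FakeTask = Struct.new(:action_model)
FakeJobEntry = Struct.new(:job_id, :task)

# Helper for the example: build a job whose action model has the given name
def build_job(job_id, action_name)
  FakeJobEntry.new(job_id, FakeTask.new(FakeActionModel.new(action_name)))
end

all_jobs = [build_job(1, 'go_to'), build_job(2, 'dock'), build_job(3, 'go_to')]

# Same predicate as in find_all_jobs
matching = all_jobs.find_all do |job|
  job.task.action_model.name == 'go_to'
end
matching_ids = matching.map(&:job_id)
```
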
jobs()

Returns all the existing jobs on this interface

The returned monitors are not started. You have to call {JobMonitor#start} explicitly on them before you use them.

@return [Array<JobMonitor>]

# File lib/roby/interface/async/interface.rb, line 401
def jobs
    return Array.new if !reachable?

    client.jobs.map do |job_id, (job_state, placeholder_task, job_task)|
        JobMonitor.new(self, job_id, placeholder_task: placeholder_task, task: job_task, state: job_state)
    end
end
monitor_job(job_id, start: true)

Create a monitor on a job based on its ID

The monitor is started before being returned, unless start is false

# File lib/roby/interface/async/interface.rb, line 425
def monitor_job(job_id, start: true)
    job_state, placeholder_task, job_task = client.find_job_info_by_id(job_id)
    job = JobMonitor.new(self, job_id, state: job_state, placeholder_task: placeholder_task, task: job_task)
    if start
        job.start
    end
    job
end
on_job(action_name: nil, jobs: self.jobs, &block)

Registers a callback that should be called for each new job

The callback gets called, on registration, with all the existing jobs. It is then called with new jobs as they get created.

@param [String,nil] action_name limit notifications to actions with this name. No filtering is performed if nil.

@yieldparam [JobMonitor] job_monitor a monitor for a job that has just been created. It is not monitoring the job yet, call {JobMonitor#start} to get it to start monitoring.

@return [NewJobListener]

# File lib/roby/interface/async/interface.rb, line 105
def on_job(action_name: nil, jobs: self.jobs, &block)
    listener = NewJobListener.new(self, action_name, block)
    listener.start
    if reachable?
        run_initial_new_job_hooks_events(listener, jobs)
    end
    listener
end
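
The registration semantics described above (the callback first receives the existing jobs, then each new job as it appears, optionally filtered by action name) can be sketched with a small self-contained model. `FakeJob` and `FakeJobSource` are hypothetical names used only for this illustration; the real method returns a NewJobListener and yields JobMonitor objects.

```ruby
# Hypothetical job record carrying only what the filter needs
FakeJob = Struct.new(:job_id, :action_name)

# Hypothetical model of the "existing jobs first, then new jobs" behaviour
class FakeJobSource
  def initialize(existing_jobs)
    @jobs = existing_jobs.dup
    @listeners = []
  end

  # Calls the block for matching existing jobs right away, then keeps the
  # listener so that later jobs are reported as well
  def on_job(action_name: nil, &block)
    listener = lambda do |job|
      block.call(job) if action_name.nil? || job.action_name == action_name
    end
    @jobs.each(&listener)
    @listeners << listener
    listener
  end

  # Simulates a job being created on the remote side
  def add_job(job)
    @jobs << job
    @listeners.each { |listener| listener.call(job) }
  end
end

source = FakeJobSource.new([FakeJob.new(1, 'go_to'), FakeJob.new(2, 'dock')])
seen = []
source.on_job(action_name: 'go_to') { |job| seen << job.job_id }
source.add_job(FakeJob.new(3, 'go_to'))
source.add_job(FakeJob.new(4, 'dock'))
```

With the filter set to `'go_to'`, the block sees job 1 at registration time and job 3 when it is added later.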
poll()

Active part of the asynchronous interface. This has to be called regularly within the system's main event loop (e.g. Roby's, Vizkit's or Qt's)

@return [Boolean] true if we are connected to the remote server and false otherwise

# File lib/roby/interface/async/interface.rb, line 343
def poll
    if connected?
        poll_messages
        true
    elsif connecting?
        poll_connection_attempt
        !!client
    end
end
poll_connection_attempt()

Verify the state of the last connection attempt

It checks on the last connection attempt. If the attempt was successful, it sets {#client} and calls the callbacks registered with {#on_reachable}.

# File lib/roby/interface/async/interface.rb, line 180
def poll_connection_attempt
    return if client
    return if !connection_future.complete?

    case e = connection_future.reason
    when ConnectionError, ComError, ProtocolError
        Interface.info "failed connection attempt: #{e}"
        attempt_connection
        if @first_connection_attempt
            @first_connection_attempt = false
            run_hook :on_unreachable
        end
        nil
    when NilClass
        Interface.info "successfully connected"
        @client, jobs = connection_future.value
        @client.io.reset_thread_guard
        @connection_future = nil
        jobs = jobs.map do |job_id, (job_state, placeholder_task, job_task)|
            JobMonitor.new(self, job_id, state: job_state, placeholder_task: placeholder_task, task: job_task)
        end
        run_hook :on_reachable, jobs
        new_job_listeners.each do |listener|
            listener.reset
            run_initial_new_job_hooks_events(listener, jobs)
        end
    else
        future, @connection_future = @connection_future, nil
        raise future.reason
    end
end
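
The three-way decision made above (known communication failure: retry; no failure: connected; anything else: re-raise) can be isolated in a short sketch. `FakeConnectionError`, `Outcome` and `handle_outcome` are hypothetical names introduced for this example; they only mimic the `reason`/`value` pair read from the future in the source.

```ruby
# Hypothetical error standing in for ConnectionError, ComError and ProtocolError
class FakeConnectionError < StandardError; end

# Hypothetical result of a finished attempt: a value on success, a reason on failure
Outcome = Struct.new(:value, :reason)

# Returns [client, action], where action is :retry or :connected.
# Failures that are not connection-related are re-raised to the caller.
def handle_outcome(outcome)
  case outcome.reason
  when FakeConnectionError
    [nil, :retry]
  when nil
    [outcome.value, :connected]
  else
    raise outcome.reason
  end
end

failed = handle_outcome(Outcome.new(nil, FakeConnectionError.new('refused')))
succeeded = handle_outcome(Outcome.new(:fake_client, nil))
```
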
poll_messages()
# File lib/roby/interface/async/interface.rb, line 292
def poll_messages
    has_cycle_end = true
    while has_cycle_end
        cleanup_dead_monitors
        _, has_cycle_end = client.poll
        process_message_queues
    end
rescue ComError
    Interface.info "link closed, trying to reconnect"
    unreachable!
    attempt_connection
    false
rescue Exception => e
    Interface.warn "error while polling connection, trying to reconnect"
    Roby.log_exception_with_backtrace(e, Interface, :warn)
    unreachable!
    attempt_connection
    false
end
process_message_queues()

@private

Process the message queues from {#client}

# File lib/roby/interface/async/interface.rb, line 215
def process_message_queues
    client.notification_queue.each do |id, level, message|
        run_hook :on_notification, level, message
    end
    client.notification_queue.clear
    client.ui_event_queue.each do |id, event_name, *args|
        run_hook :on_ui_event, event_name, *args
    end
    client.ui_event_queue.clear

    client.job_progress_queue.each do |id, (job_state, job_id, job_name, *args)|
        new_job_listeners.each do |listener|
            next if listener.seen_job_with_id?(job_id)

            job =
                if job_state == JOB_MONITORED
                    JobMonitor.new(
                        self, job_id,
                        state: job_state,
                        placeholder_task: args[0],
                        task: args[1])
                else
                    monitor_job(job_id, start: false)
                end
            if listener.matches?(job)
                listener.call(job)
            else
                listener.ignored(job)
            end
        end

        if monitors = job_monitors[job_id]
            monitors.delete_if do |m|
                m.update_state(job_state)
                if job_state == JOB_REPLACED
                    m.replaced(args.first)
                end
                m.finalized?
            end
            if monitors.empty?
                job_monitors.delete(job_id)
            end
        end
        run_hook :on_job_progress, job_state, job_id, job_name, args
    end
    client.job_progress_queue.clear

    client.exception_queue.each do |id, (kind, exception, tasks, job_ids)|
        job_ids.each do |job_id|
            if monitors = job_monitors[job_id]
                monitors.dup.each do |m|
                    m.notify_exception(kind, exception)
                end
            end
        end

        run_hook :on_exception, kind, exception, tasks, job_ids
    end
    client.exception_queue.clear
end
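
Each queue above follows the same drain pattern: iterate over the queued tuples, dispatch each one to a hook, then clear the queue. A minimal sketch of that pattern for notifications, assuming a plain array as the queue and a lambda as the hook (both hypothetical stand-ins for the client queue and `run_hook`):

```ruby
# Hypothetical queue content, using the [id, level, message] tuple shape
# that the source destructures for notifications
notification_queue = [[1, :info, 'started'], [2, :warn, 'low battery']]

# Hypothetical hook standing in for run_hook :on_notification
received = []
on_notification = ->(level, message) { received << [level, message] }

# Dispatch every queued entry, then empty the queue so that nothing is
# delivered twice on the next poll
notification_queue.each do |_id, level, message|
  on_notification.call(level, message)
end
notification_queue.clear
```
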
quit()

Asks the remote app to quit (synchronous)

# File lib/roby/interface/async/interface.rb, line 386
def quit
    client.quit
end
reachable?()

True if we are connected to the remote server, that is if {#client} is set

# File lib/roby/interface/async/interface.rb, line 381
def reachable?
    !!client
end
remove_job_monitor(job)
# File lib/roby/interface/async/interface.rb, line 476
def remove_job_monitor(job)
    if set = job_monitors[job.job_id]
        set.delete(job)
        if set.empty?
            job_monitors.delete(job.job_id)
        end
    end
end
remove_new_job_listener(job)
# File lib/roby/interface/async/interface.rb, line 458
def remove_new_job_listener(job)
    new_job_listeners.delete(job)
end
restart()

Asks the remote app to restart (synchronous)

# File lib/roby/interface/async/interface.rb, line 391
def restart
    client.restart
end
run_initial_new_job_hooks_events(listener, jobs = self.jobs)

@api private

Helper to call a job listener for all matching jobs in a job set. This is called when the new job listener is created and when we get connected to a Roby interface.

@param [NewJobListener] listener

@param [Array<JobMonitor>] jobs

# File lib/roby/interface/async/interface.rb, line 442
def run_initial_new_job_hooks_events(listener, jobs = self.jobs)
    jobs.each do |job|
        if listener.matches?(job)
            listener.call(job)
        end
    end
end
unreachable!()
# File lib/roby/interface/async/interface.rb, line 353
def unreachable!
    job_monitors.each_value do |monitors|
        monitors.each do |j|
            j.update_state(:unreachable)
        end
    end
    job_monitors.clear

    if client
        client.close if !client.closed?
        @client = nil
        run_hook :on_unreachable
    end
end
wait(timeout: nil)

Blocking call that waits until calling poll would do something

@param [Numeric,nil] timeout a timeout after which the method will return. Use nil for no timeout

@return [Boolean] falsy if the timeout was reached, true otherwise

# File lib/roby/interface/async/interface.rb, line 318
def wait(timeout: nil)
    if connected?
        client.wait(timeout: timeout)
    else
        wait_connection_attempt_result(timeout: timeout)
    end
end
wait_connection_attempt_result(timeout: nil)

Wait for the current connection attempt to finish

@param [Numeric,nil] timeout a timeout after which the method will return. Use nil for no timeout

@return [Boolean] falsy if the timeout was reached, true otherwise

# File lib/roby/interface/async/interface.rb, line 332
def wait_connection_attempt_result(timeout: nil)
    connection_future.wait(timeout)
    connection_future.complete?
end