class Roby::DRoby::PlanRebuilder

This class rebuilds a plan structure from events saved by {EventLogger}

The data has to be fed cycle-by-cycle to the {#process_cycle} method

Attributes

changes[R]

A hash that stores (at a high level) what changed since the last call to clear_integrated

Don't manipulate directly, but use the announce_* and has_*_changes? methods

current_time[R]

The time of the last processed log item

event_filters[R]

A set of EventFilter objects that list the labelling objects / filters applied on the event stream

marshal[R]

The object that unmarshals the data

object_manager[R]

The object that does ID-to-object mapping

plan[R]

The Plan object into which we rebuild information

scheduler_state[R]

The scheduler state for the current cycle

start_time[R]

The time of the first processed cycle

state[R]

A representation of the state for this execution cycle

stats[R]

A hash representing the statistics for this execution cycle

Public Class Methods

new(plan: RebuiltPlan.new) click to toggle source
# File lib/roby/droby/plan_rebuilder.rb, line 40
def initialize(plan: RebuiltPlan.new)
    @plan = plan
    @object_manager = ObjectManager.new(DRobyID.allocate)
    @marshal = Marshal.new(object_manager, nil)

    @scheduler_state = Schedulers::State.new
    clear_changes
    @stats = Hash.new
end
update_type(type) click to toggle source
# File lib/roby/droby/plan_rebuilder.rb, line 159
def self.update_type(type)
    define_method("announce_#{type}_update") do
        @changes[type] = true
    end
    define_method("has_#{type}_updates?") do
        !!@changes[type]
    end
end

Public Instance Methods

added_edge(time, parent, child, relations, info) click to toggle source
# File lib/roby/droby/plan_rebuilder.rb, line 197
def added_edge(time, parent, child, relations, info)
    parent = local_object(parent)
    child  = local_object(child)
    rel    = local_object(relations.first)
    info   = local_object(info)
    g = parent.relation_graph_for(rel)
    g.add_edge(parent, child, info)
end
analyze_stream(event_stream, until_cycle = nil) { || ... } click to toggle source
# File lib/roby/droby/plan_rebuilder.rb, line 50
def analyze_stream(event_stream, until_cycle = nil)
    while !event_stream.eof? && (!until_cycle || (cycle_index && cycle_index == until_cycle))
        begin
            data = event_stream.read
            interesting = process(data)
            if block_given?
                interesting = yield
            end

            if interesting
                relations = if !has_structure_updates? && !history.empty?
                                history.last.relations
                            end

                history << snapshot(relations)
            end

        ensure
            clear_integrated
        end
    end
end
clear() click to toggle source
# File lib/roby/droby/plan_rebuilder.rb, line 98
def clear
    plan.clear
    object_manager.clear
    @scheduler_state = Schedulers::State.new
end
clear_changes() click to toggle source
# File lib/roby/droby/plan_rebuilder.rb, line 152
def clear_changes
    @changes = Hash[
        state: false,
        structure: false,
        event_propagation: false]
end
clear_integrated() click to toggle source
# File lib/roby/droby/plan_rebuilder.rb, line 143
def clear_integrated
    clear_changes
    if !plan.garbaged_tasks.empty? || !plan.garbaged_events.empty?
        announce_structure_update
        announce_state_update
    end
    plan.clear_integrated
end
cycle_end(time, timings) click to toggle source
# File lib/roby/droby/plan_rebuilder.rb, line 345
def cycle_end(time, timings)
    plan.scheduler_states << scheduler_state
    @scheduler_state = Schedulers::State.new
    @state = timings.delete(:state)
    @stats = timings
    @start_time ||= self.cycle_start_time
    announce_state_update
end
cycle_end_time() click to toggle source

The starting time of the last processed cycle

# File lib/roby/droby/plan_rebuilder.rb, line 83
def cycle_end_time
    Time.at(*stats[:start]) + stats[:end]
end
cycle_index() click to toggle source

The cycle index of the last processed cycle

# File lib/roby/droby/plan_rebuilder.rb, line 88
def cycle_index
    stats[:cycle_index]
end
cycle_start_time() click to toggle source

The starting time of the last processed cycle

@return [Time]

# File lib/roby/droby/plan_rebuilder.rb, line 76
def cycle_start_time
    if stats[:start] && stats[:actual_start]
        Time.at(*stats[:start]) + stats[:actual_start]
    end
end
event_status_change(time, event, status) click to toggle source
# File lib/roby/droby/plan_rebuilder.rb, line 235
def event_status_change(time, event, status)
    event = local_object(event)
    if status == :normal
        plan.unmark_permanent_event(event)
    elsif status == :permanent
        plan.add_permanent_event(event)
    end
end
exception_notification(time, plan_id, mode, error, involved_objects) click to toggle source
# File lib/roby/droby/plan_rebuilder.rb, line 325
def exception_notification(time, plan_id, mode, error, involved_objects)
    local_object(plan_id).propagated_exceptions << [mode, local_object(error), local_object(involved_objects)]
end
finalized_event(time, plan_id, event) click to toggle source
# File lib/roby/droby/plan_rebuilder.rb, line 256
def finalized_event(time, plan_id, event)
    plan  = local_object(plan_id)
    event = local_object(event)
    event.finalization_time = time
    if !plan.garbaged_events.include?(event) && event.root_object?
        plan.finalized_events << event
        plan.remove_free_event(event)
        announce_structure_update
    end
    object_manager.deregister_object(event)
end
finalized_task(time, plan_id, task) click to toggle source
# File lib/roby/droby/plan_rebuilder.rb, line 267
def finalized_task(time, plan_id, task)
    plan = local_object(plan_id)
    task = local_object(task)
    task.finalization_time = time
    if !plan.garbaged_tasks.include?(task)
        plan.finalized_tasks << task
        plan.remove_task(task)
        announce_structure_update
    end
    object_manager.deregister_object(task)
end
garbage_event(time, plan, object) click to toggle source
# File lib/roby/droby/plan_rebuilder.rb, line 250
def garbage_event(time, plan, object)
    plan = local_object(plan)
    object = local_object(object)
    plan.garbaged_events << object
end
garbage_task(time, plan, object, can_finalize) click to toggle source
# File lib/roby/droby/plan_rebuilder.rb, line 244
def garbage_task(time, plan, object, can_finalize)
    plan = local_object(plan)
    object = local_object(object)
    plan.garbaged_tasks << object
end
generator_emit_failed(time, generator, error) click to toggle source
# File lib/roby/droby/plan_rebuilder.rb, line 307
def generator_emit_failed(time, generator, error)
    generator = local_object(generator)
    error = local_object(error)
    generator.plan.failed_emissions << [generator, error]
    announce_event_propagation_update
end
generator_fired(time, event) click to toggle source
# File lib/roby/droby/plan_rebuilder.rb, line 293
def generator_fired(time, event)
    event     = local_object(event)
    generator = event.generator
    plan      = event.plan

    generator.history << event
    generator.instance_eval { @emitted = true }
    if generator.respond_to?(:task)
        generator.task.fired_event(event)
    end
    generator.plan.emitted_events << event
    announce_event_propagation_update
end
generator_propagate_events(time, is_forwarding, events, generator) click to toggle source
# File lib/roby/droby/plan_rebuilder.rb, line 314
def generator_propagate_events(time, is_forwarding, events, generator)
    events    = local_object(events)
    generator = local_object(generator)
    generator.plan.propagated_events << [is_forwarding, events, generator]
    announce_event_propagation_update
end
generator_unreachable(time, generator, reason) click to toggle source
# File lib/roby/droby/plan_rebuilder.rb, line 321
def generator_unreachable(time, generator, reason)
    local_object(generator).mark_unreachable!(local_object(reason))
end
has_interesting_events?() click to toggle source

True if there are stuff recorded in the last played cycles that demand a snapshot to be created

# File lib/roby/droby/plan_rebuilder.rb, line 94
def has_interesting_events?
    has_structure_updates? || has_event_propagation_updates?
end
local_object(object) click to toggle source
# File lib/roby/droby/plan_rebuilder.rb, line 139
def local_object(object)
    marshal.local_object(object)
end
merged_plan(time, plan_id, merged_plan) click to toggle source
# File lib/roby/droby/plan_rebuilder.rb, line 185
def merged_plan(time, plan_id, merged_plan)
    merged_plan = local_object(merged_plan)
    tasks_and_events = merged_plan.tasks.to_a +
        merged_plan.free_events.to_a +
        merged_plan.task_events.to_a

    local_object(plan).merge(merged_plan)
    tasks_and_events.each do |obj|
        obj.addition_time = time
    end
end
pretty_print(pp) click to toggle source
# File lib/roby/droby/plan_rebuilder.rb, line 363
def pretty_print(pp)
    pp.text "Plan rebuilder state for #{plan}"
    pp.nest(2) do
        pp.breakable
        object_manager.pretty_print(pp)
    end
end
process_one_cycle(data) click to toggle source

Processes one cycle worth of data coming from an EventStream, updating the plan

It returns true if there was something noteworthy in there, and false otherwise.

# File lib/roby/droby/plan_rebuilder.rb, line 109
def process_one_cycle(data)
    data.each_slice(4) do |m, sec, usec, args|
        process_one_event(m, sec, usec, args)
    end
end
process_one_event(m, sec, usec, args) click to toggle source
# File lib/roby/droby/plan_rebuilder.rb, line 115
def process_one_event(m, sec, usec, args)
    time = Time.at(sec, usec)
    @current_time = time

    begin
        if respond_to?(m)
            send(m, time, *args)
        end
    rescue Interrupt
        raise
    rescue Exception => e
        display_args = args.map do |obj|
            case obj
            when NilClass then 'nil'
            when Time then obj.to_hms
            else (obj.to_s rescue "failed_to_s")
            end
        end

        raise e, "#{e.message} while serving #{m}(#{display_args.join(", ")})", e.backtrace
    end
    nil
end
register_executable_plan(time, plan_id) click to toggle source
# File lib/roby/droby/plan_rebuilder.rb, line 180
def register_executable_plan(time, plan_id)
    @plan = RebuiltPlan.new
    object_manager.register_object(plan, nil => plan_id)
end
removed_edge(time, parent, child, relations) click to toggle source
# File lib/roby/droby/plan_rebuilder.rb, line 215
def removed_edge(time, parent, child, relations)
    parent = local_object(parent)
    child  = local_object(child)
    rel    = local_object(relations.first)
    g = parent.relation_graph_for(rel)
    g.remove_edge(parent, child)
end
scheduler_report_action(time, msg, task, *args) click to toggle source
# File lib/roby/droby/plan_rebuilder.rb, line 341
def scheduler_report_action(time, msg, task, *args)
    scheduler_state.report_action(msg, local_object(task), *local_object(args))
end
scheduler_report_holdoff(time, msg, task, *args) click to toggle source
# File lib/roby/droby/plan_rebuilder.rb, line 337
def scheduler_report_holdoff(time, msg, task, *args)
    scheduler_state.report_holdoff(msg, local_object(task), *local_object(args))
end
scheduler_report_pending_non_executable_task(time, msg, task, *args) click to toggle source
# File lib/roby/droby/plan_rebuilder.rb, line 329
def scheduler_report_pending_non_executable_task(time, msg, task, *args)
    scheduler_state.report_pending_non_executable_task(msg, local_object(task), *local_object(args))
end
scheduler_report_trigger(time, generator) click to toggle source
# File lib/roby/droby/plan_rebuilder.rb, line 333
def scheduler_report_trigger(time, generator)
    scheduler_state.report_trigger(local_object(generator))
end
task_arguments_updated(time, task, key, value) click to toggle source
# File lib/roby/droby/plan_rebuilder.rb, line 279
def task_arguments_updated(time, task, key, value)
    task  = local_object(task)
    value = local_object(value)
    task.arguments.force_merge!(key => value)
end
task_failed_to_start(time, task, reason) click to toggle source
# File lib/roby/droby/plan_rebuilder.rb, line 285
def task_failed_to_start(time, task, reason)
    task   = local_object(task)
    reason = local_object(reason)
    task.plan.failed_to_start << [task, reason]
    task.mark_failed_to_start(reason, time)
    announce_event_propagation_update
end
task_status_change(time, task, status) click to toggle source
# File lib/roby/droby/plan_rebuilder.rb, line 223
def task_status_change(time, task, status)
    task = local_object(task)
    if status == :normal
        plan.unmark_mission_task(task)
        plan.unmark_permanent_task(task)
    elsif status == :permanent
        plan.add_permanent_task(task)
    elsif status == :mission
        plan.add_mission_task(task)
    end
end
timepoint(time, *name) click to toggle source
# File lib/roby/droby/plan_rebuilder.rb, line 360
def timepoint(time, *name)
end
timepoint_group_end(time, *name) click to toggle source
# File lib/roby/droby/plan_rebuilder.rb, line 357
def timepoint_group_end(time, *name)
end
timepoint_group_start(time, *name) click to toggle source
# File lib/roby/droby/plan_rebuilder.rb, line 354
def timepoint_group_start(time, *name)
end
updated_edge_info(time, parent, child, relation, info) click to toggle source
# File lib/roby/droby/plan_rebuilder.rb, line 206
def updated_edge_info(time, parent, child, relation, info)
    parent = local_object(parent)
    child  = local_object(child)
    rel    = local_object(relation)
    info   = local_object(info)
    g = parent.relation_graph_for(rel)
    g.set_edge_info(parent, child, info)
end