class Rx::VirtualTimeScheduler

Base class for virtual time schedulers using a priority queue for scheduled items.

Attributes

clock[R]

Public Class Methods

new(initial_clock)
# File lib/rx/concurrency/virtual_time_scheduler.rb, line 16
def initialize(initial_clock)
  @clock = initial_clock.to_i
  @queue = PriorityQueue.new
  @enabled = false
end

Public Instance Methods

advance_by(time)

Advances the scheduler’s clock by the specified relative time, running all work scheduled for that timespan.

# File lib/rx/concurrency/virtual_time_scheduler.rb, line 130
def advance_by(time)
  dt = @clock + time

  due_to_clock = dt<=>clock
  raise 'Time is out of range' if due_to_clock < 0

  return if due_to_clock == 0
  raise 'Cannot advance while running' if @enabled

  self.advance_to dt
end
advance_to(time)

Advances the scheduler’s clock to the specified absolute time, running all work scheduled up to and including that time.

# File lib/rx/concurrency/virtual_time_scheduler.rb, line 102
def advance_to(time)
  due_to_clock = time<=>clock
  raise 'Time is out of range' if due_to_clock < 0 

  return if due_to_clock == 0

  unless @enabled
    @enabled = true

    begin
      next_item = self.get_next
      if !next_item.nil? && next_item.due_time <= time
        @clock = next_item.due_time if next_item.due_time > @clock
        next_item.invoke
      else
        @enabled = false
      end

    end while @enabled

    @clock = time
  else
    raise 'Cannot advance while running'
  end

end
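
The loop in advance_to can be illustrated without the Rx classes. The following is a minimal sketch, not the library code: TinyVirtualScheduler and its sorted-array queue are hypothetical stand-ins for the real scheduler and its PriorityQueue, and the sketch leaves out cancellation and the re-entrancy guard.

```ruby
# Hypothetical stand-in for illustration; not the Rx implementation.
class TinyVirtualScheduler
  attr_reader :clock

  def initialize(initial_clock)
    @clock = initial_clock
    @queue = [] # [due_time, action] pairs, kept ordered by due_time
  end

  def schedule_at_absolute(due_time, &action)
    # Insert before the first later item so equal due times keep FIFO order.
    index = @queue.index { |due, _| due > due_time } || @queue.size
    @queue.insert(index, [due_time, action])
  end

  def advance_to(time)
    raise 'Time is out of range' if time < @clock

    # Run every item whose due time is not later than the target time.
    while (item = @queue.first) && item[0] <= time
      @queue.shift
      due_time, action = item
      @clock = due_time if due_time > @clock
      action.call
    end
    @clock = time
  end
end

scheduler = TinyVirtualScheduler.new(0)
log = []
scheduler.schedule_at_absolute(10) { log << "a ran at #{scheduler.clock}" }
scheduler.schedule_at_absolute(30) { log << "b ran at #{scheduler.clock}" }

scheduler.advance_to(20)
p log              # => ["a ran at 10"]
p scheduler.clock  # => 20
```

Each action observes the clock at its own due time, and the clock lands on the requested time only after all due work has run.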
enabled?()

Gets whether the scheduler is enabled to run work.

# File lib/rx/concurrency/virtual_time_scheduler.rb, line 28
def enabled?
  @enabled
end
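get_next()

Gets the next scheduled item to be executed, discarding any cancelled items at the head of the queue. The returned item stays in the queue. Returns nil if no item remains.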

# File lib/rx/concurrency/virtual_time_scheduler.rb, line 153
def get_next
  while next_item = @queue.peek
    if next_item.cancelled?
      @queue.shift
    else
      return next_item
    end
  end

  return nil
end
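
As a rough sketch of the same skip-cancelled pattern, the example below uses a plain Array in place of the priority queue. QueuedItem and next_live_item are hypothetical names introduced for illustration, and Array#first stands in for the queue's peek.

```ruby
# Hypothetical stand-in for a scheduled item; only the cancelled flag matters here.
QueuedItem = Struct.new(:name, :cancelled) do
  def cancelled?
    cancelled
  end
end

# Mirrors the shape of get_next: drop cancelled items from the head,
# return the first live item without removing it, or nil if none remain.
def next_live_item(queue)
  while (item = queue.first)
    return item unless item.cancelled?

    queue.shift
  end
  nil
end

queue = [
  QueuedItem.new('a', true),
  QueuedItem.new('b', true),
  QueuedItem.new('c', false)
]

item = next_live_item(queue)
puts item.name   # => c
puts queue.size  # => 1
```

Cancelled items are removed lazily, only when they reach the head of the queue, so cancelling an item never requires searching the queue.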
invoke(scheduler, action)

Calls the given action and returns an empty subscription. The scheduler argument is not used.

# File lib/rx/concurrency/virtual_time_scheduler.rb, line 165
def invoke(scheduler, action)
  action.call
  Subscription.empty
end
now()

Gets the scheduler’s notion of current time.

# File lib/rx/concurrency/virtual_time_scheduler.rb, line 23
def now
  clock
end
schedule_absolute_with_state(state, due_time, action)

Alias for: schedule_at_absolute_with_state

schedule_at_absolute(due_time, action)

Schedules an action to be executed at the absolute virtual time due_time.

# File lib/rx/concurrency/virtual_time_scheduler.rb, line 78
def schedule_at_absolute(due_time, action)
  raise 'action cannot be nil' unless action

  schedule_at_absolute_with_state(action, due_time, method(:invoke))      
end
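schedule_at_absolute_with_state(state, due_time, action)

Schedules an action to be executed at the absolute virtual time due_time. The action is called with the scheduler and state. Returns a subscription that cancels the scheduled item.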

# File lib/rx/concurrency/virtual_time_scheduler.rb, line 85
def schedule_at_absolute_with_state(state, due_time, action)
  raise 'action cannot be nil' unless action

  si = nil
  run = lambda {|scheduler, state1|
    @queue.delete si
    action.call(scheduler, state1)
  }

  si = ScheduledItem.new(self, state, due_time, &run)
  @queue.push si

  Subscription.create { si.cancel }
end
Also aliased as: schedule_absolute_with_state
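
The `si = nil` line in the source matters: the lambda has to refer to the item that wraps it, so the variable must exist before the lambda is created. A minimal sketch of that pattern follows, with a plain Array and a hypothetical Entry struct standing in for the queue and ScheduledItem; enqueue is likewise a name made up for this example.

```ruby
# Hypothetical stand-in for ScheduledItem: holds some state and a callback.
Entry = Struct.new(:state, :callback)

# Returns the new entry; its callback removes the entry from the queue
# before handing the state to the action.
def enqueue(queue, state, action)
  entry = nil # declared first so the lambda below closes over this variable
  run = lambda do
    queue.delete(entry)
    action.call(entry.state)
  end
  entry = Entry.new(state, run)
  queue.push(entry)
  entry
end

queue = []
results = []
entry = enqueue(queue, 21, ->(n) { results << n * 2 })

puts queue.size  # => 1
entry.callback.call
puts queue.size  # => 0
p results        # => [42]
```

Because the lambda captures the variable rather than its value at creation time, it sees the entry assigned afterwards.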
schedule_at_relative(due_time, action)

Schedules an action to be executed after the relative time due_time, measured from the current clock value.

# File lib/rx/concurrency/virtual_time_scheduler.rb, line 63
def schedule_at_relative(due_time, action)
  raise 'action cannot be nil' unless action

  schedule_at_relative_with_state(action, due_time, method(:invoke))
end
schedule_at_relative_with_state(state, due_time, action)

Schedules an action to be executed after the relative time due_time, measured from the current clock value. The action is called with the scheduler and state.

# File lib/rx/concurrency/virtual_time_scheduler.rb, line 70
def schedule_at_relative_with_state(state, due_time, action)
  raise 'action cannot be nil' unless action

  schedule_at_absolute_with_state(state, @clock + due_time, action)
end
Also aliased as: schedule_relative_with_state
schedule_relative_with_state(state, due_time, action)

Alias for: schedule_at_relative_with_state

schedule_with_state(state, action)

Schedules an action to be executed at the current clock time.

# File lib/rx/concurrency/virtual_time_scheduler.rb, line 57
def schedule_with_state(state, action)
  raise 'action cannot be nil' unless action
  schedule_at_absolute_with_state(state, clock, action)
end
sleep(time)

Advances the scheduler’s clock by the specified relative time without running any scheduled work.

# File lib/rx/concurrency/virtual_time_scheduler.rb, line 143
def sleep(time)
  dt = @clock + time

  due_to_clock = dt<=>@clock
  raise 'Time is out of range' if due_to_clock < 0

  @clock = dt
end
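
One consequence of sleep is that work can become overdue. The sketch below, which uses a hypothetical SleepDemoScheduler stand-in rather than the Rx classes, shows an item scheduled for time 10 running only once the scheduler is started, and observing the already advanced clock, because the run loop never moves the clock backwards.

```ruby
# Hypothetical stand-in for illustration; not the Rx implementation.
class SleepDemoScheduler
  attr_reader :clock

  def initialize(initial_clock)
    @clock = initial_clock
    @queue = [] # [due_time, action] pairs, unordered
  end

  def schedule_at_absolute(due_time, &action)
    @queue << [due_time, action]
  end

  # Moves the clock only; queued work is left untouched.
  def sleep(time)
    raise 'Time is out of range' if time.negative?

    @clock += time
  end

  # Drains the queue in due-time order, never rewinding the clock.
  def start
    while (item = @queue.min_by(&:first))
      @queue.delete(item)
      due_time, action = item
      @clock = due_time if due_time > @clock
      action.call
    end
  end
end

scheduler = SleepDemoScheduler.new(0)
seen = []
scheduler.schedule_at_absolute(10) { seen << scheduler.clock }

scheduler.sleep(50)
p seen             # => []
p scheduler.clock  # => 50

scheduler.start
p seen             # => [50]
```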
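start()

Starts the virtual time scheduler. Scheduled items run one after another, with the clock moving forward to each item’s due time, until no item remains or stop is called. Does nothing if the scheduler is already running.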

# File lib/rx/concurrency/virtual_time_scheduler.rb, line 33
def start
  unless @enabled
    @enabled = true

    begin
      next_item = self.get_next

      unless next_item.nil?
        @clock = next_item.due_time if next_item.due_time > @clock
        next_item.invoke
      else
        @enabled = false
      end

    end while @enabled
  end
end
stop()

Stops the virtual time scheduler.

# File lib/rx/concurrency/virtual_time_scheduler.rb, line 52
def stop
  @enabled = false
end
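
Since start loops for as long as the enabled flag is set, stop is typically called from inside a scheduled action. The following sketch shows that interaction with a hypothetical StoppableScheduler stand-in; it models the flag and the loop only and is not the Rx implementation.

```ruby
# Hypothetical stand-in for illustration; not the Rx implementation.
class StoppableScheduler
  attr_reader :clock

  def initialize(initial_clock)
    @clock = initial_clock
    @queue = [] # [due_time, action] pairs, unordered
    @enabled = false
  end

  def enabled?
    @enabled
  end

  def schedule_at_absolute(due_time, &action)
    @queue << [due_time, action]
  end

  def start
    return if @enabled

    @enabled = true
    while @enabled
      item = @queue.min_by(&:first)
      if item
        @queue.delete(item)
        @clock = item[0] if item[0] > @clock
        item[1].call
      else
        @enabled = false # queue drained
      end
    end
  end

  def stop
    @enabled = false
  end
end

scheduler = StoppableScheduler.new(0)
log = []
scheduler.schedule_at_absolute(1) { log << 'first' }
scheduler.schedule_at_absolute(2) do
  log << 'second'
  scheduler.stop
end
scheduler.schedule_at_absolute(3) { log << 'third' }

scheduler.start
p log              # => ["first", "second"]
p scheduler.clock  # => 2

scheduler.start    # resumes with the remaining item
p log              # => ["first", "second", "third"]
```

Stopping does not discard pending work: the third item stays queued and runs on the next start.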