class Rx::CurrentThreadScheduler

Represents an object that schedules units of work on the platform’s default scheduler.

Public Class Methods

schedule_required?() click to toggle source

Gets a value that indicates whether the caller must call a Schedule method.

# File lib/rx/concurrency/current_thread_scheduler.rb, line 20
def self.schedule_required?
  @@thread_local_queue.nil?
end

Private Class Methods

queue() click to toggle source
# File lib/rx/concurrency/current_thread_scheduler.rb, line 54
def queue
  @@thread_local_queue
end
queue=(new_queue) click to toggle source
# File lib/rx/concurrency/current_thread_scheduler.rb, line 58
def queue=(new_queue)
  @@thread_local_queue = new_queue
end
run_trampoline(queue) click to toggle source
# File lib/rx/concurrency/current_thread_scheduler.rb, line 62
def run_trampoline(queue)
  while item = queue.shift
    unless item.cancelled?
      wait = item.due_time - Scheduler.now.to_i
      sleep wait if wait > 0
      item.invoke unless item.cancelled?
    end
  end
end

Public Instance Methods

schedule_relative_with_state(state, due_time, action) click to toggle source

Schedules an action to be executed after dueTime.

# File lib/rx/concurrency/current_thread_scheduler.rb, line 25
def schedule_relative_with_state(state, due_time, action)
  raise 'action cannot be nil' unless action

  dt = self.now.to_i + Scheduler.normalize(due_time)
  si = ScheduledItem.new self, state, dt, &action

  local_queue = self.class.queue

  unless local_queue
    local_queue = PriorityQueue.new
    local_queue.push si

    self.class.queue = local_queue

    begin
      self.class.run_trampoline local_queue
    ensure
      self.class.queue = nil
    end
  else
    local_queue.push si
  end

  Subscription.create { si.cancel }
end