class Rx::ScheduledObserver

Public Class Methods

new(scheduler, observer)
Calls superclass method Rx::ObserverBase::new
# File lib/rx/core/scheduled_observer.rb, line 11
# Sets up the observer's internal state and registers this object's
# +*_core+ methods as the handlers on a fresh ObserverConfiguration, which is
# then handed to the superclass constructor.
#
# scheduler - stored in @scheduler; used by ensure_active to schedule +run+.
# observer  - stored in @observer; the target of the queued notifications.
def initialize(scheduler, observer)
  @scheduler  = scheduler
  @observer   = observer
  @gate       = Monitor.new            # guards @queue, @acquired and @faulted
  @queue      = []                     # pending notifications, as lambdas
  @subscriber = SerialSubscription.new # receives the scheduled work's subscription
  @acquired   = false                  # true while a drain has been scheduled
  @faulted    = false                  # set once a queued notification raised

  config = ObserverConfiguration.new
  # on_next -> on_next_core, on_error -> on_error_core, on_completed -> on_completed_core
  %i[on_next on_error on_completed].each do |event|
    config.public_send(event, &method(:"#{event}_core"))
  end

  super(config)
end

Public Instance Methods

ensure_active(n=0)
# File lib/rx/core/scheduled_observer.rb, line 40
# Schedules +run+ on the scheduler when there is queued work, the observer
# has not faulted, and no drain is currently marked as acquired.
#
# n - accepted but not used by this method.
#
# Returns whatever the scheduler returned when work was scheduled (it is also
# stored as @subscriber.subscription), or nil when nothing was scheduled.
def ensure_active(n = 0)
  became_owner = @gate.synchronize do
    next false if @faulted || @queue.empty?

    # Only the caller that flips @acquired from false to true schedules.
    previously_acquired = @acquired
    @acquired = true
    !previously_acquired
  end

  return unless became_owner

  @subscriber.subscription = @scheduler.schedule_recursive_with_state(nil, method(:run))
end
on_completed_core()
# File lib/rx/core/scheduled_observer.rb, line 36
# Queues a completion notification for @observer. The observer is not called
# here; the queued lambda calls @observer.on_completed when it is invoked.
def on_completed_core
  @gate.synchronize do
    @queue << -> { @observer.on_completed }
  end
end
on_error_core(error)
# File lib/rx/core/scheduled_observer.rb, line 32
# Queues an error notification for @observer. The observer is not called
# here; the queued lambda calls @observer.on_error with +error+ when invoked.
#
# error - the object forwarded to @observer.on_error.
def on_error_core(error)
  @gate.synchronize do
    @queue << -> { @observer.on_error(error) }
  end
end
on_next_core(value)
# File lib/rx/core/scheduled_observer.rb, line 28
# Queues a value notification for @observer. The observer is not called
# here; the queued lambda calls @observer.on_next with +value+ when invoked.
#
# value - the object forwarded to @observer.on_next.
def on_next_core(value)
  @gate.synchronize do
    @queue << -> { @observer.on_next(value) }
  end
end
run(state, recurse)
# File lib/rx/core/scheduled_observer.rb, line 53
# Executes one queued notification and then asks to be run again.
#
# Passed to the scheduler by ensure_active as the recursive action.
#
# state   - opaque value passed back to +recurse+ unchanged.
# recurse - callable invoked with +state+ after the work item succeeds.
#
# Returns nil when the queue was empty (after clearing @acquired), otherwise
# the result of +recurse.call+.
# Raises whatever the queued work raised, after dropping the remaining queue
# and marking the observer as faulted.
def run(state, recurse)
  work = nil
  @gate.synchronize do
    if @queue.empty?
      # Nothing left: release ownership so ensure_active can schedule again.
      @acquired = false
      return
    end

    work = @queue.shift
  end

  begin
    # Deliberately called outside the gate so the lock is not held while the
    # queued notification runs.
    work.call
  rescue
    # @queue and @faulted are read and written under @gate everywhere else,
    # so the fault transition must take the gate too; otherwise a concurrent
    # ensure_active / on_*_core could observe a half-applied fault state.
    @gate.synchronize do
      @queue = []
      @faulted = true
    end

    raise
  end

  recurse.call state
end
unsubscribe()
Calls superclass method Rx::ObserverBase#unsubscribe
# File lib/rx/core/scheduled_observer.rb, line 76
# Runs the superclass's unsubscribe first, then unsubscribes @subscriber
# (which holds the subscription stored by ensure_active).
#
# Returns the result of @subscriber.unsubscribe.
def unsubscribe
  super()
  @subscriber.unsubscribe
end