class Rx::CheckedObserver

Public Class Methods

new(observer)
# File lib/rx/core/checked_observer.rb, line 19
# Wraps +observer+ and starts in the :idle state.
#
# @param observer [#on_next, #on_error, #on_completed] object that receives
#   the forwarded notifications
def initialize(observer)
  @observer = observer
  @state = :idle
  # One lock per instance: a Mutex only excludes threads that synchronize on
  # the very same object, so the lock meant to guard @state has to be
  # allocated once here instead of freshly at each call site.
  @gate = Mutex.new
end

Public Instance Methods

on_completed()
# File lib/rx/core/checked_observer.rb, line 42
# Forwards the completion notification to the wrapped observer.
#
# check_access runs first and raises if a notification is already in flight
# or the observer has terminated. Once the wrapped observer has been called,
# the state becomes :done whether or not that call raised.
#
# @return [Object] whatever the wrapped observer's on_completed returns
# @raise [RuntimeError] propagated from check_access
def on_completed
  check_access
  begin
    @observer.on_completed
  ensure
    # Synchronize on the instance-wide lock (created on first use if the
    # instance does not hold one yet). A throwaway Mutex.new excludes nobody.
    (@gate ||= Mutex.new).synchronize { @state = :done }
  end
end
on_error(error)
# File lib/rx/core/checked_observer.rb, line 33
# Forwards an error notification to the wrapped observer.
#
# check_access runs first and raises if a notification is already in flight
# or the observer has terminated. Once the wrapped observer has been called,
# the state becomes :done whether or not that call raised.
#
# @param error [Object] the error handed on to the wrapped observer
# @return [Object] whatever the wrapped observer's on_error returns
# @raise [RuntimeError] propagated from check_access
def on_error(error)
  check_access
  begin
    @observer.on_error error
  ensure
    # Synchronize on the instance-wide lock (created on first use if the
    # instance does not hold one yet). A throwaway Mutex.new excludes nobody.
    (@gate ||= Mutex.new).synchronize { @state = :done }
  end
end
on_next(value)
# File lib/rx/core/checked_observer.rb, line 24
# Forwards one value to the wrapped observer.
#
# check_access runs first and raises if a notification is already in flight
# or the observer has terminated. Once the wrapped observer has been called,
# the state returns to :idle whether or not that call raised, so the next
# notification is accepted.
#
# @param value [Object] the element handed on to the wrapped observer
# @return [Object] whatever the wrapped observer's on_next returns
# @raise [RuntimeError] propagated from check_access
def on_next(value)
  check_access
  begin
    @observer.on_next value
  ensure
    # Synchronize on the instance-wide lock (created on first use if the
    # instance does not hold one yet). A throwaway Mutex.new excludes nobody.
    (@gate ||= Mutex.new).synchronize { @state = :idle }
  end
end

Private Instance Methods

check_access()
# File lib/rx/core/checked_observer.rb, line 53
# Claims the observer for a single notification.
#
# Under the instance-wide lock, the current state is read and, if it is
# :idle, switched to :busy. Any other state is left untouched and rejected.
#
# @return [nil] when access is granted
# @raise [RuntimeError] 'Re-entrancy detected' if the state was :busy
# @raise [RuntimeError] 'Observer terminated' if the state was :done
def check_access
  # The read-test-write on @state is only atomic if every caller locks the
  # same Mutex, so use the per-instance lock (created on first use if the
  # instance does not hold one yet) rather than a fresh Mutex.new.
  (@gate ||= Mutex.new).synchronize do
    previous = @state
    @state = :busy if previous == :idle
    case previous
    when :busy
      raise 'Re-entrancy detected'
    when :done
      raise 'Observer terminated'
    end
  end
end