class Rx::Subject

Represents an object that is both an observable sequence and an observer. Each notification is broadcast to all subscribed observers.

Provides a set of static methods for creating subjects.

Public Class Methods

create(observer, observable) click to toggle source

Creates a subject from the specified observer and observable.

# File lib/rx/subjects/subject_extensions.rb, line 12
# Builds a subject out of an independent observer/observable pair.
#
# observer   - object handed to AnonymousSubject as its observer side.
# observable - object handed to AnonymousSubject as its observable side.
#
# Returns the newly constructed AnonymousSubject.
def self.create(observer, observable)
  subject = AnonymousSubject.new(observer, observable)
  subject
end
new() click to toggle source
# File lib/rx/subjects/subject.rb, line 17
# Sets up an empty, live subject: no observers, not stopped, not disposed,
# and no terminal error recorded.
def initialize
  # Lock guarding every read/write of the state below.
  @gate = Mutex.new
  @observers = []
  # Lifecycle flags: both start out false.
  @disposed = @stopped = false
  # Error recorded by on_error; nil while no error has been delivered.
  @error = nil
end

Public Instance Methods

has_observers?() click to toggle source

Indicates whether the subject has observers subscribed to it.

# File lib/rx/subjects/subject.rb, line 26
# Indicates whether the subject currently has observers subscribed to it.
#
# Always returns true or false. After +unsubscribe+ has cleared the observer
# list (set it to nil) the answer is false rather than nil.
def has_observers?
  # Read the ivar once: unsubscribe may swap it to nil from another thread,
  # and a second read could then blow up on nil.
  observers = @observers
  !observers.nil? && !observers.empty?
end
on_completed() click to toggle source

Notifies all subscribed observers about the end of the sequence.

# File lib/rx/subjects/subject.rb, line 31
# Signals the end of the sequence to every currently subscribed observer.
#
# The first call marks the subject as stopped and empties the observer list;
# later calls are no-ops. Raises ArgumentError (through check_disposed) when
# the subject has been disposed.
def on_completed
  # Under the lock, detach the observer list; yields nil if already stopped.
  targets = @gate.synchronize do
    check_disposed
    next if @stopped

    current = @observers.clone
    @observers = []
    @stopped = true
    current
  end

  # Callbacks run after the lock is released.
  targets.each(&:on_completed) if targets
end
on_error(error) click to toggle source

Notifies all subscribed observers with the error.

# File lib/rx/subjects/subject.rb, line 47
# Delivers +error+ to every currently subscribed observer and terminates the
# subject.
#
# error - the error to propagate; must be truthy.
#
# The first call records the error, marks the subject as stopped and empties
# the observer list; later calls are no-ops. Raises a RuntimeError when
# +error+ is nil/false, and ArgumentError (through check_disposed) when the
# subject has been disposed.
def on_error(error)
  raise 'error cannot be nil' unless error

  # Under the lock, detach the observer list; yields nil if already stopped.
  targets = @gate.synchronize do
    check_disposed
    next if @stopped

    current = @observers.clone
    @observers = []
    @stopped = true
    @error = error
    current
  end

  # Callbacks run after the lock is released.
  targets.each { |observer| observer.on_error(error) } if targets
end
on_next(value) click to toggle source

Notifies all subscribed observers with the value.

# File lib/rx/subjects/subject.rb, line 66
# Pushes +value+ to every currently subscribed observer.
#
# value - the element to broadcast.
#
# Does nothing once the subject has stopped. Raises ArgumentError (through
# check_disposed) when the subject has been disposed.
def on_next(value)
  # Copy the observer list under the lock; nil when the subject has stopped.
  recipients = @gate.synchronize do
    check_disposed
    @stopped ? nil : @observers.clone
  end

  # Callbacks run after the lock is released, against the copy.
  recipients.each { |observer| observer.on_next(value) } if recipients
end
subscribe(observer) click to toggle source

Subscribes an observer to the subject.

# File lib/rx/subjects/subject.rb, line 77
# Subscribes an observer to the subject.
#
# observer - the object to register; must be truthy.
#
# While the subject is live the observer is appended to the observer list and
# an InnerSubscription for it is returned. If the subject has already
# stopped, the observer immediately receives the stored error (on_error) or a
# completion (on_completed) and Subscription.empty is returned.
#
# Raises a RuntimeError when +observer+ is nil/false, and ArgumentError
# (through check_disposed) when the subject has been disposed.
def subscribe(observer)
  raise 'observer cannot be nil' unless observer

  terminal_error = nil
  @gate.synchronize do
    check_disposed

    unless @stopped
      @observers.push(observer)
      return InnerSubscription.new(self, observer)
    end

    # Already terminated: remember how, and notify after leaving the lock.
    terminal_error = @error
  end

  # Mutex is not reentrant, so calling back into the observer while holding
  # @gate would raise ThreadError if the callback touched this subject again.
  # Notify outside the lock, as on_next/on_error/on_completed do.
  if terminal_error
    observer.on_error terminal_error
  else
    observer.on_completed
  end
  Subscription.empty
end
unsubscribe() click to toggle source

Unsubscribes all observers and releases resources.

# File lib/rx/subjects/subject.rb, line 97
# Disposes the subject: drops the observer list and flags the subject as
# disposed so that later calls guarded by check_disposed raise.
#
# Returns nil.
def unsubscribe
  @gate.synchronize do
    @observers = nil
    @disposed = true
    nil
  end
end

Private Instance Methods

check_disposed() click to toggle source
# File lib/rx/subjects/subject.rb, line 127
# Guard used by the public operations: raises ArgumentError once the subject
# has been flagged as disposed, otherwise does nothing and returns nil.
def check_disposed
  return unless @disposed

  raise ArgumentError, 'Subject disposed'
end
unsubscribe_observer(observer) click to toggle source
# File lib/rx/subjects/subject.rb, line 121
# Removes a single registration of +observer+ from the observer list.
#
# observer - the observer whose subscription is being cancelled.
#
# Only the first matching entry is removed, so an observer that subscribed
# more than once keeps its remaining registrations. Does nothing when the
# observer list has already been cleared (set to nil).
#
# Returns the removed entry, or nil when nothing was removed.
def unsubscribe_observer(observer)
  @gate.synchronize do
    next unless @observers

    # Array#delete would drop every equal entry; remove just one.
    position = @observers.index(observer)
    @observers.delete_at(position) if position
  end
end