class Rx::Subject
Represents an object that is both an observable sequence as well as an observer. Each notification is broadcasted to all subscribed observers.
Provides a set of static methods for creating subjects.
Public Class Methods
create(observer, observable)
click to toggle source
Creates a subject from the specified observer and observable.
# File lib/rx/subjects/subject_extensions.rb, line 12 def self.create(observer, observable) AnonymousSubject.new(observer, observable) end
new()
click to toggle source
# File lib/rx/subjects/subject.rb, line 17 def initialize @observers = [] @gate = Mutex.new @disposed = false @stopped = false @error = nil end
Public Instance Methods
has_observers?()
click to toggle source
Indicates whether the subject has observers subscribed to it.
# File lib/rx/subjects/subject.rb, line 26 def has_observers? @observers && @observers.length > 0 end
on_completed()
click to toggle source
Notifies all subscribed observers about the end of the sequence.
# File lib/rx/subjects/subject.rb, line 31 def on_completed os = nil @gate.synchronize do check_disposed unless @stopped os = @observers.clone @observers = [] @stopped = true end end os.each {|o| o.on_completed } if os end
on_error(error)
click to toggle source
Notifies all subscribed observers with the error.
# File lib/rx/subjects/subject.rb, line 47 def on_error(error) raise 'error cannot be nil' unless error os = nil @gate.synchronize do check_disposed unless @stopped os = @observers.clone @observers = [] @stopped = true @error = error end end os.each {|o| o.on_error error } if os end
on_next(value)
click to toggle source
Notifies all subscribed observers with the value.
# File lib/rx/subjects/subject.rb, line 66 def on_next(value) os = nil @gate.synchronize do check_disposed os = @observers.clone unless @stopped end os.each {|o| o.on_next value } if os end
subscribe(observer)
click to toggle source
Subscribes an observer to the subject.
# File lib/rx/subjects/subject.rb, line 77 def subscribe(observer) raise 'observer cannot be nil' unless observer @gate.synchronize do check_disposed if !@stopped @observers.push(observer) return InnerSubscription.new(self, observer) elsif @error observer.on_error @error return Subscription.empty else observer.on_completed return Subscription.empty end end end
unsubscribe()
click to toggle source
Unsubscribe all observers and release resources.
# File lib/rx/subjects/subject.rb, line 97 def unsubscribe @gate.synchronize do @disposed = true @observers = nil end end
Private Instance Methods
check_disposed()
click to toggle source
# File lib/rx/subjects/subject.rb, line 127 def check_disposed raise ArgumentError.new 'Subject disposed' if @disposed end
unsubscribe_observer(observer)
click to toggle source
# File lib/rx/subjects/subject.rb, line 121 def unsubscribe_observer(observer) @gate.synchronize do @observers.delete(observer) if @observers end end