class Rx::JoinObserver
Attributes
queue[R]
Public Class Methods
new(source, on_error)
click to toggle source
Calls superclass method
Rx::ObserverBase::new
# File lib/rx/joins/join_observer.rb, line 5 def initialize(source, on_error) super Observer.configure {|o| o.on_next {|notification| if !@is_disposed if notification.on_error? @on_error.call(notification.exception) next end @queue.push notification @active_plans.dup.each {|v| v.match } end } } @source = source @on_error = on_error @queue = [] @active_plans = [] @subscription = SingleAssignmentSubscription.new @is_disposed = false end
Public Instance Methods
add_active_plan(active_plan)
click to toggle source
# File lib/rx/joins/join_observer.rb, line 28 def add_active_plan(active_plan) @active_plans.push active_plan end
remove_active_plan(active_plan)
click to toggle source
# File lib/rx/joins/join_observer.rb, line 36 def remove_active_plan(active_plan) if idx = @active_plans.index(active_plan) @active_plans.delete_at idx end self.unsubscribe if @active_plans.length == 0 end
subscribe()
click to toggle source
# File lib/rx/joins/join_observer.rb, line 32 def subscribe @subscription.subscription = @source.materialize.subscribe(@config) end
unsubscribe()
click to toggle source
Calls superclass method
Rx::ObserverBase#unsubscribe
# File lib/rx/joins/join_observer.rb, line 43 def unsubscribe super if !@is_disposed @is_disposed = true @subscription.unsubscribe end end