class Rx::Plan
Public Class Methods
new(expression, selector)
click to toggle source
# File lib/rx/joins/plan.rb, line 3
# Stores the two collaborators this plan is built from.
#
# @param expression [#patterns] object whose +patterns+ are enumerated by #activate
# @param selector [#call] callable that #activate invokes with the matched values
def initialize(expression, selector)
  @selector = selector
  @expression = expression
end
Public Instance Methods
activate(external_subscriptions, observer, deactivate)
click to toggle source
# File lib/rx/joins/plan.rb, line 8
# Creates an ActivePlan for this plan and registers it with a join observer
# for every pattern of the expression.
#
# One join observer is obtained per entry of +@expression.patterns+ through
# #plan_create_observer. The resulting ActivePlan is handed two callbacks:
#
# * a match callback that runs +@selector+ on the arguments it receives and
#   forwards the result to +observer.on_next+; if the selector raises, the
#   error goes to +observer.on_error+ and nothing is emitted;
# * a completion callback that removes the active plan from every join
#   observer and then calls +deactivate+ with the active plan.
#
# @param external_subscriptions [#[], #[]=] registry passed through to
#   #plan_create_observer
# @param observer [#on_next, #on_error] receiver of selector results and errors
# @param deactivate [#call] called with the active plan by the completion callback
# @return [ActivePlan] the newly created plan, already added to every join observer
def activate(external_subscriptions, observer, deactivate)
  join_observers = @expression.patterns.map do |pattern|
    plan_create_observer(external_subscriptions, pattern, observer.method(:on_error))
  end

  on_match = lambda do |*args|
    begin
      result = @selector.call(*args)
    rescue => e
      observer.on_error e
      # Leave the lambda here: falling through would emit on_next(nil)
      # right after the error has been reported.
      next
    end
    observer.on_next result
  end

  # Declared before the completion lambda so the closure captures this local;
  # the real value is assigned below, before the lambda can be invoked.
  active_plan = nil
  on_completed = lambda do
    join_observers.each { |join_observer| join_observer.remove_active_plan(active_plan) }
    deactivate.call(active_plan)
  end

  active_plan = ActivePlan.new(join_observers, on_match, on_completed)
  join_observers.each { |join_observer| join_observer.add_active_plan(active_plan) }
  active_plan
end
plan_create_observer(external_subscriptions, observable, on_error)
click to toggle source
# File lib/rx/joins/plan.rb, line 34
# Returns the join observer registered for +observable+, creating and
# registering a new JoinObserver when the registry has no (truthy) entry.
#
# @param external_subscriptions [#[], #[]=] registry of join observers keyed by observable
# @param observable [Object] key to look up; also passed to JoinObserver.new
# @param on_error [Object] passed to JoinObserver.new only when a new observer is created
# @return [Object] the existing entry, or the newly created JoinObserver
def plan_create_observer(external_subscriptions, observable, on_error)
  external_subscriptions[observable] ||= JoinObserver.new(observable, on_error)
end