class Reacto::Subscriptions::ZippingSubscription

Public Instance Methods

current_value() click to toggle source
# File lib/reacto/subscriptions/zipping_subscription.rb, line 8
# Index of the buffer slot this subscription reads next.
#
# Starts at 0 the first time it is asked for and is memoized in
# @current_value afterwards.
#
# @return [Integer] the current buffer index
def current_value
  return @current_value if @current_value

  @current_value = 0
end
on_close() click to toggle source
# File lib/reacto/subscriptions/zipping_subscription.rb, line 29
# Forwards the close notification to @subscriber, but only while
# subscribed? is true; otherwise does nothing and returns nil.
#
# @return [Object, nil] whatever @subscriber.on_close returns, or nil
def on_close
  @subscriber.on_close if subscribed?
end
on_value_subscriptions(_) click to toggle source
# File lib/reacto/subscriptions/zipping_subscription.rb, line 20
# Emits one combined value and advances to the next buffer slot.
#
# Takes the entry at the current index from every subscription's buffer,
# passes those entries (in subscription order) to @combinator, and hands
# the result to @subscriber.on_value. The index is then moved forward by one.
#
# @param _ [Object] ignored; the values are read from the buffers instead
# @return [Integer] the new buffer index
def on_value_subscriptions(_)
  index = current_value
  slots = @subscriptions.map { |sub| sub.buffer[index] }

  @subscriber.on_value(@combinator.call(*slots))

  # Advance through the accessor rather than `@current_value += 1`: with no
  # subscriptions the map block never runs, so the raw ivar could still be nil.
  @current_value = current_value + 1
end
subscribed?() click to toggle source
# File lib/reacto/subscriptions/zipping_subscription.rb, line 12
# Whether every subscription in @subscriptions reports itself as subscribed.
#
# @return [Boolean] true when all of them answer subscribed? truthily
#   (also true when there are none), false otherwise
def subscribed?
  @subscriptions.all?(&:subscribed?)
end
subscription!() click to toggle source
# File lib/reacto/subscriptions/zipping_subscription.rb, line 34
# Creates a new BufferedSubscription built with this object as its
# argument, appends it to @subscriptions, and returns it.
#
# @return [BufferedSubscription] the newly registered subscription
def subscription!
  BufferedSubscription.new(self).tap { |created| @subscriptions << created }
end
waiting?() click to toggle source
# File lib/reacto/subscriptions/zipping_subscription.rb, line 16
# Whether at least one subscription's buffer holds NO_VALUE at the current
# index.
#
# @return [Boolean] true if any buffer entry at current_value equals NO_VALUE
def waiting?
  @subscriptions.any? do |subscription|
    pending = subscription.buffer[current_value]
    pending == NO_VALUE
  end
end