class Reacto::Subscriptions::BufferedSubscription

Attributes

buffer[RW]
last_error[RW]

Public Class Methods

new(parent)
Calls superclass method
# File lib/reacto/subscriptions/buffered_subscription.rb, line 10
# Builds a subscription that records every value it receives in an
# index-keyed buffer and forwards each notification to +parent+.
#
# @param parent [#on_open, #on_value, #on_error, #on_close] the subscription
#   that every notification is passed on to.
def initialize(parent)
  @parent = parent
  @closed = false
  @active = false

  # Indices that were never written read back as NO_VALUE instead of nil.
  @buffer = Hash.new(NO_VALUE)
  @current_index = Concurrent::AtomicFixnum.new(0)
  @last_error = nil

  open = -> do
    @active = true
    @parent.on_open
  end

  value = ->(v) do
    # Claim the slot with one atomic increment (which returns the new value)
    # instead of a separate read followed by an increment; otherwise two
    # concurrent notifications could read the same index and overwrite each
    # other's buffered value.
    index = @current_index.increment - 1
    @buffer[index] = v

    @parent.on_value(v)
  end

  error = ->(e) do
    # Remember the error before passing it on, so it can be inspected later
    # through the last_error accessor.
    @last_error = e
    @parent.on_error(e)
  end

  close = -> do
    @closed = true
    @parent.on_close
  end

  super(open: open, value: value, error: error, close: close)
end

Public Instance Methods

active?()
# File lib/reacto/subscriptions/buffered_subscription.rb, line 44
# Reports whether the open notification has been received.
#
# The flag starts out false in the constructor and is switched to true by
# the open callback installed there.
#
# @return [Boolean] the current value of the @active flag.
def active?
  @active
end
closed?()
# File lib/reacto/subscriptions/buffered_subscription.rb, line 48
# Reports whether the close notification has been received.
#
# The flag starts out false in the constructor and is switched to true by
# the close callback installed there.
#
# @return [Boolean] the current value of the @closed flag.
def closed?
  @closed
end