class Reacto::Operations::BlockingEnumerable

Public Class Methods

new(method_name, block) click to toggle source
# File lib/reacto/operations/blocking_enumerable.rb, line 6
def initialize(method_name, block)
  @method_name = method_name
  @block = block
end

Public Instance Methods

call(tracker) click to toggle source
# File lib/reacto/operations/blocking_enumerable.rb, line 11
def call(tracker)
  data = []

  value = -> (val) { data << val }
  close = -> do
    emit(tracker, data)
    tracker.on_close
  end
  error = ->(e) do
    emit(tracker, data)
    tracker.on_error(e)
  end

  Subscriptions::OperationSubscription.new(
    tracker, value: value, error: error, close: close
  )
end
emit(tracker, data) click to toggle source
# File lib/reacto/operations/blocking_enumerable.rb, line 29
def emit(tracker, data)
  result = data.send(@method_name, &@block)

  if result.is_a?(Enumerable)
    result.each { |value| tracker.on_value(value) }
  else
    tracker.on_value(result)
  end
end