class Reacto::Operations::BlockingEnumerable
Public Class Methods
new(method_name, block)
click to toggle source
# File lib/reacto/operations/blocking_enumerable.rb, line 6 def initialize(method_name, block) @method_name = method_name @block = block end
Public Instance Methods
call(tracker)
click to toggle source
# File lib/reacto/operations/blocking_enumerable.rb, line 11 def call(tracker) data = [] value = -> (val) { data << val } close = -> do emit(tracker, data) tracker.on_close end error = ->(e) do emit(tracker, data) tracker.on_error(e) end Subscriptions::OperationSubscription.new( tracker, value: value, error: error, close: close ) end
emit(tracker, data)
click to toggle source
# File lib/reacto/operations/blocking_enumerable.rb, line 29 def emit(tracker, data) result = data.send(@method_name, &@block) if result.is_a?(Enumerable) result.each { |value| tracker.on_value(value) } else tracker.on_value(result) end end