class Reacto::Operations::Chunk
Public Class Methods
new(func, executor: nil)
click to toggle source
# File lib/reacto/operations/chunk.rb, line 8 def initialize(func, executor: nil) @func = func @executor = executor end
Public Instance Methods
call(tracker)
click to toggle source
# File lib/reacto/operations/chunk.rb, line 13 def call(tracker) @current_key = NO_VALUE @current_data = [] value = ->(v) do key = @func.call(v) if key == nil || key == :_separator flush_current!(tracker) return end if key == :_alone flush_current!(tracker) tracker.on_value(LabeledTrackable.new( key, @executor, &Behaviours.single_value(v) )) return end if key.to_s.start_with?('_') flush_current!(tracker) tracker.on_error(RuntimeError.new( 'symbols beginning with an underscore are reserved' )) return end if @current_key == NO_VALUE || @current_key == key @current_key = key @current_data << v return end flush_current!(tracker) @current_key = key @current_data = [v] end error = ->(e) do flush_current!(tracker) tracker.on_error(e) end close = ->() do flush_current!(tracker) tracker.on_close end Subscriptions::OperationSubscription.new( tracker, value: value, close: close, error: error ) end
flush_current!(tracker)
click to toggle source
# File lib/reacto/operations/chunk.rb, line 69 def flush_current!(tracker) if @current_key != NO_VALUE tracker.on_value(LabeledTrackable.new( @current_key, @executor, &Behaviours.enumerable(@current_data) )) end @current_key = NO_VALUE @current_data = [] end