class Reacto::Operations::EachCollect

Public Class Methods

new( n, reset_action: -> (_) { [] } click to toggle source
# File lib/reacto/operations/each_collect.rb, line 7
def initialize(
  n,
  reset_action: -> (_) { [] }, collect_action: nil,
  init_action: NO_ACTION, on_error: nil, on_close: nil
)
  @n = n
  @reset_action = reset_action
  @collect_action = collect_action
  @init_action = init_action

  @error = on_error
  @close = on_close
end

Public Instance Methods

call(tracker) click to toggle source
# File lib/reacto/operations/each_collect.rb, line 21
def call(tracker)
  current = []
  @init_action.call

  unless @collect_action
    @collect_action = -> (value, collection) { collection << value }
  end

  error = @error ? @error : ->(e) do
    tracker.on_value(current) unless current.empty?
    tracker.on_error(e)
  end
  close = @close ? @close : ->() do
    tracker.on_value(current) unless current.empty?
    tracker.on_close
  end

  error = error == NO_ACTION ? tracker.method(:on_error) : error
  close = close == NO_ACTION ? tracker.method(:on_close) : close

  behaviour = -> (value) do
    @collect_action.call(value, current)

    if current.size == @n
      tracker.on_value(current)

      current = @reset_action.call(current)
    end
  end

  Subscriptions::OperationSubscription.new(
    tracker, value: behaviour, error: error, close: close
  )
end