class Reacto::Operations::ChunkWhile

Public Class Methods

new(func, executor: nil)
# File lib/reacto/operations/chunk_while.rb, line 7
# Stores the chunking predicate and the optional executor.
#
# @param func [#call] invoked by #call as +func.call(previous, current)+ for
#   each pair of consecutive values; a falsy result starts a new chunk
# @param executor passed through as the +executor:+ option when a finished
#   chunk is wrapped with Trackable.enumerable (defaults to nil)
def initialize(func, executor: nil)
  @func = func
  @executor = executor
end

Public Instance Methods

call(tracker)
# File lib/reacto/operations/chunk_while.rb, line 12
# Builds the subscription that groups consecutive source values into chunks.
#
# A new chunk starts whenever +@func.call(previous, current)+ is falsy; each
# finished chunk is handed to +flush_current+, which emits it to +tracker+.
# The pending chunk is also emitted before a close or an error is forwarded.
# When no value has been received there is nothing pending, so nothing is
# flushed: an empty source produces no empty chunk, matching
# Enumerable#chunk_while on an empty collection.
#
# @param tracker the downstream tracker receiving on_value / on_error /
#   on_close notifications
# @return [Subscriptions::OperationSubscription] a subscription wired with the
#   value, close and error handlers defined here
def call(tracker)
  current_data = []
  prev_value = NO_VALUE

  # Emits the chunk collected so far, skipping the flush when nothing has
  # been collected (the source closed or failed before emitting a value).
  flush_pending = lambda do
    flush_current(tracker, current_data) unless current_data.empty?
  end

  value = lambda do |v|
    # Identity check on the sentinel: a user value with a permissive `==`
    # must never be mistaken for "no previous value yet".
    first_value = NO_VALUE.equal?(prev_value)

    # The predicate is only consulted once there is a real previous value.
    should_continue = first_value || @func.call(prev_value, v)
    prev_value = v

    unless should_continue
      flush_current(tracker, current_data)
      # Start a fresh array rather than clearing the one just handed off.
      current_data = []
    end
    current_data << v
  end

  error = lambda do |e|
    flush_pending.call
    tracker.on_error(e)
  end

  close = lambda do
    flush_pending.call
    tracker.on_close
  end

  Subscriptions::OperationSubscription.new(
    tracker, value: value, close: close, error: error
  )
end
flush_current(tracker, current_data)
# File lib/reacto/operations/chunk_while.rb, line 49
# Emits one finished chunk downstream.
#
# The collected values are wrapped with Trackable.enumerable, using the
# executor given at construction time, and the wrapper is passed to
# +tracker.on_value+.
#
# @param tracker the downstream tracker that receives the chunk
# @param current_data [Array] values collected for the chunk being emitted
# @return whatever +tracker.on_value+ returns
def flush_current(tracker, current_data)
  chunk = Trackable.enumerable(current_data, executor: @executor)
  tracker.on_value(chunk)
end