class Reacto::Operations::Buffer

Public Class Methods

new(count: nil, delay: nil)
# File lib/reacto/operations/buffer.rb, line 7
# Sets up a buffering operation with an empty buffer.
#
# @param count [Integer, nil] batch size that triggers an emission; nil
#   disables size-based batching
# @param delay [Numeric, nil] interval handed to the periodic flush task;
#   nil disables time-based batching
def initialize(count: nil, delay: nil)
  @count, @delay = count, delay
  @buffer = []
end

Public Instance Methods

call(tracker)
# File lib/reacto/operations/buffer.rb, line 13
# Builds the subscription that sits between the source and +tracker+.
#
# The value handler depends on which options were given:
# * only +count+          -> emit a batch every +count+ values
# * only +delay+          -> emit a batch on every timer tick
# * both +count+ & +delay+ -> whichever happens first
# * neither               -> values pass straight through to the tracker
#
# Closing or erroring first flushes whatever is still buffered (see #finish)
# and then forwards the notification to the tracker.
#
# @param tracker [#on_value, #on_close, #on_error] downstream receiver
# @return [Subscriptions::OperationSubscription]
def call(tracker)
  counted = !@count.nil?
  timed = !@delay.nil?

  on_value =
    if counted && !timed
      count_buffer_behaviour(tracker)
    elsif timed && !counted
      delay_buffer_behaviour(tracker)
    elsif @count && @delay
      # Truthiness (not nil-ness) is checked here on purpose to mirror the
      # established behaviour for non-nil but falsy options.
      count_and_delay_buffer_behaviour(tracker)
    else
      tracker.method(:on_value)
    end

  on_close = lambda do
    finish(tracker)
    tracker.on_close
  end

  on_error = lambda do |e|
    finish(tracker)
    tracker.on_error(e)
  end

  Subscriptions::OperationSubscription.new(
    tracker, value: on_value, close: on_close, error: on_error
  )
end

Private Instance Methods

count_and_delay_buffer_behaviour(tracker)
# File lib/reacto/operations/buffer.rb, line 71
# Combines both batching strategies: starts the periodic flush task and
# returns the size-triggered value handler, so a batch is emitted by
# whichever condition is met first.
#
# @param tracker [#on_value] downstream receiver of the batches
# @return [Proc] the handler produced by #count_buffer_behaviour
def count_and_delay_buffer_behaviour(tracker)
  delay_task(tracker)
  size_triggered_handler = count_buffer_behaviour(tracker)
  size_triggered_handler
end
count_buffer_behaviour(tracker)
# File lib/reacto/operations/buffer.rb, line 45
# Returns the value handler for size-based batching: every incoming value is
# appended to the buffer, and once the buffer holds at least +@count+ values
# the whole batch is emitted to the tracker and a fresh buffer is started.
#
# The full batch is detached from +@buffer+ *before* it is emitted. Emitting
# the live buffer and resetting it afterwards would let a value that arrives
# while the tracker is still processing the batch end up inside the
# already-emitted array and then be discarded by the reset.
#
# NOTE(review): the buffer is not guarded by a lock; detaching first narrows
# the window but is not a full synchronisation — confirm whether callers can
# push from several threads.
#
# @param tracker [#on_value] downstream receiver of the batches
# @return [Proc] lambda taking one value
def count_buffer_behaviour(tracker)
  lambda do |value|
    @buffer << value
    next if @buffer.size < @count

    batch = @buffer
    @buffer = []
    tracker.on_value(batch)
  end
end
delay_buffer_behaviour(tracker)
# File lib/reacto/operations/buffer.rb, line 66
# Returns the value handler for time-based batching: starts the periodic
# flush task and hands back a lambda that only collects values; emitting is
# left to the task started by #delay_task.
#
# @param tracker [#on_value] downstream receiver of the batches
# @return [Proc] lambda taking one value and appending it to the buffer
def delay_buffer_behaviour(tracker)
  delay_task(tracker)

  lambda do |item|
    @buffer << item
  end
end
delay_task(tracker)
# File lib/reacto/operations/buffer.rb, line 56
# Creates and starts the periodic task that flushes the buffer every
# +@delay+ (passed as the task's +execution_interval+). On each run a
# non-empty buffer is emitted to the tracker as one batch; an empty buffer
# emits nothing. The task is kept in +@task+ so #finish can shut it down.
#
# The batch is detached from +@buffer+ *before* it is emitted. Emitting the
# live buffer and resetting it afterwards would let a value pushed while the
# tracker is still handling the batch land in the already-emitted array and
# then be thrown away by the reset.
#
# NOTE(review): the task body presumably runs on a different thread than the
# one feeding values, and the buffer is not guarded by a lock; detaching
# first only narrows the race window — confirm whether a mutex is needed.
#
# @param tracker [#on_value] downstream receiver of the batches
# @return the result of +@task.execute+
def delay_task(tracker)
  @task = Concurrent::TimerTask.new(execution_interval: @delay) do
    unless @buffer.empty?
      batch = @buffer
      @buffer = []
      tracker.on_value(batch)
    end
  end
  @task.execute
end
finish(tracker)
# File lib/reacto/operations/buffer.rb, line 39
# Stops the periodic flush task (when one was started) and emits whatever is
# still buffered, so trailing values are not lost when the stream terminates.
#
# The remaining values are detached from +@buffer+ before they are emitted.
# Without the reset, a second termination (e.g. a close arriving after an
# error, or the same operation instance being subscribed again) would emit
# the very same batch once more.
#
# @param tracker [#on_value] downstream receiver of the final batch
def finish(tracker)
  @task.shutdown if @task

  return if @buffer.empty?

  remaining = @buffer
  @buffer = []
  tracker.on_value(remaining)
end