class Reacto::Operations::DelayEach

Public Class Methods

new(delay) click to toggle source
# File lib/reacto/operations/delay_each.rb, line 20
def initialize(delay)
  @delay = delay
  @queue = []
end

Public Instance Methods

call(tracker) click to toggle source
# File lib/reacto/operations/delay_each.rb, line 25
def call(tracker)
  close = lambda do
    @queue << OpenStruct.new(type: :close)
    delay_task(tracker)
  end

  error = lambda do |e|
    @queue << OpenStruct.new(error: e, type: :error)
    delay_task(tracker)
  end

  value = lambda do |v|
    @queue << OpenStruct.new(value: v, type: :value)
    delay_task(tracker)
  end

  Subscriptions::OperationSubscription.new(
    tracker,
    value: value,
    close: close,
    error: error
  )
end

Private Instance Methods

delay_task(tracker) click to toggle source
# File lib/reacto/operations/delay_each.rb, line 51
def delay_task(tracker)
  return if @task

  @task = Concurrent::TimerTask.new(execution_interval: @delay) do
    notification = @queue.shift

    return unless notification

    if notification.type == :value
      tracker.on_value(notification.value)
    elsif notification.type == :error
      tracker.on_error(notification.error)
      @task.shutdown
    else
      tracker.on_close
      @task.shutdown
    end
  end
  @task.add_observer(TaskObserver.new(tracker))

  @task.execute
end