class Reacto::Operations::Throttle

Public Class Methods

new(delay)
# File lib/reacto/operations/throttle.rb, line 8
# Creates a throttle operation with nothing buffered and no timer running.
#
# delay - stored unchanged in @delay; delay_task later passes it to the
#         timer as its execution_interval.
def initialize(delay)
  @delay = delay

  # NO_VALUE marks both slots as empty: no pending value and no pending error.
  @last = @error = NO_VALUE

  # The timer has not been started and no close notification was recorded.
  @ready = @close = false
end

Public Instance Methods

call(tracker)
# File lib/reacto/operations/throttle.rb, line 16
# Builds the subscription whose handlers record incoming notifications in
# this operation's state, so the timer created by delay_task can forward
# them to +tracker+ later.
#
# Every handler starts the timer the first time any of them runs (guarded by
# @ready). This includes +close+: without it, a source that closes before
# producing any value or error would never start the timer, and
# tracker.on_close would never be delivered.
#
# tracker - the object that delay_task notifies; it is also handed to the
#           subscription as its first argument.
#
# Returns the new Subscriptions::OperationSubscription.
def call(tracker)
  # Starts the timer once; later notifications reuse the running one.
  start_timer = -> { delay_task(tracker) unless @ready }

  close = lambda do
    start_timer.call
    @close = true
  end

  error = lambda do |e|
    start_timer.call
    @error = e
  end

  # Only the most recent value is kept; a newer one overwrites it.
  value = lambda do |v|
    start_timer.call
    @last = v
  end

  Subscriptions::OperationSubscription.new(
    tracker,
    value: value,
    close: close,
    error: error
  )
end

Private Instance Methods

delay_task(tracker)
# File lib/reacto/operations/throttle.rb, line 41
# Creates and starts the timer that forwards buffered notifications to
# +tracker+, then marks this operation as ready.
#
# On each run of the timer block, the first matching case wins:
# * a stored error goes to tracker.on_error and the timer is finished;
# * a recorded close goes to tracker.on_close and the timer is finished
#   (a value still buffered at that point is not forwarded);
# * a stored value goes to tracker.on_value and the slot is reset to NO_VALUE;
# * otherwise nothing happens.
def delay_task(tracker)
  timer = Concurrent::TimerTask.new(execution_interval: @delay) do
    if @error != NO_VALUE
      # Errors take priority over everything else that is pending.
      tracker.on_error(@error)
      finish
    elsif @close
      tracker.on_close
      finish
    elsif @last != NO_VALUE
      tracker.on_value(@last)
      # Clear the slot so the same value is not emitted again on the next run.
      @last = NO_VALUE
    end
  end

  # Store the task before starting it; finish relies on @task to stop it.
  @task = timer
  timer.execute
  @ready = true
end
finish()
# File lib/reacto/operations/throttle.rb, line 37
# Shuts down the timer task if one has been created; otherwise does nothing
# and returns nil.
def finish
  return unless @task

  @task.shutdown
end