class Reacto::Operations::Throttle
Public Class Methods
new(delay)
click to toggle source
# File lib/reacto/operations/throttle.rb, line 8
# Creates a throttle operation in its idle state: nothing buffered, no error
# recorded, source not closed, and the timer not yet started.
#
# @param delay the interval later handed to Concurrent::TimerTask as
#   +execution_interval+ when the timer is started
def initialize(delay)
  @delay = delay
  # NO_VALUE is the sentinel for "nothing pending" in both slots.
  @last = @error = NO_VALUE
  @ready = @close = false
end
Public Instance Methods
call(tracker)
click to toggle source
# File lib/reacto/operations/throttle.rb, line 16
# Builds the subscription that feeds this throttle.
#
# Incoming notifications are not forwarded directly. Each handler records
# its event in instance state (+@last+, +@error+, +@close+) and the timer
# started by +delay_task+ is what eventually notifies +tracker+.
#
# @param tracker the downstream object handed to +delay_task+ and to the
#   subscription
# @return [Subscriptions::OperationSubscription] subscription wired with the
#   value, close and error handlers
def call(tracker)
  # The timer is started at most once, on the first notification of any kind.
  start_timer = -> { delay_task(tracker) unless @ready }

  # Close must start the timer as well: the timer is the only place that
  # calls tracker.on_close, so a source closing before any value or error
  # would otherwise never be closed downstream.
  close = lambda do
    start_timer.call
    @close = true
  end

  error = lambda do |e|
    start_timer.call
    @error = e
  end

  # Only the most recent value is kept; a newer one overwrites the older.
  value = lambda do |v|
    start_timer.call
    @last = v
  end

  Subscriptions::OperationSubscription.new(
    tracker,
    value: value,
    close: close,
    error: error
  )
end
Private Instance Methods
delay_task(tracker)
click to toggle source
# File lib/reacto/operations/throttle.rb, line 41
# Creates and starts the periodic timer that drains this operation's state
# into +tracker+, then marks the operation as ready so the timer is not
# started again by the subscription handlers.
#
# @param tracker the downstream object notified on every timer tick
def delay_task(tracker)
  @task = Concurrent::TimerTask.new(execution_interval: @delay) { deliver_pending(tracker) }
  @task.execute
  @ready = true
end

# Handles a single timer tick. Checks, in priority order: a recorded error,
# a close request, then a buffered value. Error and close also stop the timer.
#
# @param tracker the downstream object to notify
def deliver_pending(tracker)
  if @error != NO_VALUE
    tracker.on_error(@error)
    return finish
  end

  if @close
    tracker.on_close
    return finish
  end

  # Emit the buffered value once, then clear the slot so the same value is
  # not emitted again on the next tick.
  if @last != NO_VALUE
    tracker.on_value(@last)
    @last = NO_VALUE
  end
end
finish()
click to toggle source
# File lib/reacto/operations/throttle.rb, line 37
# Shuts down the timer task if one has been created; does nothing otherwise.
#
# @return the result of the task's +shutdown+, or nil when there is no task
def finish
  return unless @task

  @task.shutdown
end