class Reacto::Operations::DelayEach
Public Class Methods
new(delay)
click to toggle source
# File lib/reacto/operations/delay_each.rb, line 20 def initialize(delay) @delay = delay @queue = [] end
Public Instance Methods
call(tracker)
click to toggle source
# File lib/reacto/operations/delay_each.rb, line 25 def call(tracker) close = lambda do @queue << OpenStruct.new(type: :close) delay_task(tracker) end error = lambda do |e| @queue << OpenStruct.new(error: e, type: :error) delay_task(tracker) end value = lambda do |v| @queue << OpenStruct.new(value: v, type: :value) delay_task(tracker) end Subscriptions::OperationSubscription.new( tracker, value: value, close: close, error: error ) end
Private Instance Methods
delay_task(tracker)
click to toggle source
# File lib/reacto/operations/delay_each.rb, line 51 def delay_task(tracker) return if @task @task = Concurrent::TimerTask.new(execution_interval: @delay) do notification = @queue.shift return unless notification if notification.type == :value tracker.on_value(notification.value) elsif notification.type == :error tracker.on_error(notification.error) @task.shutdown else tracker.on_close @task.shutdown end end @task.add_observer(TaskObserver.new(tracker)) @task.execute end