class Reacto::Operations::Buffer
Public Class Methods
new(count: nil, delay: nil)
click to toggle source
# File lib/reacto/operations/buffer.rb, line 7
# Builds a buffering operation with an initially empty buffer.
#
# @param count [Integer, nil] buffer size at which the collected values are
#   emitted as one array; nil disables count-based flushing
# @param delay [Numeric, nil] interval handed to the timer task as its
#   +execution_interval+ (presumably seconds - confirm against
#   concurrent-ruby); nil disables time-based flushing
def initialize(count: nil, delay: nil)
  @buffer = []
  @delay = delay
  @count = count
end
Public Instance Methods
call(tracker)
click to toggle source
# File lib/reacto/operations/buffer.rb, line 13
# Wraps +tracker+ in an operation subscription whose value handler buffers
# incoming values according to the configured count and/or delay.
#
# On close and on error the pending buffer is flushed (via #finish) before
# the notification is forwarded to +tracker+.
#
# @param tracker [#on_value, #on_close, #on_error] downstream receiver
# @return [Subscriptions::OperationSubscription]
def call(tracker)
  on_value =
    if @count.nil? && @delay.nil?
      # Nothing configured: values pass straight through.
      tracker.method(:on_value)
    elsif @delay.nil?
      count_buffer_behaviour(tracker)
    elsif @count.nil?
      delay_buffer_behaviour(tracker)
    elsif @count && @delay
      count_and_delay_buffer_behaviour(tracker)
    else
      # A non-nil but falsy setting falls back to pass-through.
      tracker.method(:on_value)
    end

  on_close = -> {
    finish(tracker)
    tracker.on_close
  }

  on_error = ->(e) {
    finish(tracker)
    tracker.on_error(e)
  }

  Subscriptions::OperationSubscription.new(
    tracker, value: on_value, close: on_close, error: on_error
  )
end
Private Instance Methods
count_and_delay_buffer_behaviour(tracker)
click to toggle source
# File lib/reacto/operations/buffer.rb, line 71
# Combines both flushing strategies: starts the periodic flush first, then
# builds the count-based value handler.
#
# @param tracker [#on_value] downstream receiver
# @return [Proc] the handler produced by #count_buffer_behaviour
def count_and_delay_buffer_behaviour(tracker)
  delay_task(tracker)

  handler = count_buffer_behaviour(tracker)
  handler
end
count_buffer_behaviour(tracker)
click to toggle source
# File lib/reacto/operations/buffer.rb, line 45
# Builds the value handler for count-based buffering.
#
# Each incoming value is appended to the buffer; once the buffer holds at
# least @count values it is emitted to +tracker+ as one array and replaced
# by a fresh empty array.
#
# @param tracker [#on_value] downstream receiver
# @return [Proc] lambda taking a single value
def count_buffer_behaviour(tracker)
  lambda do |item|
    @buffer.push(item)
    next unless @buffer.size >= @count

    tracker.on_value(@buffer)
    @buffer = []
  end
end
delay_buffer_behaviour(tracker)
click to toggle source
# File lib/reacto/operations/buffer.rb, line 66
# Builds the value handler for time-based buffering.
#
# Starts the periodic flush and returns a handler that only collects values;
# emitting them is left to the timer task.
#
# @param tracker [#on_value] downstream receiver
# @return [Proc] lambda taking a single value
def delay_buffer_behaviour(tracker)
  delay_task(tracker)

  lambda { |item| @buffer.push(item) }
end
delay_task(tracker)
click to toggle source
# File lib/reacto/operations/buffer.rb, line 56
# Creates and starts the timer task that periodically flushes the buffer.
#
# On every tick a non-empty buffer is emitted to +tracker+ as one array and
# replaced by a fresh empty array; an empty buffer emits nothing.
#
# @param tracker [#on_value] downstream receiver
# @return the result of Concurrent::TimerTask#execute
def delay_task(tracker)
  # A plain proc (not a lambda) so any arguments the timer yields are ignored.
  flush = proc do
    next if @buffer.empty?

    tracker.on_value(@buffer)
    @buffer = []
  end

  @task = Concurrent::TimerTask.new(execution_interval: @delay, &flush)
  @task.execute
end
finish(tracker)
click to toggle source
# File lib/reacto/operations/buffer.rb, line 39
# Stops the timer task, if one was started, and flushes any pending values
# to +tracker+ as a single array.
#
# The pending array is detached from @buffer before it is emitted, so the
# same values are never delivered twice: a repeated #finish, or a later
# subscription reusing this operation instance, starts from an empty buffer.
# A fresh array is assigned (rather than clearing in place) so the array
# already handed to the tracker is not mutated afterwards.
#
# @param tracker [#on_value] downstream receiver
# @return the result of tracker.on_value, or nil when nothing was pending
def finish(tracker)
  @task.shutdown if @task
  return if @buffer.empty?

  pending = @buffer
  @buffer = []
  tracker.on_value(pending)
end