class Reacto::Operations::ChunkWhile
Public Class Methods
new(func, executor: nil)
click to toggle source
# File lib/reacto/operations/chunk_while.rb, line 7 def initialize(func, executor: nil) @func = func @executor = executor end
Public Instance Methods
call(tracker)
click to toggle source
# File lib/reacto/operations/chunk_while.rb, line 12 def call(tracker) current_data = [] prev_value = NO_VALUE value = ->(v) do if prev_value == NO_VALUE prev_value = v current_data << v return end should_continue = @func.call(prev_value, v) prev_value = v unless should_continue flush_current(tracker, current_data) current_data = [] end current_data << v end error = ->(e) do flush_current(tracker, current_data) tracker.on_error(e) end close = ->() do flush_current(tracker, current_data) tracker.on_close end Subscriptions::OperationSubscription.new( tracker, value: value, close: close, error: error ) end
flush_current(tracker, current_data)
click to toggle source
# File lib/reacto/operations/chunk_while.rb, line 49 def flush_current(tracker, current_data) tracker.on_value( Trackable.enumerable(current_data, executor: @executor) ) end