class Reacto::Operations::Drop

Public Class Methods

new(how_many_to_drop, offset = NO_VALUE)
# File lib/reacto/operations/drop.rb, line 7
# Builds a drop operation.
#
# @param how_many_to_drop [Integer] number of leading values to skip;
#   must not be negative.
# @param offset [Integer, Object] optional cap on how many values are let
#   through once the skipping is done; defaults to the NO_VALUE sentinel,
#   which is stored as-is.
# @raise [ArgumentError] when how_many_to_drop is below zero.
def initialize(how_many_to_drop, offset = NO_VALUE)
  raise ArgumentError, 'Attempt to drop negative size!' if how_many_to_drop < 0

  @offset = offset
  @how_many_to_drop = how_many_to_drop
end

Public Instance Methods

call(tracker)
# File lib/reacto/operations/drop.rb, line 16
# Wraps +tracker+ in a subscription that ignores the first
# @how_many_to_drop values and forwards the ones that follow.
#
# When an offset other than NO_VALUE was given, at most that many values are
# forwarded; the next value arriving after the budget is used up triggers
# tracker.on_close instead of tracker.on_value.
#
# All counters live in locals captured by the lambda, so every invocation of
# this method starts from a fresh state and the operation instance itself is
# never mutated (previously @offset was decremented in place, which made a
# second subscription to the same operation start with an exhausted budget).
#
# @param tracker [#on_value, #on_close] downstream receiver of the values.
# @return [Subscriptions::OperationSubscription] subscription built around
#   +tracker+ with the filtering lambda as its value handler.
def call(tracker)
  dropped = 0
  # Per-subscription copy of the budget; @offset stays untouched.
  remaining = @offset
  limited = remaining != NO_VALUE

  behaviour = lambda do |value|
    dropped += 1
    # Still inside the leading window that has to be skipped.
    next if dropped <= @how_many_to_drop

    if limited
      if remaining <= 0
        tracker.on_close
        next
      end

      remaining -= 1
    end

    tracker.on_value(value)
  end

  Subscriptions::OperationSubscription.new(tracker, value: behaviour)
end