class EventSource::Iterator

Attributes

batch[RW]
batch_index[W]
starting_position[RW]

Public Class Methods

build(get, stream_name, position: nil)
# File lib/event_source/iterator.rb, line 15
# Constructs an iterator for the given stream.
#
# Instantiates via new(get, stream_name), assigns the optional starting
# position, logs the construction at debug level and returns the instance.
#
# get         - passed through to new unchanged
# stream_name - passed through to new and included in the log message
# position    - value assigned to starting_position (nil when omitted)
def self.build(get, stream_name, position: nil)
  iterator = new(get, stream_name)
  iterator.starting_position = position

  Log.get(self).debug { "Built Iterator (Stream Name: #{stream_name}, Starting Position: #{position.inspect})" }

  iterator
end
configure(receiver, get, stream_name, attr_name: nil, position: nil)
# File lib/event_source/iterator.rb, line 22
# Builds an iterator and assigns it to an attribute of the receiver.
#
# receiver    - object that receives the iterator through its "<attr_name>=" writer
# get         - forwarded to build
# stream_name - forwarded to build
# attr_name   - name of the receiver attribute; :iterator is used when nil
# position    - forwarded to build as the starting position
#
# Returns whatever the receiver's writer method returns.
def self.configure(receiver, get, stream_name, attr_name: nil, position: nil)
  iterator = build(get, stream_name, position: position)
  writer = "#{attr_name || :iterator}="

  receiver.public_send(writer, iterator)
end

Public Instance Methods

advance_batch_index()
# File lib/event_source/iterator.rb, line 43
# Moves the batch index forward by one.
#
# The index is logged before the increment (trace) and after it (debug).
def advance_batch_index
  logger.trace { "Advancing batch index (Batch Index: #{batch_index})" }

  self.batch_index = batch_index + 1

  logger.debug { "Advanced batch index (Batch Index: #{batch_index})" }
end
batch_depleted?()
# File lib/event_source/iterator.rb, line 49
# Reports whether there is nothing left to read in the current batch.
#
# Returns true when the batch is nil, when it is empty, or when the batch
# index equals the batch length; the reason is logged at debug level.
# Returns false otherwise.
def batch_depleted?
  if batch.nil?
    logger.debug { "Batch is depleted (Batch is nil)" }
    true
  elsif batch.empty?
    logger.debug { "Batch is depleted (Batch is empty)" }
    true
  elsif batch_index == batch.length
    logger.debug { "Batch is depleted (Batch Index: #{batch_index}, Batch Length: #{batch.length})" }
    true
  else
    false
  end
end
batch_index()
# File lib/event_source/iterator.rb, line 8
# Current index into the batch.
#
# Lazily defaults to 0: when @batch_index is nil (or otherwise falsy) it is
# set to 0 before being returned.
def batch_index
  @batch_index = 0 unless @batch_index
  @batch_index
end
get_batch()
# File lib/event_source/iterator.rb, line 77
# Retrieves the next batch from the stream.
#
# The starting position comes from next_batch_starting_position. When that
# position is nil or non-negative, `get` is invoked with the stream name and
# the position; for a negative position `get` is not called and an empty
# array is returned.
#
# Returns the retrieved batch (or [] when nothing was requested).
def get_batch
  position = next_batch_starting_position

  # Block form so the message is only built when the logger asks for it,
  # matching every other log call in this class
  logger.trace { "Getting batch (Position: #{position.inspect})" }

  retrieved =
    if position.nil? || position >= 0
      get.(stream_name, position: position)
    else
      []
    end

  logger.debug { "Finished getting batch (Count: #{retrieved.length}, Position: #{position.inspect})" }

  retrieved
end
last_position()
# File lib/event_source/iterator.rb, line 105
# Position of the last entry in the current batch.
#
# When EventSource::StreamName.category?(stream_name) is truthy the entry's
# global_position is returned; otherwise its position is returned.
#
# NOTE(review): batch.last is used without a nil check, so this presumably
# raises when the batch is empty - confirm callers never reach here then.
def last_position
  category = EventSource::StreamName.category?(stream_name)
  final_entry = batch.last

  category ? final_entry.global_position : final_entry.position
end
next()
# File lib/event_source/iterator.rb, line 28
# Returns the entry at the current batch index and advances the index.
#
# When the current batch is depleted, a new batch is obtained via resupply
# before reading. The value returned is batch[batch_index], which is nil
# when the index is past the end of the batch.
def next
  logger.trace { "Getting next event data (Batch Length: #{(batch&.length).inspect}, Batch Index: #{batch_index})" }

  if batch_depleted?
    resupply
  end

  batch[batch_index].tap do |event_data|
    logger.debug(tags: [:data, :event_data]) { "Next event data: #{event_data.pretty_inspect}" }
    # Parentheses matter: a nil batch must be logged as "nil", not as an empty string
    logger.debug { "Done getting next event data (Batch Length: #{(batch&.length).inspect}, Batch Index: #{batch_index})" }

    advance_batch_index
  end
end
next_batch_starting_position()
# File lib/event_source/iterator.rb, line 92
# Determines the position from which the next batch should be read.
#
# When no batch has been set yet (batch is nil), the configured
# starting_position is returned. Otherwise the result is last_position + 1.
def next_batch_starting_position
  if batch.nil?
    logger.debug { "Batch is nil (Next batch starting position: #{starting_position.inspect})" }
    starting_position
  else
    previous_position = last_position
    following_position = previous_position + 1

    logger.debug { "End of batch (Next starting position: #{following_position}, Previous Position: #{previous_position})" }

    following_position
  end
end
reset(batch)
# File lib/event_source/iterator.rb, line 113
# Replaces the current batch and rewinds the batch index to 0.
#
# batch - the new batch to iterate over
#
# The new batch and the reset index are logged at debug level.
def reset(batch)
  logger.trace { "Resetting batch" }

  self.batch, self.batch_index = batch, 0

  batch_tags = [:data, :batch]
  logger.debug(tags: batch_tags) { "Batch set to: \n#{batch.pretty_inspect}" }
  logger.debug(tags: batch_tags) { "Batch position set to: #{batch_index.inspect}" }
  logger.debug { "Done resetting batch" }
end
resupply()
# File lib/event_source/iterator.rb, line 68
# Fetches a new batch via get_batch and installs it through reset.
#
# The length of the current batch is logged before fetching (trace) and the
# length of the newly fetched batch afterwards (debug).
def resupply
  logger.trace { "Resupplying batch (Current Batch Length: #{(batch&.length).inspect})" }

  # Distinct local name so the `batch` accessor is not shadowed
  fresh_batch = get_batch
  reset(fresh_batch)

  logger.debug { "Batch resupplied (Next Batch Length: #{(fresh_batch&.length).inspect})" }
end