class Akasha::Checkpoint::MetadataCheckpoint

Stores stream position in stream metadata.

Public Class Methods

new(stream, interval: 1)

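Creates a new checkpoint that stores the position in the metadata of `stream` every `interval` events. `interval` must be greater than zero. With an `interval` greater than 1, events acknowledged since the last stored position may be delivered again after a restart, so use larger intervals only with idempotent event listeners. Raises `UnsupportedStorageError` if `stream` does not respond to both `metadata` and `metadata=`.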

# File lib/akasha/checkpoint/metadata_checkpoint.rb, line 7
def initialize(stream, interval: 1)
  @stream = stream
  @interval = interval
  return if @stream.respond_to?(:metadata) && @stream.respond_to?(:metadata=)
  raise UnsupportedStorageError, "Storage does not support checkpoints: #{stream.class}"
end
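
The guard in the constructor is duck-typed: any object that responds to both `metadata` and `metadata=` is accepted. A minimal sketch of the same check follows. The classes `InMemoryStream` and `ReadOnlyStream` and the helper `supports_checkpoints?` are hypothetical stand-ins for illustration, not part of Akasha.

```ruby
# Hypothetical stand-in: exposes both a metadata reader and a writer.
class InMemoryStream
  attr_accessor :metadata

  def initialize
    @metadata = {}
  end
end

# Hypothetical stand-in: exposes only a metadata reader.
class ReadOnlyStream
  attr_reader :metadata

  def initialize
    @metadata = {}
  end
end

# Mirrors the guard in `initialize`: both accessors are required.
def supports_checkpoints?(stream)
  stream.respond_to?(:metadata) && stream.respond_to?(:metadata=)
end

puts supports_checkpoints?(InMemoryStream.new) # true
puts supports_checkpoints?(ReadOnlyStream.new) # false
```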

Public Instance Methods

ack(position)

Returns the next position, conditionally storing it (based on the configurable interval).

# File lib/akasha/checkpoint/metadata_checkpoint.rb, line 22
def ack(position)
  @next_position = position + 1
  if (@next_position % @interval).zero?
    # TODO: Race condition; use optimistic concurrency.
    @stream.metadata = @stream.metadata.merge(next_position: @next_position)
  end
  @next_position
rescue Akasha::HttpClientError => e
  raise if e.status_code != 404
  raise CheckpointStreamNotFoundError, "Stream cannot be checkpointed; it does not exist: #{@stream.name}"
end
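
To illustrate how the interval affects writes, here is a small sketch that restates the `ack` logic against an in-memory object. `FakeStream` and the free-standing `ack_sketch` method are assumptions made for this example only; error handling is omitted.

```ruby
# Hypothetical in-memory stream that records every metadata write.
class FakeStream
  attr_reader :metadata, :writes

  def initialize
    @metadata = {}
    @writes = []
  end

  def metadata=(value)
    @writes << value[:next_position]
    @metadata = value
  end
end

# Restates the logic of `ack` shown above, without the rescue clause.
def ack_sketch(stream, position, interval)
  next_position = position + 1
  if (next_position % interval).zero?
    stream.metadata = stream.metadata.merge(next_position: next_position)
  end
  next_position
end

stream = FakeStream.new
(0..6).each { |position| ack_sketch(stream, position, 3) }

p stream.writes                   # [3, 6]
p stream.metadata[:next_position] # 6
```

With an interval of 3, seven acknowledgements produce only two metadata writes. The last acknowledged position (6, next position 7) is not persisted until the next multiple of the interval is reached.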

latest()

Returns the most recently stored next position.

# File lib/akasha/checkpoint/metadata_checkpoint.rb, line 16
def latest
  @next_position ||= (read_position || 0)
end
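
The following sketch shows what a fresh process would see from `latest` after a restart, and why a larger interval calls for idempotent listeners. It uses a plain Hash in place of stream metadata; `simulate_restart` is a hypothetical helper, not part of Akasha.

```ruby
# Hypothetical helper: acknowledges positions 0..last_acked with the given
# interval, then computes where a fresh process would resume.
def simulate_restart(interval, last_acked)
  metadata = {} # stand-in for stream metadata

  (0..last_acked).each do |position|
    next_position = position + 1
    # Store only when the next position is a multiple of the interval.
    metadata = metadata.merge(next_position: next_position) if (next_position % interval).zero?
  end

  # After a restart nothing is memoized, so the stored value (or 0) is used.
  resume_from = metadata[:next_position] || 0
  redelivered = (resume_from..last_acked).to_a
  [resume_from, redelivered]
end

resume_from, redelivered = simulate_restart(3, 7)
p resume_from # 6
p redelivered # [6, 7]
```

Positions 6 and 7 were already acknowledged before the restart but are read again, because the last stored next position is 6.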

Protected Instance Methods

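read_position()

Reads the stored next position from the stream metadata. Returns 0 if the stream does not exist (HTTP 404); other HTTP client errors are re-raised.
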
# File lib/akasha/checkpoint/metadata_checkpoint.rb, line 36
def read_position
  @stream.metadata[:next_position]
rescue Akasha::HttpClientError => e
  return 0 if e.status_code == 404
  raise
end