class Akasha::Checkpoint::MetadataCheckpoint
Stores stream position in stream metadata.
Public Class Methods
new(stream, interval: 1)
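Creates a new checkpoint that stores the position in `stream` every `interval` events. `interval` must be greater than zero. With an `interval` greater than 1, positions acknowledged between two stores are not persisted, so use a larger interval only with idempotent event listeners.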
# File lib/akasha/checkpoint/metadata_checkpoint.rb, line 7
def initialize(stream, interval: 1)
  @stream = stream
  @interval = interval
  return if @stream.respond_to?(:metadata) && @stream.respond_to?(:metadata=)
  raise UnsupportedStorageError, "Storage does not support checkpoints: #{stream.class}"
end
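The constructor accepts any object that responds to both `metadata` and `metadata=`. The sketch below isolates that capability check so it can be tried without an event store. `FakeStream` and `supports_checkpoints?` are hypothetical names introduced for illustration; they are not part of Akasha.

```ruby
# Hypothetical stand-in for a stream: it offers only the two methods
# that the constructor checks for.
class FakeStream
  attr_accessor :metadata

  def initialize
    @metadata = {}
  end
end

# Mirrors the condition used in the constructor listing above.
# This helper is an illustration, not an Akasha method.
def supports_checkpoints?(stream)
  stream.respond_to?(:metadata) && stream.respond_to?(:metadata=)
end

puts supports_checkpoints?(FakeStream.new) # true
puts supports_checkpoints?(Object.new)     # false
```

An object that fails this check is the case in which the constructor raises `UnsupportedStorageError`.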
Public Instance Methods
ack(position)
Returns the next position, conditionally storing it (based on the configurable interval).
# File lib/akasha/checkpoint/metadata_checkpoint.rb, line 22
def ack(position)
  @next_position = position + 1
  if (@next_position % @interval).zero?
    # TODO: Race condition; use optimistic concurrency.
    @stream.metadata = @stream.metadata.merge(next_position: @next_position)
  end
  @next_position
rescue Akasha::HttpClientError => e
  raise if e.status_code != 404
  raise CheckpointStreamNotFoundError, "Stream cannot be checkpointed; it does not exist: #{@stream.name}"
end
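To see how `interval` affects the number of metadata writes, here is a minimal sketch that copies the modulo logic from `ack` into a standalone class. `CountingStream` and `IntervalCheckpoint` are hypothetical stand-ins, and the HTTP error handling from the listing is left out.

```ruby
# Hypothetical in-memory stream that counts how often metadata is written.
class CountingStream
  attr_reader :metadata, :writes

  def initialize
    @metadata = {}
    @writes = 0
  end

  def metadata=(value)
    @writes += 1
    @metadata = value
  end
end

# Simplified copy of the ack logic shown above (no error handling).
class IntervalCheckpoint
  def initialize(stream, interval: 1)
    @stream = stream
    @interval = interval
  end

  def ack(position)
    @next_position = position + 1
    if (@next_position % @interval).zero?
      @stream.metadata = @stream.metadata.merge(next_position: @next_position)
    end
    @next_position
  end
end

stream = CountingStream.new
checkpoint = IntervalCheckpoint.new(stream, interval: 3)

# Acknowledge positions 0 through 6; every call returns position + 1.
returned = (0..6).map { |position| checkpoint.ack(position) }

p returned        # [1, 2, 3, 4, 5, 6, 7]
p stream.writes   # 2 (stored when the next position was 3 and 6)
p stream.metadata # {:next_position=>6} on Ruby < 3.4, {next_position: 6} on 3.4+
```

With `interval: 1` the same run would write metadata seven times, once per acknowledged event.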
latest()
Returns the most recently stored next position, or 0 if none has been stored. The value is memoized after the first read.
# File lib/akasha/checkpoint/metadata_checkpoint.rb, line 16
def latest
  @next_position ||= (read_position || 0)
end
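The following sketch shows why an `interval` greater than 1 calls for idempotent listeners: after a restart, `latest` can only see what was stored in the metadata. `MemoryStream` and `RestartableCheckpoint` are hypothetical stand-ins that copy the logic from the listings on this page without the HTTP error handling, and a "restart" is simulated by creating a second instance over the same stream.

```ruby
# Hypothetical in-memory stand-in for a stream with metadata.
class MemoryStream
  attr_accessor :metadata

  def initialize
    @metadata = {}
  end
end

# Simplified copy of latest/ack from the listings (no error handling).
class RestartableCheckpoint
  def initialize(stream, interval: 1)
    @stream = stream
    @interval = interval
  end

  def latest
    @next_position ||= (@stream.metadata[:next_position] || 0)
  end

  def ack(position)
    @next_position = position + 1
    if (@next_position % @interval).zero?
      @stream.metadata = @stream.metadata.merge(next_position: @next_position)
    end
    @next_position
  end
end

stream = MemoryStream.new

first_run = RestartableCheckpoint.new(stream, interval: 5)
puts first_run.latest # 0, nothing stored yet
(0..6).each { |position| first_run.ack(position) }
puts first_run.latest # 7, held in memory by this instance

# A fresh instance simulates a process restart: only stored metadata survives.
second_run = RestartableCheckpoint.new(stream, interval: 5)
puts second_run.latest # 5, so the events at positions 5 and 6 are seen again
```

In this sketch a restart can replay at most `interval - 1` events that were already acknowledged.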
Protected Instance Methods
read_position()
# File lib/akasha/checkpoint/metadata_checkpoint.rb, line 36
def read_position
  @stream.metadata[:next_position]
rescue Akasha::HttpClientError => e
  return 0 if e.status_code == 404
  raise
end