class Akasha::Storage::MemoryEventStore::Stream

Memory-based event stream.

Public Class Methods

new(&before_write) click to toggle source

Creates a new event stream. Accepts an optional block, allowing for filtering new events and triggering side-effects, before new events are appended to the stream,

# File lib/akasha/storage/memory_event_store/stream.rb, line 9
def initialize(&before_write)
  @before_write = before_write || identity
  @events = []
  @metadata = {}
  @monitor = Monitor.new
end

Public Instance Methods

metadata() click to toggle source
# File lib/akasha/storage/memory_event_store/stream.rb, line 37
def metadata
  @monitor.synchronize do
    @metadata
  end
end
metadata=(metadata) click to toggle source
# File lib/akasha/storage/memory_event_store/stream.rb, line 43
def metadata=(metadata)
  @monitor.synchronize do
    @metadata = metadata
  end
end
read_events(start, page_size, **_options, &block) click to toggle source

Reads events from the stream starting from `start` inclusive. If block given, reads all events from the start in pages of `page_size`. If block not given, reads `page_size` events from the start.

# File lib/akasha/storage/memory_event_store/stream.rb, line 27
def read_events(start, page_size, **_options, &block)
  @monitor.synchronize do
    if block_given?
      @events.drop(start).each_slice(page_size, &block)
    else
      @events[start..start + page_size]
    end
  end
end
write_events(events, revision: nil) click to toggle source

Appends events to the stream.

# File lib/akasha/storage/memory_event_store/stream.rb, line 17
def write_events(events, revision: nil)
  @monitor.synchronize do
    check_revision!(revision)
    @events += to_recorded_events(@events.size, @before_write.call(events))
  end
end

Private Instance Methods

check_revision!(expected_revision) click to toggle source
# File lib/akasha/storage/memory_event_store/stream.rb, line 55
def check_revision!(expected_revision)
  return if expected_revision.nil?
  actual_revision = @events.size - 1
  return if expected_revision == actual_revision
  raise ConflictError,
        "Race condition; expected last event version: #{expected_revision} actual: #{actual_revision}"
end
identity() click to toggle source
# File lib/akasha/storage/memory_event_store/stream.rb, line 51
def identity
  ->(x) { x }
end
to_recorded_events(current_revision, events) click to toggle source
# File lib/akasha/storage/memory_event_store/stream.rb, line 63
def to_recorded_events(current_revision, events)
  events.each_with_index.map do |event, i|
    updated_at = Time.now.utc # Cheating.
    RecordedEvent.new(event.name, event.id, current_revision + i,
                      updated_at, event.metadata, **event.data)
  end
end