class Akasha::Storage::MemoryEventStore::Stream
Memory-based event stream.
Public Class Methods
new(&before_write)
click to toggle source
Creates a new event stream. Accepts an optional block that can filter new events and trigger side effects before the events are appended to the stream.
# File lib/akasha/storage/memory_event_store/stream.rb, line 9
#
# Builds an empty in-memory stream.
#
# @yield [events] optional hook invoked with the events about to be written;
#   whatever it returns is what gets recorded. When no block is supplied the
#   lambda returned by `identity` is used, so events pass through untouched.
def initialize(&before_write)
  @before_write = block_given? ? before_write : identity
  @events = []
  @metadata = {}
  # Guards @events and @metadata in every public method.
  @monitor = Monitor.new
end
Public Instance Methods
metadata()
click to toggle source
# File lib/akasha/storage/memory_event_store/stream.rb, line 37
#
# Reads the stream metadata while holding the stream's monitor.
#
# @return [Object] the object currently stored in @metadata (not a copy)
def metadata
  @monitor.synchronize { @metadata }
end
metadata=(metadata)
click to toggle source
# File lib/akasha/storage/memory_event_store/stream.rb, line 43
#
# Replaces the stream metadata while holding the stream's monitor.
#
# @param metadata [Object] the new metadata; stored as-is, without copying
def metadata=(metadata)
  @monitor.synchronize { @metadata = metadata }
end
read_events(start, page_size, **_options, &block)
click to toggle source
Reads events from the stream starting from `start` inclusive. If block given, reads all events from the start in pages of `page_size`. If block not given, reads `page_size` events from the start.
# File lib/akasha/storage/memory_event_store/stream.rb, line 27
#
# Reads events from the stream, starting at index `start` (inclusive).
#
# With a block, every event from `start` onwards is yielded in pages of at
# most `page_size` events; the block runs while the stream's monitor is held.
# Without a block, a single page of at most `page_size` events is returned.
#
# @param start [Integer] index of the first event to read
# @param page_size [Integer] maximum number of events per page
# @param _options [Hash] accepted and ignored
# @yield [page] an Array of up to `page_size` events
# @return [Array] without a block: the requested page, or an empty array when
#   `start` lies beyond the end of the stream
def read_events(start, page_size, **_options, &block)
  @monitor.synchronize do
    if block_given?
      @events.drop(start).each_slice(page_size, &block)
    else
      # (start, length) slicing yields exactly page_size items; the previous
      # inclusive range start..start + page_size returned one event too many.
      # Array#[] answers nil past the end, so normalise that to an empty page.
      @events[start, page_size] || []
    end
  end
end
write_events(events, revision: nil)
click to toggle source
Appends events to the stream.
# File lib/akasha/storage/memory_event_store/stream.rb, line 17
#
# Appends events to the stream while holding the stream's monitor.
#
# The events are first passed through the `before_write` hook; whatever the
# hook returns is converted to recorded events, numbered from the current
# stream length, and appended.
#
# @param events [Enumerable] events to append
# @param revision [Integer, nil] expected revision of the last stored event;
#   nil skips the check
# @raise [ConflictError] via check_revision! when `revision` does not match
def write_events(events, revision: nil)
  @monitor.synchronize do
    check_revision!(revision)
    recorded = to_recorded_events(@events.size, @before_write.call(events))
    @events += recorded
  end
end
Private Instance Methods
check_revision!(expected_revision)
click to toggle source
# File lib/akasha/storage/memory_event_store/stream.rb, line 55
#
# Verifies that the caller's expectation about the stream's last revision
# still holds. The actual revision is the index of the last stored event
# (-1 for an empty stream).
#
# @param expected_revision [Integer, nil] nil disables the check
# @return [nil] when the check is skipped or passes
# @raise [ConflictError] when the expected and actual revisions differ
def check_revision!(expected_revision)
  return if expected_revision.nil?

  last_revision = @events.size - 1
  unless expected_revision == last_revision
    raise ConflictError,
          "Race condition; expected last event version: #{expected_revision} actual: #{last_revision}"
  end
end
identity()
click to toggle source
# File lib/akasha/storage/memory_event_store/stream.rb, line 51
#
# Builds a one-argument lambda that returns its argument unchanged.
#
# @return [Proc] a lambda taking exactly one argument
def identity
  lambda { |value| value }
end
to_recorded_events(current_revision, events)
click to toggle source
# File lib/akasha/storage/memory_event_store/stream.rb, line 63
#
# Wraps each event in a RecordedEvent, assigning consecutive revisions.
#
# @param current_revision [Integer] revision given to the first event; each
#   following event gets the next integer
# @param events [Enumerable] objects responding to name, id, metadata and data
# @return [Array<RecordedEvent>] one recorded event per input event
def to_recorded_events(current_revision, events)
  events.map.with_index(current_revision) do |event, revision|
    # The timestamp is taken locally at conversion time; the original author
    # flagged this as "cheating".
    recorded_at = Time.now.utc
    RecordedEvent.new(event.name, event.id, revision, recorded_at, event.metadata, **event.data)
  end
end