class Synapse::EventStore::Mongo::MongoEventStore
Implementation of an event store backed by a Mongo
database
Public Class Methods
@param [MongoTemplate] template @param [StorageStrategy] storage_strategy @return [undefined]
# File lib/synapse/event_store/mongo/event_store.rb, line 9 def initialize(template, storage_strategy) @storage_strategy = storage_strategy @template = template end
Public Instance Methods
@raise [EventStoreError] If an error occurs while appending the stream to the store @param [String] type_identifier Type descriptor of the aggregate to append to @param [DomainEventStream] stream @return [undefined]
# File lib/synapse/event_store/mongo/event_store.rb, line 46 def append_events(type_identifier, stream) events = stream.to_a documents = @storage_strategy.create_documents type_identifier, events begin @template.event_collection.insert documents rescue ::Mongo::OperationFailure => exception if exception.error_code == 11000 raise Repository::ConcurrencyError, 'Event for this aggregate and sequence number already present' end raise ex end end
@raise [EventStoreError] If an error occurs while appending the event to the store @param [String] type_identifier Type descriptor of the aggregate to append to @param [DomainEventMessage] snapshot_event @return [undefined]
# File lib/synapse/event_store/mongo/event_store.rb, line 66 def append_snapshot_event(type_identifier, snapshot_event) documents = @storage_strategy.create_documents type_identifier, [snapshot_event] @template.snapshot_collection.insert documents end
@return [undefined]
# File lib/synapse/event_store/mongo/event_store.rb, line 15 def ensure_indexes @storage_strategy.ensure_indexes end
@raise [EventStoreError] If an error occurs while reading the stream from the store @param [String] type_identifier Type descriptor of the aggregate to retrieve @param [Object] aggregate_id @return [DomainEventStream]
# File lib/synapse/event_store/mongo/event_store.rb, line 23 def read_events(type_identifier, aggregate_id) first_sequence_number = -1 last_snapshot_commit = load_last_snapshot type_identifier, aggregate_id if last_snapshot_commit and last_snapshot_commit.size > 0 first_sequence_number = last_snapshot_commit[0].sequence_number end first_sequence_number = first_sequence_number.next cursor = @storage_strategy.fetch_events type_identifier, aggregate_id, first_sequence_number unless last_snapshot_commit or cursor.has_next? raise StreamNotFoundError.new type_identifier, aggregate_id end CursorDomainEventStream.new @storage_strategy, cursor, last_snapshot_commit, aggregate_id end
Private Instance Methods
@param [String] type_identifier Type descriptor of the aggregate to retrieve @param [Object] aggregate_id
# File lib/synapse/event_store/mongo/event_store.rb, line 75 def load_last_snapshot(type_identifier, aggregate_id) cursor = @storage_strategy.fetch_last_snapshot type_identifier, aggregate_id return unless cursor.has_next? first = cursor.next_document @storage_strategy.extract_events first, aggregate_id end