class Synapse::EventStore::Mongo::MongoEventStore

Implementation of an event store backed by a Mongo database

Public Class Methods

new(template, storage_strategy) click to toggle source

@param [MongoTemplate] template @param [StorageStrategy] storage_strategy @return [undefined]

# File lib/synapse/event_store/mongo/event_store.rb, line 9
# Stores the collaborators this event store delegates to.
#
# @param [MongoTemplate] template
#   Kept as @template; other methods ask it for the event and snapshot
#   collections
# @param [StorageStrategy] storage_strategy
#   Kept as @storage_strategy; other methods use it to build documents,
#   fetch cursors and extract events
# @return [undefined]
def initialize(template, storage_strategy)
  @storage_strategy, @template = storage_strategy, template
end

Public Instance Methods

append_events(type_identifier, stream) click to toggle source

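@raise [Repository::ConcurrencyError] If an event for this aggregate and sequence number is already present in the store @raise [EventStoreError] If an error occurs while appending the stream to the store @param [String] type_identifier Type descriptor of the aggregate to append to @param [DomainEventStream] stream @return [undefined]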

# File lib/synapse/event_store/mongo/event_store.rb, line 46
# Appends every event in the given stream to the event collection.
#
# The stream is materialized with +to_a+, turned into documents by the
# storage strategy, and inserted into the template's event collection in a
# single +insert+ call.
#
# @raise [Repository::ConcurrencyError]
#   If Mongo reports error code 11000 (duplicate key), i.e. an event for this
#   aggregate and sequence number is already present
# @raise [::Mongo::OperationFailure]
#   Any other operation failure is re-raised unchanged
# @param [String] type_identifier Type descriptor of the aggregate to append to
# @param [DomainEventStream] stream
# @return [undefined]
def append_events(type_identifier, stream)
  events = stream.to_a
  documents = @storage_strategy.create_documents type_identifier, events

  begin
    @template.event_collection.insert documents
  rescue ::Mongo::OperationFailure => exception
    if exception.error_code == 11000
      raise Repository::ConcurrencyError,
        'Event for this aggregate and sequence number already present'
    end

    # Bare raise re-raises the rescued failure, keeping its class, message and
    # backtrace intact for the caller.
    raise
  end
end
append_snapshot_event(type_identifier, snapshot_event) click to toggle source

@raise [EventStoreError] If an error occurs while appending the event to the store @param [String] type_identifier Type descriptor of the aggregate to append to @param [DomainEventMessage] snapshot_event @return [undefined]

# File lib/synapse/event_store/mongo/event_store.rb, line 66
# Stores a single snapshot event in the snapshot collection.
#
# The event is wrapped in a one-element array so the storage strategy can
# build documents for it the same way it does for a list of events; the
# resulting documents are inserted into the template's snapshot collection.
#
# @param [String] type_identifier Type descriptor of the aggregate to append to
# @param [DomainEventMessage] snapshot_event
# @return [undefined]
def append_snapshot_event(type_identifier, snapshot_event)
  snapshot_documents = @storage_strategy.create_documents(type_identifier, [snapshot_event])
  @template.snapshot_collection.insert(snapshot_documents)
end
ensure_indexes() click to toggle source

@return [undefined]

# File lib/synapse/event_store/mongo/event_store.rb, line 15
# Delegates index creation to the storage strategy.
#
# This method does nothing beyond forwarding the call; which indexes exist
# and on which collections is decided by the strategy.
#
# @return [undefined]
def ensure_indexes
  @storage_strategy.ensure_indexes
end
read_events(type_identifier, aggregate_id) click to toggle source

@raise [EventStoreError] If an error occurs while reading the stream from the store @param [String] type_identifier Type descriptor of the aggregate to retrieve @param [Object] aggregate_id @return [DomainEventStream]

# File lib/synapse/event_store/mongo/event_store.rb, line 23
# Opens a stream over the events of one aggregate.
#
# The latest snapshot (if any) is loaded first. Events are then fetched
# starting at the sequence number right after the snapshot's first event, or
# at 0 when there is no usable snapshot. The snapshot events and the event
# cursor are handed to a CursorDomainEventStream.
#
# @raise [StreamNotFoundError]
#   If no snapshot was loaded and the event cursor has nothing to return
# @param [String] type_identifier Type descriptor of the aggregate to retrieve
# @param [Object] aggregate_id
# @return [DomainEventStream]
def read_events(type_identifier, aggregate_id)
  snapshot_events = load_last_snapshot(type_identifier, aggregate_id)

  # -1 means "no snapshot", so the following sequence number is 0.
  last_snapshot_sequence =
    if snapshot_events && snapshot_events.size > 0
      snapshot_events[0].sequence_number
    else
      -1
    end

  event_cursor = @storage_strategy.fetch_events(
    type_identifier, aggregate_id, last_snapshot_sequence.next
  )

  # An empty (but non-nil) snapshot result still counts as "snapshot present"
  # here, exactly as the nil check implies.
  if !snapshot_events && !event_cursor.has_next?
    raise StreamNotFoundError.new(type_identifier, aggregate_id)
  end

  CursorDomainEventStream.new(@storage_strategy, event_cursor, snapshot_events, aggregate_id)
end

Private Instance Methods

load_last_snapshot(type_identifier, aggregate_id) click to toggle source

@param [String] type_identifier Type descriptor of the aggregate to retrieve @param [Object] aggregate_id

# File lib/synapse/event_store/mongo/event_store.rb, line 75
# Loads the events of the most recent snapshot for an aggregate.
#
# Asks the storage strategy for a cursor over the last snapshot; when the
# cursor has a document, that document is passed back to the strategy to be
# turned into events.
#
# @param [String] type_identifier Type descriptor of the aggregate to retrieve
# @param [Object] aggregate_id
# @return [Object, nil]
#   Result of the strategy's extract_events, or nil when the cursor is empty
def load_last_snapshot(type_identifier, aggregate_id)
  snapshot_cursor = @storage_strategy.fetch_last_snapshot(type_identifier, aggregate_id)

  if snapshot_cursor.has_next?
    @storage_strategy.extract_events(snapshot_cursor.next_document, aggregate_id)
  end
end