class RubyEventStore::ROM::EventRepository

Public Class Methods

new(rom: ROM.env) click to toggle source
# File lib/ruby_event_store/rom/event_repository.rb, line 14
# Sets up the repository on top of the given ROM environment.
#
# @param rom [Env] environment wrapper; defaults to ROM.env
# @raise [ArgumentError] when rom is falsy or is not exactly an Env instance
def initialize(rom: ROM.env)
  # instance_of? matches the exact class only, so Env subclasses are
  # rejected as well.
  valid_env = rom && rom.instance_of?(Env)
  raise ArgumentError, 'Must specify rom' unless valid_env

  @rom = rom
  # Both repositories are built from the environment's ROM container.
  @events = Repositories::Events.new(@rom.rom_container)
  @stream_entries = Repositories::StreamEntries.new(@rom.rom_container)
end

Public Instance Methods

append_to_stream(events, stream, expected_version) click to toggle source
# File lib/ruby_event_store/rom/event_repository.rb, line 22
# Persists one or more events and records them as entries of a stream.
#
# @param events [Object, Array] a single event or a list of events; each
#   must respond to +event_id+
# @param stream [Object] target stream, passed through to the
#   stream-entries repository
# @param expected_version [Object] handed to +resolve_version+ to work out
#   the version used for the new entries
# @return [self]
def append_to_stream(events, stream, expected_version)
  batch = Array(events)
  ids = batch.map(&:event_id)

  guard_for(:unique_violation) do
    unit_of_work do |changesets|
      # Changesets are built inside the transaction on purpose: if the
      # transaction is retried because of a MySQL deadlock, the last
      # position (a.k.a. version) has to be looked up again.
      changesets << @events.create_changeset(batch)

      version = @stream_entries.resolve_version(stream, expected_version)
      changesets << @stream_entries.create_changeset(ids, stream, version, global_stream: true)
    end
  end

  self
end
count(specification) click to toggle source
# File lib/ruby_event_store/rom/event_repository.rb, line 84
# Counts the events matching a read specification.
#
# @param specification [Object] must respond to +stream.name+
# @return whatever the events repository's +count+ returns
# @raise [ReservedInternalName] when the requested stream name equals the
#   SERIALIZED_GLOBAL_STREAM_NAME constant of the stream-entries relation class
def count(specification)
  requested_name = specification.stream.name
  reserved_name = @stream_entries.stream_entries.class::SERIALIZED_GLOBAL_STREAM_NAME
  raise ReservedInternalName if requested_name.eql?(reserved_name)

  @events.count(specification)
end
delete_stream(stream) click to toggle source
# File lib/ruby_event_store/rom/event_repository.rb, line 66
# Deletes a stream by delegating to the stream-entries repository.
#
# @param stream [Object] stream to delete, passed through unchanged
# @return whatever the stream-entries repository's +delete+ returns
def delete_stream(stream)
  entries = @stream_entries
  entries.delete(stream)
end
has_event?(event_id) click to toggle source
# File lib/ruby_event_store/rom/event_repository.rb, line 70
# Tells whether an event with the given id exists.
#
# The lookup runs under +guard_for(:not_found, ..., swallow: EventNotFound)+;
# a falsy result from that guard (nil or false) is normalised to +false+.
#
# @param event_id [Object] identifier handed to the events repository
# @return the guarded lookup result when truthy, otherwise +false+
def has_event?(event_id)
  found = guard_for(:not_found, event_id, swallow: EventNotFound) do
    @events.exist?(event_id)
  end

  found || false
end
last_stream_event(stream) click to toggle source
# File lib/ruby_event_store/rom/event_repository.rb, line 74
# Looks up the last event of a stream via the events repository.
#
# @param stream [Object] stream to inspect, passed through unchanged
# @return whatever the events repository's +last_stream_event+ returns
def last_stream_event(stream)
  events_repo = @events
  events_repo.last_stream_event(stream)
end
read(specification) click to toggle source
# File lib/ruby_event_store/rom/event_repository.rb, line 78
# Reads the events matching a read specification.
#
# @param specification [Object] must respond to +stream.name+
# @return whatever the events repository's +read+ returns
# @raise [ReservedInternalName] when the requested stream name equals the
#   SERIALIZED_GLOBAL_STREAM_NAME constant of the stream-entries relation class
def read(specification)
  requested_name = specification.stream.name
  reserved_name = @stream_entries.stream_entries.class::SERIALIZED_GLOBAL_STREAM_NAME
  raise ReservedInternalName if requested_name.eql?(reserved_name)

  @events.read(specification)
end
streams_of(event_id) click to toggle source
# File lib/ruby_event_store/rom/event_repository.rb, line 101
# Lists the streams an event belongs to.
#
# @param event_id [Object] identifier handed to the stream-entries repository
# @return a collection of Stream objects, one per name the repository returned
def streams_of(event_id)
  stream_names = @stream_entries.streams_of(event_id)
  stream_names.map { |stream_name| Stream.new(stream_name) }
end
update_messages(messages) click to toggle source
# File lib/ruby_event_store/rom/event_repository.rb, line 90
# Updates already-stored messages inside a unit of work.
#
# @param messages [Enumerable] messages to update; each must respond to
#   +event_id+
# @return whatever +unit_of_work+ returns
# @raise [EventNotFound] for the first id the events repository reports as
#   nonexistent; raised before the unit of work is started
def update_messages(messages)
  # Validate event IDs
  ids = messages.map(&:event_id)
  @events.find_nonexistent_pks(ids).each do |missing_id|
    raise EventNotFound, missing_id
  end

  unit_of_work { |changesets| changesets << @events.update_changeset(messages) }
end