class RubyEventStore::ROM::EventRepository
Public Class Methods
new(rom: ROM.env)
click to toggle source
# File lib/ruby_event_store/rom/event_repository.rb, line 14
# Builds the repository on top of a ROM environment.
#
# rom - environment object; must be an instance of exactly Env (the check
#       uses instance_of?, so subclasses are rejected). Defaults to ROM.env.
#
# Raises ArgumentError when rom is nil/false or is not an Env instance.
def initialize(rom: ROM.env)
  env_given = rom && rom.instance_of?(Env)
  raise ArgumentError, 'Must specify rom' unless env_given

  @rom = rom
  # Both repositories are built from the environment's container.
  @events = Repositories::Events.new(rom.rom_container)
  @stream_entries = Repositories::StreamEntries.new(rom.rom_container)
end
Public Instance Methods
append_to_stream(events, stream, expected_version)
click to toggle source
# File lib/ruby_event_store/rom/event_repository.rb, line 22
# Stores the given events and the stream entries that attach them to
# +stream+, then returns self.
#
# events           - a single event or a collection (coerced with Array()).
# stream           - target stream, handed to the stream-entries repository.
# expected_version - passed to resolve_version together with the stream.
#
# The work runs inside guard_for(:unique_violation) and unit_of_work;
# NOTE(review): both helpers are defined outside this block - confirm their
# exact error-translation and transaction semantics there.
def append_to_stream(events, stream, expected_version)
  batch = Array(events)
  ids = batch.map(&:event_id)

  guard_for(:unique_violation) do
    unit_of_work do |changesets|
      # Create changesets inside transaction because
      # we want to find the last position (a.k.a. version)
      # again if the transaction is retried due to a
      # deadlock in MySQL
      changesets << @events.create_changeset(batch)

      version = @stream_entries.resolve_version(stream, expected_version)
      changesets << @stream_entries.create_changeset(
        ids,
        stream,
        version,
        global_stream: true
      )
    end
  end

  self
end
count(specification)
click to toggle source
# File lib/ruby_event_store/rom/event_repository.rb, line 84
# Delegates counting to the events repository for the given specification.
#
# Raises ReservedInternalName when the specification's stream name is eql?
# to SERIALIZED_GLOBAL_STREAM_NAME (looked up on the class of
# @stream_entries.stream_entries).
def count(specification)
  requested_name = specification.stream.name
  reserved_name = @stream_entries.stream_entries.class::SERIALIZED_GLOBAL_STREAM_NAME
  raise ReservedInternalName if requested_name.eql?(reserved_name)

  @events.count(specification)
end
delete_stream(stream)
click to toggle source
# File lib/ruby_event_store/rom/event_repository.rb, line 66
# Hands the stream to the stream-entries repository's delete and returns
# whatever that call returns.
def delete_stream(stream)
  @stream_entries.delete(stream)
end
has_event?(event_id)
click to toggle source
# File lib/ruby_event_store/rom/event_repository.rb, line 70
# Asks the events repository whether +event_id+ exists.
#
# The lookup runs inside guard_for(:not_found, ..., swallow: EventNotFound);
# a nil/false result from the guard is normalised to false.
# NOTE(review): presumably the guard returns nil when it swallows
# EventNotFound - confirm against guard_for.
def has_event?(event_id)
  found = guard_for(:not_found, event_id, swallow: EventNotFound) do
    @events.exist?(event_id)
  end

  found || false
end
last_stream_event(stream)
click to toggle source
# File lib/ruby_event_store/rom/event_repository.rb, line 74
# Returns whatever the events repository reports as the last event of
# +stream+; this method adds no logic of its own.
def last_stream_event(stream)
  @events.last_stream_event(stream)
end
link_to_stream(event_ids, stream, expected_version)
click to toggle source
# File lib/ruby_event_store/rom/event_repository.rb, line 45
# Attaches already-stored events to +stream+ and returns self.
#
# event_ids        - a single id or a collection (coerced with Array()).
# stream           - target stream, handed to the stream-entries repository.
# expected_version - passed to resolve_version together with the stream.
#
# Raises EventNotFound (with the id) for the first id that the events
# repository reports as nonexistent.
def link_to_stream(event_ids, stream, expected_version)
  ids = Array(event_ids)

  # Validate event IDs
  @events.find_nonexistent_pks(ids).each do |missing_id|
    raise EventNotFound, missing_id
  end

  guard_for(:unique_violation) do
    unit_of_work do |changesets|
      # Resolved inside the unit of work, as in append_to_stream.
      version = @stream_entries.resolve_version(stream, expected_version)
      changesets << @stream_entries.create_changeset(ids, stream, version)
    end
  end

  self
end
read(specification)
click to toggle source
# File lib/ruby_event_store/rom/event_repository.rb, line 78
# Delegates reading to the events repository for the given specification.
#
# Raises ReservedInternalName when the specification's stream name is eql?
# to SERIALIZED_GLOBAL_STREAM_NAME (looked up on the class of
# @stream_entries.stream_entries).
def read(specification)
  requested_name = specification.stream.name
  reserved_name = @stream_entries.stream_entries.class::SERIALIZED_GLOBAL_STREAM_NAME
  raise ReservedInternalName if requested_name.eql?(reserved_name)

  @events.read(specification)
end
streams_of(event_id)
click to toggle source
# File lib/ruby_event_store/rom/event_repository.rb, line 101
# Returns one Stream object per name that the stream-entries repository
# yields for +event_id+.
def streams_of(event_id)
  stream_names = @stream_entries.streams_of(event_id)
  stream_names.map { |stream_name| Stream.new(stream_name) }
end
update_messages(messages)
click to toggle source
# File lib/ruby_event_store/rom/event_repository.rb, line 90
# Queues an update changeset for +messages+ inside a unit of work and
# returns the result of unit_of_work.
#
# Raises EventNotFound (with the id) for the first message id that the
# events repository reports as nonexistent; nothing is queued in that case.
def update_messages(messages)
  ids = messages.map(&:event_id)

  # Validate event IDs
  @events.find_nonexistent_pks(ids).each do |missing_id|
    raise EventNotFound, missing_id
  end

  unit_of_work { |changesets| changesets << @events.update_changeset(messages) }
end