class RubyEventStore::ROM::Repositories::Events
Public Instance Methods
by_id(event_id)
click to toggle source
# File lib/ruby_event_store/rom/repositories/events.rb, line 29 def by_id(event_id) events.map_with(:event_to_serialized_record).by_pk(event_id).one! end
count(specification)
click to toggle source
# File lib/ruby_event_store/rom/repositories/events.rb, line 60 def count(specification) query = read_scope(specification) query = query.take(specification.limit) if specification.limit? query.count end
create_changeset(serialized_records)
click to toggle source
# File lib/ruby_event_store/rom/repositories/events.rb, line 11 def create_changeset(serialized_records) events.create_changeset(serialized_records) end
exist?(event_id)
click to toggle source
# File lib/ruby_event_store/rom/repositories/events.rb, line 25 def exist?(event_id) events.by_pk(event_id).exist? end
find_nonexistent_pks(event_ids)
click to toggle source
# File lib/ruby_event_store/rom/repositories/events.rb, line 19 def find_nonexistent_pks(event_ids) return event_ids unless event_ids.any? event_ids - events.by_pk(event_ids).pluck(:id) end
last_stream_event(stream)
click to toggle source
# File lib/ruby_event_store/rom/repositories/events.rb, line 33 def last_stream_event(stream) query = stream_entries.ordered(:backward, stream) query = query_builder(query, limit: 1) query.first end
read(specification)
click to toggle source
# File lib/ruby_event_store/rom/repositories/events.rb, line 39 def read(specification) query = read_scope(specification) if specification.batched? BatchEnumerator.new( specification.batch_size, specification.limit, ->(offset, limit) { query_builder(query, offset: offset, limit: limit).to_ary } ).each else query = query_builder(query, limit: (specification.limit if specification.limit?)) if !specification.start && !specification.stop specification.first? || specification.last? ? query.first : query.each elsif specification.last? query.to_ary.last else specification.first? ? query.first : query.each end end end
update_changeset(serialized_records)
click to toggle source
# File lib/ruby_event_store/rom/repositories/events.rb, line 15 def update_changeset(serialized_records) events.update_changeset(serialized_records) end
Protected Instance Methods
query_builder(query, offset: nil, limit: nil)
click to toggle source
# File lib/ruby_event_store/rom/repositories/events.rb, line 84 def query_builder(query, offset: nil, limit: nil) query = query.offset(offset) if offset query = query.take(limit) if limit query .combine(:event) .map_with(:stream_entry_to_serialized_record, auto_struct: false) end
read_scope(specification)
click to toggle source
# File lib/ruby_event_store/rom/repositories/events.rb, line 68 def read_scope(specification) offset_entry_id = stream_entries.by_stream_and_event_id(specification.stream, specification.start).fetch(:id) if specification.start stop_entry_id = stream_entries.by_stream_and_event_id(specification.stream, specification.stop).fetch(:id) if specification.stop direction = specification.forward? ? :forward : :backward if specification.last? && !specification.start && !specification.stop direction = specification.forward? ? :backward : :forward end query = stream_entries.ordered(direction, specification.stream, offset_entry_id, stop_entry_id) query = query.by_event_id(specification.with_ids) if specification.with_ids query = query.by_event_type(specification.with_types) if specification.with_types? query end