class EventStore::EventStream
Attributes
checkpoint_events[R]
event_table[R]
Public Class Methods
new(aggregate)
click to toggle source
# File lib/event_store/event_stream.rb, line 7 def initialize aggregate @aggregate = aggregate @id = @aggregate.id @checkpoint_events = aggregate.checkpoint_events @event_table_alias = "events" @event_table = Sequel.qualify(EventStore.schema, EventStore.table_name) @aliased_event_table = event_table.as(@event_table_alias) @names_table = EventStore.fully_qualified_names_table end
Public Instance Methods
append(raw_events, logger) { |prepared_events| ... }
click to toggle source
# File lib/event_store/event_stream.rb, line 17 def append(raw_events, logger) prepared_events = raw_events.map do |raw_event| event = prepare_event(raw_event) ensure_all_attributes_have_values!(event) event end prepared_events.each do |event| event_hash = event.dup.reject! { |k,v| k == :fully_qualified_name } event_table = insert_table(Time.now) begin id = event_table.insert(event_hash) rescue Sequel::NotNullConstraintViolation fully_qualified_names.insert(fully_qualified_name: event[:fully_qualified_name]) id = event_table.insert(event_hash) end logger.debug("EventStream#append, setting id #{id} for #{event_hash.inspect}") event[:id] = id end yield(prepared_events) if block_given? end
delete_events!()
click to toggle source
# File lib/event_store/event_stream.rb, line 160 def delete_events! EventStore.db.from(@event_table).where(:aggregate_id => @id.to_s).delete end
each() { |e| ... }
click to toggle source
# File lib/event_store/event_stream.rb, line 153 def each events.all.each do |e| e[:serialized_event] = EventStore.unescape_bytea(e[:serialized_event]) yield e end end
empty?()
click to toggle source
# File lib/event_store/event_stream.rb, line 149 def empty? events.empty? end
event_stream_between(start_time, end_time, fully_qualified_names = [])
click to toggle source
# File lib/event_store/event_stream.rb, line 139 def event_stream_between(start_time, end_time, fully_qualified_names = []) query = events.where(occurred_at: start_time..end_time) query = query.where(fully_qualified_name: fully_qualified_names) if fully_qualified_names && fully_qualified_names.any? query.all.map {|e| e[:serialized_event] = EventStore.unescape_bytea(e[:serialized_event]); e} end
events()
click to toggle source
# File lib/event_store/event_stream.rb, line 55 def events @events_query ||= begin query = EventStore.db.from(@aliased_event_table).where(:aggregate_id => @id.to_s) query = query.join(@names_table, id: :fully_qualified_name_id) if EventStore.use_names_table? query = query.order { events[:id] }.select_all(:events) query = query.select_append(:fully_qualified_name) if EventStore.use_names_table? query end end
events_from(event_id, max = nil)
click to toggle source
# File lib/event_store/event_stream.rb, line 82 def events_from(event_id, max = nil) # note: this depends on the events table being aliased to "events" above. events.limit(max).where{events[:id] >= event_id.to_i }.all.map do |event| event[:serialized_event] = EventStore.unescape_bytea(event[:serialized_event]) event end end
fully_qualified_names()
click to toggle source
# File lib/event_store/event_stream.rb, line 51 def fully_qualified_names @fully_qualified_name_query ||= EventStore.db.from(@names_table) end
insert_table(occurred_at)
click to toggle source
# File lib/event_store/event_stream.rb, line 43 def insert_table(occurred_at) EventStore.db.from(insert_table_name(occurred_at)) end
insert_table_name(date)
click to toggle source
# File lib/event_store/event_stream.rb, line 47 def insert_table_name(date) EventStore.insert_table_name(date) end
last()
click to toggle source
# File lib/event_store/event_stream.rb, line 145 def last to_a.last end
last_event_before(start_time, fully_qualified_names = [])
click to toggle source
Private: returns the last event before start_time for each of the events named
by fully_qualified_names.
Generates queries that look like this:
SELECT events.*, fully_qualified_name FROM event_store.thermostat_events "events"
INNER JOIN event_store.fully_qualified_names fqn ON fqn.id = fully_qualified_name_id
WHERE events.id IN (SELECT max(events.id) from event_store.thermostat_events "events" INNER JOIN event_store.fully_qualified_names fqn ON fqn.id = fully_qualified_name_id WHERE occurred_at < '2016-08-08 06:00:00' AND fully_qualified_name = 'faceplate_api.system.core.events.HeatingStageStarted' GROUP BY sub_key);
# File lib/event_store/event_stream.rb, line 104 def last_event_before(start_time, fully_qualified_names = []) timestampz = start_time.strftime("%Y-%m-%d %H:%M:%S%z") rows = fully_qualified_names.inject([]) { |memo, name| memo + events.where(Sequel.qualify("events", "id") => events.where(fully_qualified_name: name).where { occurred_at < timestampz } .select { max(events[:id]) }.unordered.group(:sub_key)).all }.sort_by { |r| r[:occurred_at] } rows.map {|r| r[:serialized_event] = EventStore.unescape_bytea(r[:serialized_event]); r} end
simple_last_event_before(start_time, fully_qualified_names = [])
click to toggle source
Private: returns the last event before start_time for each of the events named
by fully_qualified_names. Doesn't work when events have multiple valid sub_keys, but is fast when they don't.
Generates queries that look like this:
SELECT events.*, fully_qualified_name FROM event_store.thermostat_events "events"
INNER JOIN event_store.fully_qualified_names fqn ON fqn.id = fully_qualified_name_id
WHERE occurred_at < '2016-08-08 06:00:00' AND fully_qualified_name = 'faceplate_api.system.core.events.HeatingStageStarted' ORDER BY occurred_at DESC LIMIT 1;
# File lib/event_store/event_stream.rb, line 128 def simple_last_event_before(start_time, fully_qualified_names = []) timestampz = start_time.strftime("%Y-%m-%d %H:%M:%S%z") rows = fully_qualified_names.inject([]) { |memo, name| memo + events.where(fully_qualified_name: name).where{ occurred_at < timestampz } .reverse_order(:occurred_at, :id).limit(1).all }.sort_by { |r| r[:occurred_at] } rows.map {|r| r[:serialized_event] = EventStore.unescape_bytea(r[:serialized_event]); r} end
snapshot_events()
click to toggle source
# File lib/event_store/event_stream.rb, line 66 def snapshot_events last_checkpoint = nil if checkpoint_events checkpoints = last_event_before(Time.now.utc, checkpoint_events) last_checkpoint = checkpoints.first # start at the earliest possible place end if last_checkpoint events.where{ events[:id] >= last_checkpoint[:id].to_i } else events end end
Private Instance Methods
ensure_all_attributes_have_values!(event_hash)
click to toggle source
# File lib/event_store/event_stream.rb, line 181 def ensure_all_attributes_have_values!(event_hash) [:aggregate_id, :fully_qualified_name, :occurred_at, :serialized_event].each do |attribute_name| if event_hash[attribute_name].to_s.strip.empty? raise AttributeMissingError, "value required for #{attribute_name}" end end end
prepare_event(raw_event)
click to toggle source
# File lib/event_store/event_stream.rb, line 166 def prepare_event(raw_event) raise ArgumentError.new("Cannot Append a Nil Event") unless raw_event { :aggregate_id => raw_event.aggregate_id, :occurred_at => Time.parse(raw_event.occurred_at.to_s).utc, #to_s truncates microseconds, which breaks Time equality :serialized_event => EventStore.escape_bytea(raw_event.serialized_event), :fully_qualified_name => raw_event.fully_qualified_name, :sub_key => raw_event.sub_key }.tap { |event_info| if EventStore.use_names_table? name_subquery = EventStore.db.from(@names_table).where(fully_qualified_name: raw_event.fully_qualified_name).select(:id) event_info[:fully_qualified_name_id] = name_subquery end } end