class EventStore::Snapshot
Attributes
snapshot_event_id_table[R]
snapshot_table[R]
Public Class Methods
new(aggregate)
click to toggle source
# File lib/event_store/snapshot.rb, line 9 def initialize aggregate @aggregate = aggregate @redis = EventStore.redis(aggregate.id) @snapshot_table = "#{@aggregate.type}_snapshots_for_#{@aggregate.id}" @snapshot_event_id_table = "#{@aggregate.type}_snapshot_event_ids_for_#{@aggregate.id}" end
Public Instance Methods
count(logger = default_logger)
click to toggle source
# File lib/event_store/snapshot.rb, line 32 def count(logger = default_logger) read_with_rebuild(logger).count end
delete_snapshot!()
click to toggle source
# File lib/event_store/snapshot.rb, line 67 def delete_snapshot! @redis.del [snapshot_table, snapshot_event_id_table] end
each(logger=default_logger) { |e| ... }
click to toggle source
# File lib/event_store/snapshot.rb, line 36 def each(logger=default_logger) logger.info { "#{self.class.name}#each for #{@aggregate.id}" } t = Time.now events_hash = read_with_rebuild(logger) logger.debug { "#{self.class.name}#auto_rebuild_snapshot took #{Time.now - t} seconds for #{@aggregate.id}" } t = Time.now result_hash = events_hash.inject([]) do |snapshot, (key, value)| snapshot + [serialized_event_from_snapshot_event(key, value)] end logger.debug { "#{self.class.name} serializing events took #{Time.now - t} seconds" } result_hash.sort_by(&:event_id).each { |e| yield e } end
event_id(snapshot_key =:current_event_id)
click to toggle source
# File lib/event_store/snapshot.rb, line 24 def event_id(snapshot_key =:current_event_id) (@redis.hget(snapshot_event_id_table, snapshot_key) || -1).to_i end
event_id_for(fully_qualified_name, sub_key = nil)
click to toggle source
# File lib/event_store/snapshot.rb, line 28 def event_id_for(fully_qualified_name, sub_key = nil) event_id(snapshot_key(fully_qualified_name: fully_qualified_name, sub_key: sub_key)) end
exists?()
click to toggle source
# File lib/event_store/snapshot.rb, line 16 def exists? @redis.exists(snapshot_table) end
last_event()
click to toggle source
# File lib/event_store/snapshot.rb, line 20 def last_event to_a.last end
rebuild_snapshot!(logger=default_logger)
click to toggle source
# File lib/event_store/snapshot.rb, line 50 def rebuild_snapshot!(logger=default_logger) logger.info { "#{self.class.name}#rebuild_snapshot!" } t = Time.now delete_snapshot! logger.debug { "Deleting snapshot took #{Time.now - t} seconds" } t = Time.now all_events = @aggregate.snapshot_events.all logger.debug { "getting #{all_events.count} events" } logger.debug { "getting all events took #{Time.now - t} seconds" } t = Time.now corrected_events = all_events.map{|e| e[:occurred_at] = TimeHacker.translate_occurred_at_from_local_to_gmt(e[:occurred_at]); e} logger.debug { "correcting occurred_at on all events took #{Time.now - t} seconds" } t = Time.now store_snapshot(corrected_events) logger.debug { "storing new snapshot took #{Time.now - t} seconds" } end
reject!(logger = default_logger) { |serialized_event| ... }
click to toggle source
# File lib/event_store/snapshot.rb, line 109 def reject!(logger = default_logger) events_to_keep = read_with_rebuild(logger) event_ids = current_event_id_numbers events_to_keep.dup.each { |snapshot_key, snapshot_event| serialized_event = serialized_event_from_snapshot_event(snapshot_key, snapshot_event) drop_it = yield serialized_event if drop_it events_to_keep.delete(snapshot_key) event_ids.delete(snapshot_key) end } replace_snapshot_tables(event_ids.flatten, events_to_keep.flatten) end
store_snapshot(prepared_events, logger=default_logger)
click to toggle source
# File lib/event_store/snapshot.rb, line 71 def store_snapshot(prepared_events, logger=default_logger) valid_snapshot_events = [] valid_snapshot_event_ids = [] prepared_events.each do |event_hash| key = snapshot_key(event_hash) current_id = current_event_id_numbers[key].to_i logger.debug("Snapshot#store_snapshot: snapshot_key: #{key} prepared id: #{event_hash[:id]}, current id: #{current_id}") if event_hash[:id].to_i > current_id logger.debug("prepared event is newer, storing") valid_snapshot_events += snapshot_event(event_hash) valid_snapshot_event_ids += snapshot_event_id(event_hash) end end logger.debug("valid_snapshot_event_ids: #{valid_snapshot_event_ids.inspect}") if valid_snapshot_event_ids.any? logger.debug("there are valid_snapshot_event_ids, persisting to redis") valid_snapshot_event_ids += [:current_event_id, valid_snapshot_event_ids.last.to_i] update_snapshot_tables(valid_snapshot_event_ids, valid_snapshot_events) end end
update_fqns!(logger = default_logger) { |fqn| ... }
click to toggle source
# File lib/event_store/snapshot.rb, line 96 def update_fqns!(logger = default_logger) logger.debug { "Replacing FQNs in snapshot events" } updated_events = replace_fqns_in_snapshot_hash(read_with_rebuild(logger)) { |fqn| yield fqn } logger.debug { "Replacing FQNs in snapshot event ids" } updated_event_ids = replace_fqns_in_snapshot_hash(current_event_id_numbers) { |fqn| fqn == 'current_event_id' ? fqn : (yield fqn) } logger.debug "Updating snapshot tables in redis" replace_snapshot_tables(updated_event_ids, updated_events) end
Private Instance Methods
auto_rebuild_snapshot(events_hash, logger=default_logger)
click to toggle source
# File lib/event_store/snapshot.rb, line 204 def auto_rebuild_snapshot(events_hash, logger=default_logger) logger.info { "#{self.class.name}#auto_rebuild_snapshot(#{events_hash.count} events)" } return events_hash unless events_hash.empty? #got it? return it t = Time.now logger.debug { "#{self.class.name} about to query db to see if anything is there" } event_count = @aggregate.events.count logger.debug { "#{self.class.name} took #{Time.now - t} seconds query db for event count" } return events_hash if event_count == 0 #return nil if no events in the ES # so there are events in the ES but there is no redis snapshot rebuild_snapshot!(logger) events_hash = read_raw_snapshot(logger) end
current_event_id_numbers()
click to toggle source
# File lib/event_store/snapshot.rb, line 191 def current_event_id_numbers @redis.hgetall(snapshot_event_id_table).tap { |event_ids| event_ids.default = -1 } end
default_logger()
click to toggle source
# File lib/event_store/snapshot.rb, line 166 def default_logger Logger.new('/dev/null') end
read_raw_snapshot(logger=default_logger)
click to toggle source
# File lib/event_store/snapshot.rb, line 197 def read_raw_snapshot(logger=default_logger) t = Time.now @redis.hgetall(snapshot_table).tap { |_snapshot| logger.debug { "#{self.class.name}#read_raw_snapshot took #{Time.now - t} seconds" } } end
read_with_rebuild(logger = default_logger)
click to toggle source
# File lib/event_store/snapshot.rb, line 219 def read_with_rebuild(logger = default_logger) auto_rebuild_snapshot(read_raw_snapshot(logger), logger) end
replace_fqn_in_snapshot_key(key) { |fqn) || fqn| ... }
click to toggle source
# File lib/event_store/snapshot.rb, line 145 def replace_fqn_in_snapshot_key(key) fqn, sub_key = key.split(EventStore::SNAPSHOT_KEY_DELIMITER) new_fqn = (yield fqn) || fqn [new_fqn, sub_key].compact.join(EventStore::SNAPSHOT_KEY_DELIMITER) end
replace_fqns_in_snapshot_hash(snapshot_keyed_hash) { |fqn| ... }
click to toggle source
# File lib/event_store/snapshot.rb, line 138 def replace_fqns_in_snapshot_hash(snapshot_keyed_hash) snapshot_keyed_hash.inject([]) { |memo, (snapshot_key, value)| new_key = replace_fqn_in_snapshot_key(snapshot_key) { |fqn| yield fqn } memo + [new_key, value] } end
replace_snapshot_tables(event_ids_array, events_array)
click to toggle source
# File lib/event_store/snapshot.rb, line 151 def replace_snapshot_tables(event_ids_array, events_array) @redis.multi do delete_snapshot! update_snapshot_tables(event_ids_array, events_array) end end
serialized_event_from_snapshot_event(snapshot_key, snapshot_value)
click to toggle source
# File lib/event_store/snapshot.rb, line 128 def serialized_event_from_snapshot_event(snapshot_key, snapshot_value) fully_qualified_name, _ = snapshot_key.split(EventStore::SNAPSHOT_KEY_DELIMITER) raw_event = snapshot_value.split(EventStore::SNAPSHOT_DELIMITER) event_id = raw_event.first.to_i serialized_event = EventStore.unescape_bytea(raw_event[1]) occurred_at = Time.parse(raw_event.last) SerializedEvent.new(fully_qualified_name, serialized_event, event_id, occurred_at) end
snapshot_event(event)
click to toggle source
# File lib/event_store/snapshot.rb, line 174 def snapshot_event(event) [ snapshot_key(event), [ event[:id].to_s, event[:serialized_event], event[:occurred_at].to_s ].join(EventStore::SNAPSHOT_DELIMITER) ] end
snapshot_event_id(event)
click to toggle source
# File lib/event_store/snapshot.rb, line 184 def snapshot_event_id(event) [ snapshot_key(event), event[:id] ] end
snapshot_key(event)
click to toggle source
# File lib/event_store/snapshot.rb, line 170 def snapshot_key(event) [event[:fully_qualified_name], event[:sub_key] || EventStore::NO_SUB_KEY].join(EventStore::SNAPSHOT_KEY_DELIMITER) end
update_snapshot_tables(event_ids_array, events_array)
click to toggle source
params are flattened hashes, i.e., [key1, val1, key2, val2, …]
# File lib/event_store/snapshot.rb, line 159 def update_snapshot_tables(event_ids_array, events_array) return unless events_array.any? @redis.hmset(snapshot_event_id_table, event_ids_array) @redis.hmset(snapshot_table, events_array) end