class SandthornSequelProjection::ProcessedEventsTracker
Constants
- DEFAULT_TABLE_NAME
Attributes
db_connection[R]
event_store[R]
identifier[R]
lock[R]
Public Class Methods
migrate!(db_connection)
click to toggle source
# File lib/sandthorn_sequel_projection/processed_events_tracker.rb, line 68
#
# Creates the tracker table on +db_connection+ unless it already exists.
# Each row carries an +identifier+ (covered by a unique index), a
# +last_processed_sequence_number+ that defaults to 0, and a nullable
# +locked_at+ timestamp.
#
# @param db_connection [#create_table?] connection the table is created on
#   (presumably a Sequel::Database — confirm against callers)
# @raise [MigrationError] when table creation fails with a StandardError
def self.migrate!(db_connection)
  db_connection.create_table?(table_name) do
    String :identifier
    Integer :last_processed_sequence_number, default: 0
    DateTime :locked_at, null: true
    index [:identifier], unique: true
  end
rescue StandardError => e
  # Wrap ordinary failures only. Interrupt, SystemExit and other
  # non-StandardError exceptions must reach the caller unchanged.
  raise MigrationError, e
end
new(identifier: required(:identifier), event_store: required(:event_store), db_connection: nil)
click to toggle source
# File lib/sandthorn_sequel_projection/processed_events_tracker.rb, line 14
#
# Sets up a tracker for a single identifier and makes sure its row exists.
#
# @param identifier [#to_s] kept as a String in +@identifier+; the value as
#   given (not the converted String) is what Lock.new receives
# @param event_store [Object] stored unchanged
# @param db_connection [Object, nil] when nil or false, the connection from
#   SandthornSequelProjection.configuration is used instead
# @raise [ArgumentError] through +required+ when +identifier+ or
#   +event_store+ is left out
def initialize(identifier: required(:identifier), event_store: required(:event_store), db_connection: nil)
  @identifier  = identifier.to_s
  @event_store = event_store

  db_connection ||= SandthornSequelProjection.configuration.db_connection
  @db_connection = db_connection
  @lock          = Lock.new(identifier, @db_connection)

  ensure_row
end
table_name()
click to toggle source
# File lib/sandthorn_sequel_projection/processed_events_tracker.rb, line 64
#
# Name of the table the tracker rows live in.
#
# @return [Object] the value of DEFAULT_TABLE_NAME
def self.table_name
  DEFAULT_TABLE_NAME
end
Public Instance Methods
last_processed_sequence_number()
click to toggle source
# File lib/sandthorn_sequel_projection/processed_events_tracker.rb, line 42
#
# Reads the stored sequence number from this tracker's row.
#
# @return [Object] the row's +:last_processed_sequence_number+ value
def last_processed_sequence_number
  tracker_row = row
  tracker_row[:last_processed_sequence_number]
end
process_events(&block)
click to toggle source
# File lib/sandthorn_sequel_projection/processed_events_tracker.rb, line 28
#
# While holding the lock, pulls batches from a Cursor that starts after the
# last processed sequence number and hands each non-empty batch to +block+.
# After the block returns, the cursor's last sequence number is written back.
# Both steps run inside one +transaction+ call per batch.
#
# NOTE(review): +transaction+ is not defined in the visible part of this
# class — presumably delegated to the database connection; confirm.
#
# @param block [Proc] called with each batch of events
def process_events(&block)
  with_lock do
    cursor = Cursor.new(
      after_sequence_number: last_processed_sequence_number,
      event_store: event_store
    )

    # Fetch-then-test: the loop ends on the first empty batch.
    until (batch = cursor.get_batch).empty?
      transaction do
        block.call(batch)
        write_sequence_number(cursor.last_sequence_number)
      end
    end
  end
end
reset()
click to toggle source
# File lib/sandthorn_sequel_projection/processed_events_tracker.rb, line 58
#
# Sets the stored sequence number back to 0 while holding the lock.
def reset
  with_lock { write_sequence_number(0) }
end
row()
click to toggle source
# File lib/sandthorn_sequel_projection/processed_events_tracker.rb, line 54
#
# Looks up the first table row whose +identifier+ matches this tracker.
#
# @return [Object, nil] whatever +first+ yields for the filtered table
#   (+row_exists?+ treats nil as "no row")
def row
  own_rows = table.where(identifier: identifier)
  own_rows.first
end
row_exists?()
click to toggle source
# File lib/sandthorn_sequel_projection/processed_events_tracker.rb, line 50
#
# Tells whether +row+ currently returns something other than nil.
#
# @return [Boolean]
def row_exists?
  return false if row.nil?

  true
end
table()
click to toggle source
# File lib/sandthorn_sequel_projection/processed_events_tracker.rb, line 46
#
# Indexes the database connection with the tracker's table name.
#
# NOTE(review): +table_name+ is called on the instance here, while only a
# class-level +table_name+ is visible — confirm an instance-level delegate
# exists.
#
# @return [Object] the result of +db_connection[table_name]+
def table
  connection = db_connection
  connection[table_name]
end
with_lock() { || ... }
click to toggle source
# File lib/sandthorn_sequel_projection/processed_events_tracker.rb, line 22
#
# Runs the given block inside +@lock.acquire+.
#
# @yield the work to perform inside the acquire block
# @return [Object] whatever +@lock.acquire+ returns
def with_lock
  @lock.acquire { yield }
end
Private Instance Methods
create_row()
click to toggle source
# File lib/sandthorn_sequel_projection/processed_events_tracker.rb, line 93
#
# Inserts a row for this tracker, setting only the +identifier+ column.
#
# @return [Object] the result of the insert call
def create_row
  target = table
  target.insert(identifier: identifier)
end
ensure_row()
click to toggle source
# File lib/sandthorn_sequel_projection/processed_events_tracker.rb, line 89
#
# Creates this tracker's row when +row_exists?+ reports it is missing.
#
# @return [Object, nil] nil when the row was already there, otherwise the
#   result of +create_row+
def ensure_row
  return if row_exists?

  create_row
end
required(key)
click to toggle source
# File lib/sandthorn_sequel_projection/processed_events_tracker.rb, line 81
#
# Always raises. It serves as the default value of the mandatory keyword
# arguments of +initialize+, so it only runs when a caller omits one.
#
# @param key [#to_s] name of the missing keyword, interpolated into the message
# @raise [ArgumentError] unconditionally
def required(key)
  message = "key missing: #{key}"
  raise ArgumentError, message
end
write_sequence_number(number)
click to toggle source
# File lib/sandthorn_sequel_projection/processed_events_tracker.rb, line 85
#
# Stores +number+ as the last processed sequence number on the rows matching
# this tracker's identifier.
#
# @param number [Object] value written to +last_processed_sequence_number+
# @return [Object] the result of the update call
def write_sequence_number(number)
  own_rows = table.where(identifier: identifier)
  own_rows.update(last_processed_sequence_number: number)
end