class SandthornSequelProjection::ProcessedEventsTracker

Constants

DEFAULT_TABLE_NAME

Attributes

db_connection[R]
event_store[R]
identifier[R]
lock[R]

Public Class Methods

migrate!(db_connection) click to toggle source
# File lib/sandthorn_sequel_projection/processed_events_tracker.rb, line 68
def self.migrate!(db_connection)
  db_connection.create_table?(table_name) do
    String    :identifier
    Integer   :last_processed_sequence_number, default: 0
    DateTime  :locked_at, null: true
    index [:identifier], unique: true
  end
rescue Exception => e
  raise MigrationError, e
end
new(identifier: required(:identifier), event_store: required(:event_store), db_connection: nil) click to toggle source
# File lib/sandthorn_sequel_projection/processed_events_tracker.rb, line 14
def initialize(identifier: required(:identifier), event_store: required(:event_store), db_connection: nil)
  @identifier = identifier.to_s
  @event_store = event_store
  @db_connection = db_connection || SandthornSequelProjection.configuration.db_connection
  @lock = Lock.new(identifier, @db_connection)
  ensure_row
end
table_name() click to toggle source
# File lib/sandthorn_sequel_projection/processed_events_tracker.rb, line 64
def self.table_name
  DEFAULT_TABLE_NAME
end

Public Instance Methods

last_processed_sequence_number() click to toggle source
# File lib/sandthorn_sequel_projection/processed_events_tracker.rb, line 42
def last_processed_sequence_number
  row[:last_processed_sequence_number]
end
process_events(&block) click to toggle source
# File lib/sandthorn_sequel_projection/processed_events_tracker.rb, line 28
def process_events(&block)
  with_lock do
    cursor = Cursor.new(after_sequence_number: last_processed_sequence_number, event_store: event_store)
    events = cursor.get_batch
    until(events.empty?)
      transaction do
        block.call(events)
        write_sequence_number(cursor.last_sequence_number)
      end
      events = cursor.get_batch
    end
  end
end
reset() click to toggle source
# File lib/sandthorn_sequel_projection/processed_events_tracker.rb, line 58
def reset
  with_lock do
    write_sequence_number(0)
  end
end
row() click to toggle source
# File lib/sandthorn_sequel_projection/processed_events_tracker.rb, line 54
def row
  table.where(identifier: identifier).first
end
row_exists?() click to toggle source
# File lib/sandthorn_sequel_projection/processed_events_tracker.rb, line 50
def row_exists?
  !row.nil?
end
table() click to toggle source
# File lib/sandthorn_sequel_projection/processed_events_tracker.rb, line 46
def table
  db_connection[table_name]
end
with_lock() { || ... } click to toggle source
# File lib/sandthorn_sequel_projection/processed_events_tracker.rb, line 22
def with_lock
  @lock.acquire do
    yield
  end
end

Private Instance Methods

create_row() click to toggle source
# File lib/sandthorn_sequel_projection/processed_events_tracker.rb, line 93
def create_row
  table.insert(identifier: identifier)
end
ensure_row() click to toggle source
# File lib/sandthorn_sequel_projection/processed_events_tracker.rb, line 89
def ensure_row
  create_row unless row_exists?
end
required(key) click to toggle source
# File lib/sandthorn_sequel_projection/processed_events_tracker.rb, line 81
def required(key)
  raise ArgumentError, "key missing: #{key}"
end
write_sequence_number(number) click to toggle source
# File lib/sandthorn_sequel_projection/processed_events_tracker.rb, line 85
def write_sequence_number(number)
  table.where(identifier: identifier).update(last_processed_sequence_number: number)
end