class Alephant::Sequencer::Sequencer

Attributes

cache[R]
ident[R]
jsonpath[R]
keep_all[R]

Public Class Methods

new(sequence_table, opts = {}) click to toggle source
# File lib/alephant/sequencer/sequencer.rb, line 10
def initialize(sequence_table, opts = {})
  @sequence_table = sequence_table

  @cache    = opts[:cache]
  @keep_all = opts[:keep_all]
  @ident    = opts[:id]
  @exists   = exists?
  @jsonpath = opts[:jsonpath]
  logger.info(
    'event'         => 'SequencerInitialized',
    'sequenceTable' => sequence_table,
    'jsonPath'      => @jsonpath,
    'id'            => @ident,
    'method'        => "#{self.class}#initialize"
  )
end
sequence_id_from(msg, path) click to toggle source
# File lib/alephant/sequencer/sequencer.rb, line 86
def self.sequence_id_from(msg, path)
  JsonPath.on(msg.body, path).first.to_i
end

Public Instance Methods

delete!() click to toggle source
# File lib/alephant/sequencer/sequencer.rb, line 56
def delete!
  @exists = false
  @sequence_table.delete_item!(ident).tap do
    logger.info(
      'event'  => 'SequenceIdDeleted',
      'id'     => ident,
      'method' => "#{self.class}#delete!"
    )
  end
end
exists?() click to toggle source
# File lib/alephant/sequencer/sequencer.rb, line 31
def exists?
  @exists || cache.get(ident) do
    @sequence_table.sequence_exists(ident)
  end
end
get_last_seen(key = ident) click to toggle source
# File lib/alephant/sequencer/sequencer.rb, line 80
def get_last_seen(key = ident)
  cache.get(key) do
    @sequence_table.sequence_for(key)
  end
end
sequential?(msg) click to toggle source
# File lib/alephant/sequencer/sequencer.rb, line 27
def sequential?(msg)
  (get_last_seen || 0) < Sequencer.sequence_id_from(msg, jsonpath)
end
set_last_seen(msg, last_seen_check = nil) click to toggle source
# File lib/alephant/sequencer/sequencer.rb, line 71
def set_last_seen(msg, last_seen_check = nil)
  seen_id = Sequencer.sequence_id_from(msg, jsonpath)

  @sequence_table.update_sequence_id(
    ident, seen_id,
    (exists? ? last_seen_check : nil)
  )
end
truncate!() click to toggle source
# File lib/alephant/sequencer/sequencer.rb, line 67
def truncate!
  @sequence_table.truncate!
end
validate(msg) { || ... } click to toggle source
# File lib/alephant/sequencer/sequencer.rb, line 37
def validate(msg)
  last_seen_id = get_last_seen
  sequential = ((last_seen_id || 0) < Sequencer.sequence_id_from(msg, jsonpath))

  yield if sequential || keep_all

  if sequential
    set_last_seen(msg, last_seen_id)
  else
    logger.metric 'SequencerNonSequentialMessageCount'
    logger.info(
      'event'      => 'NonSequentialMessageReceived',
      'id'         => ident,
      'lastSeenId' => last_seen_id,
      'method'     => "#{self.class}#validate"
    )
  end
end