class Alephant::Sequencer::Sequencer
Attributes
cache[R]
ident[R]
jsonpath[R]
keep_all[R]
Public Class Methods
new(sequence_table, opts = {})
click to toggle source
# File lib/alephant/sequencer/sequencer.rb, line 10 def initialize(sequence_table, opts = {}) @sequence_table = sequence_table @cache = opts[:cache] @keep_all = opts[:keep_all] @ident = opts[:id] @exists = exists? @jsonpath = opts[:jsonpath] logger.info( 'event' => 'SequencerInitialized', 'sequenceTable' => sequence_table, 'jsonPath' => @jsonpath, 'id' => @ident, 'method' => "#{self.class}#initialize" ) end
sequence_id_from(msg, path)
click to toggle source
# File lib/alephant/sequencer/sequencer.rb, line 86 def self.sequence_id_from(msg, path) JsonPath.on(msg.body, path).first.to_i end
Public Instance Methods
delete!()
click to toggle source
# File lib/alephant/sequencer/sequencer.rb, line 56 def delete! @exists = false @sequence_table.delete_item!(ident).tap do logger.info( 'event' => 'SequenceIdDeleted', 'id' => ident, 'method' => "#{self.class}#delete!" ) end end
exists?()
click to toggle source
# File lib/alephant/sequencer/sequencer.rb, line 31 def exists? @exists || cache.get(ident) do @sequence_table.sequence_exists(ident) end end
get_last_seen(key = ident)
click to toggle source
# File lib/alephant/sequencer/sequencer.rb, line 80 def get_last_seen(key = ident) cache.get(key) do @sequence_table.sequence_for(key) end end
sequential?(msg)
click to toggle source
# File lib/alephant/sequencer/sequencer.rb, line 27 def sequential?(msg) (get_last_seen || 0) < Sequencer.sequence_id_from(msg, jsonpath) end
set_last_seen(msg, last_seen_check = nil)
click to toggle source
# File lib/alephant/sequencer/sequencer.rb, line 71 def set_last_seen(msg, last_seen_check = nil) seen_id = Sequencer.sequence_id_from(msg, jsonpath) @sequence_table.update_sequence_id( ident, seen_id, (exists? ? last_seen_check : nil) ) end
truncate!()
click to toggle source
# File lib/alephant/sequencer/sequencer.rb, line 67 def truncate! @sequence_table.truncate! end
validate(msg) { || ... }
click to toggle source
# File lib/alephant/sequencer/sequencer.rb, line 37 def validate(msg) last_seen_id = get_last_seen sequential = ((last_seen_id || 0) < Sequencer.sequence_id_from(msg, jsonpath)) yield if sequential || keep_all if sequential set_last_seen(msg, last_seen_id) else logger.metric 'SequencerNonSequentialMessageCount' logger.info( 'event' => 'NonSequentialMessageReceived', 'id' => ident, 'lastSeenId' => last_seen_id, 'method' => "#{self.class}#validate" ) end end