class Moe::Sequence::Collector

Attributes

dyna[RW]
flushed_count[RW]
owner_id[R]
payloads[RW]
uuid[R]
write_tables[R]

Public Class Methods

new(name, owner_id) click to toggle source
# File lib/moe/sequence/collector.rb, line 7
# Builds a collector with an empty payload queue and a zeroed flush counter.
#
# @param name     key used to look up an entry in +Moe.config.tables+; the
#                 last element of that entry becomes +write_tables+
# @param owner_id identifier stored as-is and exposed through +owner_id+
def initialize(name, owner_id)
  @dyna     = Dyna.new
  @owner_id = owner_id
  @uuid     = SecureRandom.uuid

  # Only the last element configured under +name+ is kept as the write target.
  tables_for_name = Moe.config.tables[name]
  @write_tables   = tables_for_name.last

  @payloads      = []
  @flushed_count = 0
end

Public Instance Methods

add(payload={}) click to toggle source
# File lib/moe/sequence/collector.rb, line 16
# Queues one payload and writes the queue out once it is full.
#
# The payload is appended to +payloads+. When the queue length reaches
# +Moe.config.batch_limit+, the queued payloads are passed through +keyify+,
# handed to +flush+, and the queue is replaced with a fresh empty array.
#
# @param payload [Hash] item to queue (defaults to an empty hash)
# @return [Array, nil] the new empty queue when a flush happened, otherwise nil
def add(payload={})
  payloads.push(payload)
  return unless payloads.size >= Moe.config.batch_limit

  flush(keyify(payloads))
  self.payloads = []
end
save(payload={}) click to toggle source
# File lib/moe/sequence/collector.rb, line 27
# Writes the queued payloads together with one trailing metadata item.
#
# The metadata item carries:
# * "count"    - +payloads.size + flushed_count+, as a string
# * "saved_at" - +Time.now.to_s+
# * "payload"  - the given payload serialized with +MultiJson.dump+
# merged with whatever +Locksmith.itemize(owner_id, payload, 0, uuid)+
# returns (on a key clash the itemize value wins).
#
# The queued payloads go through +keyify+, the metadata item is appended to
# the collection +keyify+ returns, and the result is passed to +flush+.
#
# @param payload [Hash] metadata payload (defaults to an empty hash)
# @return whatever +flush+ returns
def save(payload={})
  total = payloads.size + flushed_count

  summary = {}
  summary["count"]    = total.to_s
  summary["saved_at"] = Time.now.to_s
  summary["payload"]  = MultiJson.dump(payload)

  metadata = summary.merge(Locksmith.itemize(owner_id, payload, 0, uuid))

  flush(keyify(payloads) << metadata)
end

Private Instance Methods

flush(items) click to toggle source
# File lib/moe/sequence/collector.rb, line 43
# Writes +items+ to +write_tables+ through +dyna+ and advances the counter.
#
# NOTE(review): the value returned by +batch_write_item+ is not inspected
# here, so a partially processed batch would still be counted as flushed --
# confirm whether Dyna reports or retries unprocessed items on its own.
#
# @param items [Array] items to write in one batch
# @return [Integer] the updated +flushed_count+
def flush(items)
  dyna.batch_write_item(write_tables, items)

  self.flushed_count += items.size
end
keyify(items, uid=uuid) click to toggle source
# File lib/moe/sequence/collector.rb, line 49
# Merges a key into every item, in place.
#
# Items are numbered consecutively starting at +flushed_count + 1+; each item
# is updated with the hash returned by +Locksmith.key(owner_id, n, uid)+.
#
# @param items [Array<Hash>] items to mutate
# @param uid   identifier forwarded to +Locksmith.key+ (defaults to +uuid+)
# @return [Array<Hash>] the same +items+ collection that was passed in
def keyify(items, uid=uuid)
  first_position = flushed_count + 1

  items.each.with_index(first_position) do |item, position|
    item.update(Locksmith.key(owner_id, position, uid))
  end
end