class Alephant::Publisher::Queue::SQSHelper::Archiver

Attributes

async[R]
log_message_body[R]
log_validator[R]
storage[R]

Public Class Methods

new(storage, opts)
# File lib/alephant/publisher/queue/sqs_helper/archiver.rb, line 14
# Builds an archiver around a storage backend.
#
# storage - object that store_item later calls `put(key, body, meta)` on.
# opts    - options object indexed with `[]`; keys read here:
#           :async_store         - kept as `async`; truthy selects async_store in `see`.
#           :log_archive_message - kept as `log_message_body`; truthy makes
#                                  body_for return the real message body.
#           :log_validator       - callable kept as `log_validator`; `store`
#                                  logs only when it returns truthy. When
#                                  absent (nil/false) a lambda that always
#                                  returns true is used.
def initialize(storage, opts)
  always_log = ->(_body) { true }

  @storage          = storage
  @async            = opts[:async_store]
  @log_message_body = opts[:log_archive_message]
  @log_validator    = opts[:log_validator] || always_log
end

Public Instance Methods

see(message)
# File lib/alephant/publisher/queue/sqs_helper/archiver.rb, line 21
# Archives a message and hands the same message back to the caller.
#
# message - the message to archive; nil is ignored.
#
# Returns nil when message is nil, otherwise the message itself (the
# result of the store call is discarded).
def see(message)
  return if message.nil?

  # The `async` flag picks the background-thread or the inline path.
  if async
    async_store(message)
  else
    sync_store(message)
  end

  message
end

Private Instance Methods

async_store(message)
# File lib/alephant/publisher/queue/sqs_helper/archiver.rb, line 30
# Archives the message on a new thread so the caller is not blocked.
# Emits the "AsynchronouslyArchivedData" metric from inside that thread
# before storing.
#
# NOTE(review): `logger` is not defined in the visible source; presumably
# it is provided by a mixin or parent — confirm.
#
# message - the message handed to `store`.
#
# Returns the Thread that was started; it is not joined here.
def async_store(message)
  Thread.new(message) do |queued|
    logger.metric("AsynchronouslyArchivedData")
    store(queued)
  end
end
body_for(message)
# File lib/alephant/publisher/queue/sqs_helper/archiver.rb, line 72
# Picks the text that represents the message body in log output.
#
# message - object responding to `body`.
#
# Returns message.body when `log_message_body` is truthy, otherwise a
# fixed JSON placeholder string.
def body_for(message)
  return message.body if log_message_body

  '{ "Message": "No message body available" }'
end
date_key()
# File lib/alephant/publisher/queue/sqs_helper/archiver.rb, line 76
# Formats the current local time as "DD-MM-YYYY_HH" (day-month-year,
# underscore, 24-hour hour).
#
# Uses core Time rather than the deprecated DateTime class; for the
# current local time both produce the same string with this format.
#
# Returns a String such as "05-11-2023_14".
def date_key
  Time.now.strftime("%d-%m-%Y_%H")
end
log_message_parts(id)
# File lib/alephant/publisher/queue/sqs_helper/archiver.rb, line 65
# Builds the two text fragments of a store log line for the given id.
#
# NOTE(review): the quoted path begins with a literal '#' ("#archive/..."),
# unlike the key built by storage_key ("archive/..."). It is kept as-is
# here; confirm whether the '#' is intentional.
#
# id - identifier interpolated into the quoted path.
#
# Returns a two-element Array of Strings: the "<class>#store:" prefix and
# the single-quoted path.
def log_message_parts(id)
  origin   = "#{self.class}#store:"
  location = "'#archive/#{date_key}/#{id}'"

  [origin, location]
end
meta_for(m)
# File lib/alephant/publisher/queue/sqs_helper/archiver.rb, line 80
# Builds the metadata hash stored alongside an archived message.
#
# m - message responding to `id`, `md5` and `queue` (whose result
#     responds to `url`).
#
# Returns a Hash with the keys :id, :md5, :logged_at and :queue.
# :logged_at is the current local time as "YYYY-MM-DDTHH:MM:SS+HH:MM".
def meta_for(m)
  {
    :id        => m.id,
    :md5       => m.md5,
    # Core Time instead of the deprecated DateTime; this format string
    # reproduces the exact output of DateTime#to_s.
    :logged_at => Time.now.strftime("%Y-%m-%dT%H:%M:%S%:z"),
    :queue     => m.queue.url
  }
end
storage_key(id)
# File lib/alephant/publisher/queue/sqs_helper/archiver.rb, line 61
# Builds the storage key under which a message is archived:
# "archive/<date_key>/<id>".
#
# id - identifier appended as the last path segment.
#
# Returns the key as a String.
def storage_key(id)
  hour_bucket = date_key

  "archive/#{hour_bucket}/#{id}"
end
store(message)
# File lib/alephant/publisher/queue/sqs_helper/archiver.rb, line 42
# Stores the message and, if the validator allows it, logs a
# "MessageStored" event.
#
# The body used for logging comes from body_for (so it may be the
# placeholder text). It is computed before the message is stored, and
# log_validator is consulted only after storing.
#
# NOTE(review): `logger` is not defined in the visible source; presumably
# it is provided by a mixin or parent — confirm.
#
# message - the message to archive.
#
# Returns whatever store_item returns.
def store(message)
  msg_body = body_for(message)
  outcome  = store_item(message)

  if log_validator.call(msg_body)
    logger.info(
      "event"       => "MessageStored",
      "messageBody" => msg_body,
      "method"      => "#{self.class}#store"
    )
  end

  outcome
end
store_item(message)
# File lib/alephant/publisher/queue/sqs_helper/archiver.rb, line 53
# Writes the message to the storage backend.
#
# The real message body is always written, regardless of the
# log_message_body setting, which only affects what is logged.
#
# message - object responding to `id` and `body` (plus whatever meta_for
#           reads from it).
#
# Returns the result of storage.put.
def store_item(message)
  backend  = storage
  key      = storage_key(message.id)
  payload  = message.body
  metadata = meta_for(message)

  backend.put(key, payload, metadata)
end
sync_store(message)
# File lib/alephant/publisher/queue/sqs_helper/archiver.rb, line 37
# Archives the message inline on the calling thread, emitting the
# "SynchronouslyArchivedData" metric first.
#
# NOTE(review): `logger` is not defined in the visible source; presumably
# it is provided by a mixin or parent — confirm.
#
# message - the message handed to `store`.
#
# Returns the result of `store`.
def sync_store(message)
  logger.metric("SynchronouslyArchivedData")
  store(message)
end