class Alephant::Publisher::Queue::SQSHelper::Queue

Constants

VISABILITY_TIMEOUT
WAIT_TIME

Attributes

archiver[R]
queue[R]
timeout[R]
wait_time[R]

Public Class Methods

new(queue, archiver = nil, timeout = VISABILITY_TIMEOUT, wait_time = WAIT_TIME)
# File lib/alephant/publisher/queue/sqs_helper/queue.rb, line 16
# Stores the queue and its settings on the instance, then records the
# configuration through +log_queue_creation+.
#
# @param queue     object responding to +url+; kept in +@queue+
# @param archiver  optional archiver, +nil+ when omitted
# @param timeout   defaults to +VISABILITY_TIMEOUT+
# @param wait_time defaults to +WAIT_TIME+
def initialize(queue, archiver = nil, timeout = VISABILITY_TIMEOUT, wait_time = WAIT_TIME)
  @queue = queue
  @archiver = archiver
  @timeout = timeout
  @wait_time = wait_time

  # wait_time is stored but is not part of the logged configuration.
  log_queue_creation(queue.url, archiver, timeout)
end

Public Instance Methods

message()
# File lib/alephant/publisher/queue/sqs_helper/queue.rb, line 29
# Fetches whatever +receive+ returns, passes it to +process+, and hands the
# received object back to the caller (the result of +process+ is discarded).
#
# @return the object returned by +receive+
def message
  received = receive
  process(received)
  received
end

Private Instance Methods

archive(m)
# File lib/alephant/publisher/queue/sqs_helper/queue.rb, line 58
# Hands a message to the configured archiver, if there is one.
#
# Archiving is best-effort: any StandardError raised while archiving is
# reported through the logger and swallowed rather than propagated.
#
# @param m the message passed through to +archiver.see+
# @return the result of +archiver.see+, +nil+ when no archiver is set, or
#   the result of +logger.error+ when archiving failed
def archive(m)
  archiver.see(m) unless archiver.nil?
rescue StandardError => e
  logger.metric "ArchiveFailed"
  logger.error(
    "event"     => "MessageArchiveFailed",
    "class"     => e.class,
    "message"   => e.message,
    # Array() tolerates a nil backtrace so this handler cannot itself raise;
    # frames are newline-separated so they stay distinguishable in the log.
    "backtrace" => Array(e.backtrace).join("\n"),
    "method"    => "#{self.class}#archive"
  )
end
log_queue_creation(queue_url, archiver, timeout)
# File lib/alephant/publisher/queue/sqs_helper/queue.rb, line 35
# Emits a single info-level "QueueConfigured" log entry describing the queue.
#
# @param queue_url logged under "queueUrl"
# @param archiver  logged under "archiver"
# @param timeout   logged under "timeout"
# @return the result of +logger.info+
def log_queue_creation(queue_url, archiver, timeout)
  # The entry is attributed to #initialize, not to this helper.
  origin = "#{self.class}#initialize"

  logger.info(
    "event" => "QueueConfigured",
    "queueUrl" => queue_url,
    "archiver" => archiver,
    "timeout" => timeout,
    "method" => origin
  )
end
process(m)
# File lib/alephant/publisher/queue/sqs_helper/queue.rb, line 45
# Logs receipt of a batch and archives its first element.
#
# Does nothing (and returns +nil+) unless +m.size+ is greater than zero.
#
# @param m collection responding to +size+ and +first+; the first element
#   must respond to +message_id+
# @return the result of +archive+, or +nil+ when the batch is empty
def process(m)
  has_messages = m.size > 0
  return unless has_messages

  logger.metric("MessagesReceived")

  received_id = m.first.message_id
  logger.info(
    "event" => "QueueMessageReceived",
    "messageId" => received_id,
    "method" => "#{self.class}#process"
  )

  # @TODO: Look at archiver as should support message from collection.
  archive(m.first)
end
receive()
# File lib/alephant/publisher/queue/sqs_helper/queue.rb, line 71
# Asks the queue for at most one message, using this instance's +timeout+
# as the visibility timeout and +wait_time+ as the wait time in seconds.
#
# @return whatever +queue.receive_messages+ returns
def receive
  queue.receive_messages(
    visibility_timeout: timeout,
    wait_time_seconds: wait_time,
    max_number_of_messages: 1
  )
end