class Alephant::Publisher::Queue::Publisher

Constants

RECEIVE_WAIT_TIME
VISIBILITY_TIMEOUT

Attributes

executor[R]
opts[R]
processor[R]
queue[R]

Public Class Methods

new(opts, processor = nil) click to toggle source
# File lib/alephant/publisher/queue/publisher.rb, line 12
def initialize(opts, processor = nil)
  @opts = opts
  @processor = processor

  @queue = Alephant::Publisher::Queue::SQSHelper::Queue.new(
    aws_queue,
    archiver,
    opts.queue[:visibility_timeout] || VISIBILITY_TIMEOUT,
    opts.queue[:receive_wait_time]  || RECEIVE_WAIT_TIME
  )
end

Public Instance Methods

run!() click to toggle source
# File lib/alephant/publisher/queue/publisher.rb, line 24
def run!
  loop { processor.consume(@queue.message) }
end

Private Instance Methods

archive_storage() click to toggle source
# File lib/alephant/publisher/queue/publisher.rb, line 49
def archive_storage
  Alephant::Storage.new(
    opts.writer[:s3_bucket_id],
    opts.writer[:s3_object_path]
  )
end
archiver() click to toggle source
# File lib/alephant/publisher/queue/publisher.rb, line 30
def archiver
  Alephant::Publisher::Queue::SQSHelper::Archiver.new(archive_storage, archiver_opts)
end
archiver_opts() click to toggle source
# File lib/alephant/publisher/queue/publisher.rb, line 34
def archiver_opts
  options = {
    :async_store         => true,
    :log_archive_message => true,
    :log_validator       => opts.queue[:log_validator]
  }
  options.each do |key, _value|
    options[key] = opts.queue[key] == "true" if whitelist_key(opts.queue, key)
  end
end
aws_queue() click to toggle source
# File lib/alephant/publisher/queue/publisher.rb, line 79
def aws_queue
  options = { queue_name: opts.queue[:sqs_queue_name] }
  options[:queue_owner_aws_account_id] = opts.queue[:aws_account_id] if opts.queue[:aws_account_id]

  queue_url = sqs_client.get_queue_url(options).queue_url

  resource = Aws::SQS::Resource.new(client: sqs_client)
  resource.queue(queue_url)
end
get_region() click to toggle source
# File lib/alephant/publisher/queue/publisher.rb, line 56
def get_region
  # @TODO: Where does region come from?
  opts.queue[:sqs_account_region] || Aws.config[:region] || 'eu-west-1'
end
sqs_client() click to toggle source
# File lib/alephant/publisher/queue/publisher.rb, line 61
def sqs_client
  @sqs_client ||= Aws::SQS::Client.new(sqs_queue_options)
end
sqs_queue_options() click to toggle source
# File lib/alephant/publisher/queue/publisher.rb, line 65
def sqs_queue_options
  options = {}
  options[:endpoint] = ENV['AWS_SQS_ENDPOINT'] if ENV['AWS_SQS_ENDPOINT']
  options[:region]   = get_region

  logger.info(
    "event"   => "SQSQueueOptionsConfigured",
    "options" => options,
    "method"  => "#{self.class}#sqs_queue_options"
  )

  options
end
whitelist_key(options, key) click to toggle source
# File lib/alephant/publisher/queue/publisher.rb, line 45
def whitelist_key(options, key)
  options.key?(key) && key != :log_validator
end