class Alephant::Publisher::Queue::Publisher
Constants
- RECEIVE_WAIT_TIME
- VISIBILITY_TIMEOUT
Attributes
executor[R]
opts[R]
processor[R]
queue[R]
Public Class Methods
new(opts, processor = nil)
click to toggle source
# File lib/alephant/publisher/queue/publisher.rb, line 12 def initialize(opts, processor = nil) @opts = opts @processor = processor @queue = Alephant::Publisher::Queue::SQSHelper::Queue.new( aws_queue, archiver, opts.queue[:visibility_timeout] || VISIBILITY_TIMEOUT, opts.queue[:receive_wait_time] || RECEIVE_WAIT_TIME ) end
Public Instance Methods
run!()
click to toggle source
# File lib/alephant/publisher/queue/publisher.rb, line 24 def run! loop { processor.consume(@queue.message) } end
Private Instance Methods
archive_storage()
click to toggle source
# File lib/alephant/publisher/queue/publisher.rb, line 49 def archive_storage Alephant::Storage.new( opts.writer[:s3_bucket_id], opts.writer[:s3_object_path] ) end
archiver()
click to toggle source
# File lib/alephant/publisher/queue/publisher.rb, line 30 def archiver Alephant::Publisher::Queue::SQSHelper::Archiver.new(archive_storage, archiver_opts) end
archiver_opts()
click to toggle source
# File lib/alephant/publisher/queue/publisher.rb, line 34 def archiver_opts options = { :async_store => true, :log_archive_message => true, :log_validator => opts.queue[:log_validator] } options.each do |key, _value| options[key] = opts.queue[key] == "true" if whitelist_key(opts.queue, key) end end
aws_queue()
click to toggle source
# File lib/alephant/publisher/queue/publisher.rb, line 79 def aws_queue options = { queue_name: opts.queue[:sqs_queue_name] } options[:queue_owner_aws_account_id] = opts.queue[:aws_account_id] if opts.queue[:aws_account_id] queue_url = sqs_client.get_queue_url(options).queue_url resource = Aws::SQS::Resource.new(client: sqs_client) resource.queue(queue_url) end
get_region()
click to toggle source
# File lib/alephant/publisher/queue/publisher.rb, line 56 def get_region # @TODO: Where does region come from? opts.queue[:sqs_account_region] || Aws.config[:region] || 'eu-west-1' end
sqs_client()
click to toggle source
# File lib/alephant/publisher/queue/publisher.rb, line 61 def sqs_client @sqs_client ||= Aws::SQS::Client.new(sqs_queue_options) end
sqs_queue_options()
click to toggle source
# File lib/alephant/publisher/queue/publisher.rb, line 65 def sqs_queue_options options = {} options[:endpoint] = ENV['AWS_SQS_ENDPOINT'] if ENV['AWS_SQS_ENDPOINT'] options[:region] = get_region logger.info( "event" => "SQSQueueOptionsConfigured", "options" => options, "method" => "#{self.class}#sqs_queue_options" ) options end
whitelist_key(options, key)
click to toggle source
# File lib/alephant/publisher/queue/publisher.rb, line 45 def whitelist_key(options, key) options.key?(key) && key != :log_validator end