class Osbourne::WorkerBase
Attributes
config[RW]
queue[RW]
subscriptions[RW]
topics[RW]
Public Class Methods
dead_letter_queue()
click to toggle source
# File lib/osbourne/worker_base.rb, line 39
# Returns the dead letter Queue for this worker, built from
# dead_letter_queue_name on first use and memoized in @dead_letter_queue.
# Returns nil when config[:dead_letter] is falsy.
def dead_letter_queue
  if config[:dead_letter]
    @dead_letter_queue ||= Queue.new(dead_letter_queue_name)
  end
end
descendants()
click to toggle source
# File lib/osbourne/worker_base.rb, line 30
# Returns an Array of every loaded Class that is a strict subclass of the
# receiver (direct or indirect), found by scanning ObjectSpace.
def descendants
  found = []
  ObjectSpace.each_object(Class) do |candidate|
    # Module#< is true only for strict descendants; false/nil otherwise.
    found << candidate if candidate < self
  end
  found
end
polling_queue()
click to toggle source
# File lib/osbourne/worker_base.rb, line 45
# Builds a new Aws::SQS::QueuePoller for this worker's queue URL, using
# Osbourne.sqs_client as the client. A fresh poller is created on every call.
def polling_queue
  queue_url = queue.url
  Aws::SQS::QueuePoller.new(queue_url, client: Osbourne.sqs_client)
end
provision()
click to toggle source
# File lib/osbourne/worker_base.rb, line 34
# Provisions this worker in two steps: first `register`, then
# `register_dead_letter_queue`. Returns the result of the second step.
def provision
  register
  register_dead_letter_queue
end
Private Class Methods
dead_letter_queue_name()
click to toggle source
# File lib/osbourne/worker_base.rb, line 71
# Returns the dead letter queue's name: config[:queue_name] with the
# suffix "-dead-letter" appended.
def dead_letter_queue_name
  base_name = config[:queue_name]
  "#{base_name}-dead-letter"
end
default_queue_name()
click to toggle source
# File lib/osbourne/worker_base.rb, line 67
# Returns the default queue name: the receiver's `name` passed through
# `underscore`, followed by "_queue".
def default_queue_name
  underscored = name.underscore
  "#{underscored}_queue"
end
register()
click to toggle source
# File lib/osbourne/worker_base.rb, line 60
# Builds and stores this worker's messaging objects:
#   * topics        - one Topic per entry in config[:topic_names]
#   * queue         - a Queue for config[:queue_name]
#   * subscriptions - a Subscription built from those topics and that queue
# Logs the topic and queue names first. Returns the new Subscription.
def register
  topic_names = config[:topic_names]
  queue_name = config[:queue_name]

  # This runs as a class method, so `name` is the worker class's own name;
  # `self.class.name` would always log "Class" here.
  Osbourne.logger.info "[Osbourne] #{name} subscriptions: Topics: [#{topic_names.join(', ')}], Queue: [#{queue_name}]"

  self.topics = topic_names.map { |topic_name| Topic.new(topic_name) }
  self.queue = Queue.new(queue_name)
  self.subscriptions = Subscription.new(topics, queue)
end
register_dead_letter_queue()
click to toggle source
# File lib/osbourne/worker_base.rb, line 53
# Attaches the dead letter queue to the main queue by calling
# queue.redrive(max_retry_count, dead_letter_arn), logging the ARN and
# retry limit first. Does nothing and returns nil when
# config[:dead_letter] is falsy; otherwise returns the result of `redrive`.
def register_dead_letter_queue
  return unless config[:dead_letter]

  # Read the ARN once so the logged value and the one passed to redrive agree.
  dead_letter_arn = dead_letter_queue.arn
  max_retries = config[:max_retry_count]

  # This runs as a class method, so `name` is the worker class's own name;
  # `self.class.name` would always log "Class" here.
  Osbourne.logger.info "[Osbourne] #{name} dead letter queue: arn: [#{dead_letter_arn}], max retries: #{max_retries}"
  queue.redrive(max_retries, dead_letter_arn)
end
worker_config(topics: [], max_batch_size: 10, max_wait: 10, threads: Osbourne.threads_per_worker, queue_name: default_queue_name, dead_letter_queue: true, idle_timeout: 600, max_retry_count: Osbourne.max_retry_count)
click to toggle source
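Stores this worker's settings in config. (The only comment on this method in the source is the lint directive rubocop:disable Metrics/ParameterLists.)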
# File lib/osbourne/worker_base.rb, line 76
# Stores this worker's settings as a Hash in `config` and returns that Hash.
#
# Keyword arguments map onto config keys as follows:
#   topics            -> :topic_names (wrapped with Array())
#   queue_name        -> :queue_name
#   max_batch_size    -> :max_batch_size
#   max_wait          -> :max_wait
#   threads           -> :threads
#   dead_letter_queue -> :dead_letter
#   max_retry_count   -> :max_retry_count
#   idle_timeout      -> :idle_timeout
def worker_config(topics: [],
                  max_batch_size: 10,
                  max_wait: 10,
                  threads: Osbourne.threads_per_worker,
                  queue_name: default_queue_name,
                  dead_letter_queue: true,
                  idle_timeout: 600,
                  max_retry_count: Osbourne.max_retry_count)
  settings = {
    topic_names: Array(topics),
    queue_name: queue_name,
    max_batch_size: max_batch_size,
    max_wait: max_wait,
    threads: threads,
    dead_letter: dead_letter_queue,
    max_retry_count: max_retry_count,
    idle_timeout: idle_timeout
  }
  self.config = settings
end
Public Instance Methods
config()
click to toggle source
# File lib/osbourne/worker_base.rb, line 7
# Instance-level reader that delegates to the class-level `config`.
def config
  worker_class = self.class
  worker_class.config
end
config=(config)
click to toggle source
# File lib/osbourne/worker_base.rb, line 15
# Instance-level writer that delegates to the class-level `config=`, so the
# assigned value is held on the class rather than on this instance.
def config=(config)
  worker_class = self.class
  worker_class.config = config
end
polling_queue()
click to toggle source
# File lib/osbourne/worker_base.rb, line 23
# Instance-level shortcut that delegates to the class-level `polling_queue`.
def polling_queue
  worker_class = self.class
  worker_class.polling_queue
end
process(_message)
click to toggle source
# File lib/osbourne/worker_base.rb, line 11
# Placeholder for message handling; the base implementation always raises.
#
# @param _message the message to handle (unused here)
# @raise [NotImplementedError] always
def process(_message)
  # `process` is an instance method, so the message says so and names the
  # class rather than interpolating the instance itself.
  raise NotImplementedError, "#{self.class} must implement instance method `process`"
end
queue()
click to toggle source
# File lib/osbourne/worker_base.rb, line 19
# Instance-level reader that delegates to the class-level `queue`.
def queue
  worker_class = self.class
  worker_class.queue
end