class Osbourne::WorkerBase

Attributes

config[RW]
queue[RW]
subscriptions[RW]
topics[RW]

Public Class Methods

dead_letter_queue() click to toggle source
# File lib/osbourne/worker_base.rb, line 39
def dead_letter_queue
  return unless config[:dead_letter]

  @dead_letter_queue ||= Queue.new(dead_letter_queue_name)
end
descendants() click to toggle source
# File lib/osbourne/worker_base.rb, line 30
def descendants
  ObjectSpace.each_object(Class).select {|klass| klass < self }
end
polling_queue() click to toggle source
# File lib/osbourne/worker_base.rb, line 45
def polling_queue
  Aws::SQS::QueuePoller.new(queue.url, client: Osbourne.sqs_client)
end
provision() click to toggle source
# File lib/osbourne/worker_base.rb, line 34
def provision
  register
  register_dead_letter_queue
end

Private Class Methods

dead_letter_queue_name() click to toggle source
# File lib/osbourne/worker_base.rb, line 71
def dead_letter_queue_name
  "#{config[:queue_name]}-dead-letter"
end
default_queue_name() click to toggle source
# File lib/osbourne/worker_base.rb, line 67
def default_queue_name
  "#{name.underscore}_queue"
end
register() click to toggle source
# File lib/osbourne/worker_base.rb, line 60
def register
  Osbourne.logger.info "[Osbourne] #{self.class.name} subscriptions: Topics: [#{config[:topic_names].join(', ')}], Queue: [#{config[:queue_name]}]"
  self.topics = config[:topic_names].map {|tn| Topic.new(tn) }
  self.queue = Queue.new(config[:queue_name])
  self.subscriptions = Subscription.new(topics, queue)
end
register_dead_letter_queue() click to toggle source
# File lib/osbourne/worker_base.rb, line 53
def register_dead_letter_queue
  return unless config[:dead_letter]

  Osbourne.logger.info "[Osbourne] #{self.class.name} dead letter queue: arn: [#{dead_letter_queue.arn}], max retries: #{config[:max_retry_count]}"
  queue.redrive(config[:max_retry_count], dead_letter_queue.arn)
end
worker_config(topics: [], max_batch_size: 10, max_wait: 10, threads: Osbourne.threads_per_worker, queue_name: default_queue_name, dead_letter_queue: true, idle_timeout: 600, max_retry_count: Osbourne.max_retry_count) click to toggle source

rubocop:disable Metrics/ParameterLists

# File lib/osbourne/worker_base.rb, line 76
def worker_config(topics: [],
                  max_batch_size: 10,
                  max_wait: 10,
                  threads: Osbourne.threads_per_worker,
                  queue_name: default_queue_name,
                  dead_letter_queue: true,
                  idle_timeout: 600,
                  max_retry_count: Osbourne.max_retry_count)
  self.config = {
    topic_names:     Array(topics),
    queue_name:      queue_name,
    max_batch_size:  max_batch_size,
    max_wait:        max_wait,
    threads:         threads,
    dead_letter:     dead_letter_queue,
    max_retry_count: max_retry_count,
    idle_timeout:    idle_timeout
  }
end

Public Instance Methods

config() click to toggle source
# File lib/osbourne/worker_base.rb, line 7
def config
  self.class.config
end
config=(config) click to toggle source
# File lib/osbourne/worker_base.rb, line 15
def config=(config)
  self.class.config = config
end
polling_queue() click to toggle source
# File lib/osbourne/worker_base.rb, line 23
def polling_queue
  self.class.polling_queue
end
process(_message) click to toggle source
# File lib/osbourne/worker_base.rb, line 11
def process(_message)
  raise NotImplementedError, "#{self} must implement class method `process`"
end
queue() click to toggle source
# File lib/osbourne/worker_base.rb, line 19
def queue
  self.class.queue
end