class Cuniculus::Consumer

Constants

JOB_REQUIRED_KEYS
POLL_TIME

Attributes

channel[R]
exchange[R]
job_queue[R]
queue_config[R]

Public Class Methods

new(queue_config, channel) click to toggle source
   # File lib/cuniculus/consumer.rb
# Builds a consumer from a queue configuration and a channel.
#
# Only stores the two arguments; nothing is declared or subscribed here.
#
# @param queue_config [Object] kept in @queue_config (exposed by the
#   +queue_config+ reader)
# @param channel [Object] kept in @channel (exposed by the +channel+ reader)
def initialize(queue_config, channel)
  @queue_config, @channel = queue_config, channel
end

Public Instance Methods

constantize(str) click to toggle source
   # File lib/cuniculus/consumer.rb
# Resolves a constant from its string name.
#
# A name without "::" is passed straight to Object.const_get. A namespaced
# name is split on "::" and walked one segment at a time starting from
# Object, passing +false+ as const_get's second argument at every step.
# An empty leading segment (a name starting with "::") is dropped first.
#
# @param str [String] constant name, e.g. "Foo" or "Foo::Bar"
# @return [Object] whatever the final const_get returns
# @raise [NameError] raised by const_get when a segment cannot be found
def constantize(str)
  if str.include?("::")
    segments = str.split("::")
    # "::Foo::Bar" splits into ["", "Foo", "Bar"]; discard the empty head.
    segments.shift if segments.empty? || segments.first.empty?

    segments.reduce(Object) { |scope, segment| scope.const_get(segment, false) }
  else
    Object.const_get(str)
  end
end
handle_error(e) click to toggle source
   # File lib/cuniculus/consumer.rb
# Reports an exception through Cuniculus.logger at error level.
#
# The first log entry is "<class name>: <message>". When the exception
# carries a backtrace, a second entry follows with the frames joined by
# newlines.
#
# @param e [Exception] the error to log
def handle_error(e)
  headline = "#{e.class.name}: #{e.message}"
  Cuniculus.logger.error(headline)

  frames = e.backtrace
  return if frames.nil?

  Cuniculus.logger.error(frames.join("\n"))
end
maybe_retry(delivery_info, item) click to toggle source
   # File lib/cuniculus/consumer.rb
# Either schedules another attempt for a failed job or rejects it.
#
# The current attempt count is read from item["_cun_retries"] (+to_i+, so a
# missing key counts as 0) and handed to job_queue.retry_queue. When that
# returns a queue name, the job is re-serialized with the count incremented,
# published through +exchange+ with that name as routing key, and the
# original delivery is acked. When it returns nothing, the delivery is
# nacked instead.
#
# @param delivery_info [#delivery_tag] delivery metadata of the failed message
# @param item [Hash] the parsed job; not mutated (a merged copy is published)
# @return [Object, nil] the result of channel.ack on the retry path, nil
#   when the message was nacked
def maybe_retry(delivery_info, item)
  attempts = item["_cun_retries"].to_i
  target_queue = job_queue.retry_queue(attempts)

  if target_queue
    body = Cuniculus.dump_job(item.merge("_cun_retries" => attempts + 1))
    publish_opts = { routing_key: target_queue, persistent: true }
    exchange.publish(body, publish_opts)
    channel.ack(delivery_info.delivery_tag, false)
  else
    # NOTE(review): presumably (multiple = false, requeue = false) as in
    # Bunny's Channel#nack — confirm against the channel implementation.
    channel.nack(delivery_info.delivery_tag, false, false)
    nil
  end
end
parse_job(payload) click to toggle source
   # File lib/cuniculus/consumer.rb
# Deserializes a consumed payload and checks that it has the required keys.
#
# The payload is decoded with Cuniculus.load_job. The result must contain
# every key listed in JOB_REQUIRED_KEYS; otherwise a
# Cuniculus::BadlyFormattedPayload is raised. Any other StandardError raised
# along the way is converted through Cuniculus.convert_exception_class so
# callers only ever see BadlyFormattedPayload from this method.
#
# @param payload [Object] raw message body as delivered
# @return [Object] the decoded job (responds to +keys+)
# @raise [Cuniculus::BadlyFormattedPayload] on missing keys or any failure
#   while decoding
def parse_job(payload)
  job = Cuniculus.load_job(payload)
  absent = JOB_REQUIRED_KEYS - job.keys
  return job if absent.empty?

  raise Cuniculus::BadlyFormattedPayload,
        "Consumed message with missing information: #{payload}\nIt should have keys [#{JOB_REQUIRED_KEYS.join(', ')}]"
rescue Cuniculus::BadlyFormattedPayload
  # Already the right class: let it through without re-wrapping.
  raise
rescue StandardError => error
  wrapped = Cuniculus.convert_exception_class(error, Cuniculus::BadlyFormattedPayload)
  raise wrapped, "Badly formatted consumed message: #{payload}"
end
run_job(delivery_info, _properties, payload) click to toggle source
   # File lib/cuniculus/consumer.rb
# Handles one delivered message from start to finish.
#
# The payload is parsed with parse_job, the class named by job["class"] is
# looked up with Object.const_get and instantiated with no arguments, and
# +perform+ is called with job["args"] splatted. On success the delivery is
# acked.
#
# Failure handling:
# * Cuniculus::BadlyFormattedPayload: logged, then nacked.
# * any other StandardError: converted to Cuniculus::Error for logging,
#   then handed to maybe_retry together with the parsed job.
#
# @param delivery_info [#delivery_tag] delivery metadata for ack/nack
# @param _properties [Object] message properties; unused
# @param payload [Object] raw message body
def run_job(delivery_info, _properties, payload)
  job = parse_job(payload)
  Object.const_get(job["class"]).new.perform(*job["args"])
  channel.ack(delivery_info.delivery_tag, false)
rescue Cuniculus::BadlyFormattedPayload => error
  handle_error(error)
  # An unparseable message is never retried; it goes straight to the DLX.
  channel.nack(delivery_info.delivery_tag, false, false)
rescue StandardError => error
  handle_error(Cuniculus.convert_exception_class(error, Cuniculus::Error))
  maybe_retry(delivery_info, job)
end
start() click to toggle source
   # File lib/cuniculus/consumer.rb
# Declares the exchange and queue, then begins consuming.
#
# Stores the direct exchange named Cuniculus::CUNICULUS_EXCHANGE (declared
# with durable: true) in @exchange, the result of queue_config.declare! in
# @job_queue, and the subscription handle in @_consumer. Each delivery is
# forwarded to run_job. The subscription is created with manual_ack: true
# and block: false.
#
# @return [Object] whatever job_queue.subscribe returns
def start
  exchange_opts = { durable: true }
  @exchange = channel.direct(Cuniculus::CUNICULUS_EXCHANGE, exchange_opts)
  @job_queue = queue_config.declare!(channel)

  @_consumer = job_queue.subscribe(manual_ack: true, block: false) do |info, props, body|
    run_job(info, props, body)
  end
end
stop() click to toggle source
   # File lib/cuniculus/consumer.rb
# Stops consuming and closes the channel.
#
# Cancels the subscription stored in @_consumer if there is one, then
# closes the channel unless it already reports itself as closed.
#
# @return [Object, nil] the result of channel.close, or nil when the
#   channel was already closed
def stop
  subscription = @_consumer
  subscription.cancel unless subscription.nil?

  return if channel.closed?

  channel.close
end