module Rabbitek::Consumer
Consumer
helpers
Attributes
channel[R]
queue[R]
retry_or_delayed_exchange[R]
retry_or_delayed_queue[R]
Public Class Methods
included(base)
click to toggle source
# File lib/rabbitek/server/consumer.rb, line 7
# Ruby mixin hook: receives the class or module that is including this
# module and extends it with ClassMethods, so the methods defined there
# become callable directly on +base+.
#
# @param base [Module] the including class or module
# @return [Module] +base+, as returned by Object#extend
def self.included(base)
  base.extend ClassMethods
end
new(channel, queue, retry_or_delayed_queue, retry_or_delayed_exchange)
click to toggle source
# File lib/rabbitek/server/consumer.rb, line 13
# Stores the four collaborators in instance variables of the same names.
#
# @param channel the object later used by #ack! and #nack!
# @param queue the object later used by #pop_message_manually and #set_context
# @param retry_or_delayed_queue stored as given; not used by the methods shown here
# @param retry_or_delayed_exchange stored as given; not used by the methods shown here
def initialize(channel, queue, retry_or_delayed_queue, retry_or_delayed_exchange)
  # Primary messaging objects.
  @channel, @queue = channel, queue

  # Retry / delayed-delivery counterparts.
  @retry_or_delayed_queue = retry_or_delayed_queue
  @retry_or_delayed_exchange = retry_or_delayed_exchange
end
Public Instance Methods
ack!(delivery_info, multiple = false)
click to toggle source
# File lib/rabbitek/server/consumer.rb, line 20
# Calls +ack+ on the consumer's channel for the delivery tag carried by
# +delivery_info+.
#
# @param delivery_info an object responding to +delivery_tag+
# @param multiple forwarded unchanged as the second argument of +channel.ack+
#   (presumably the broker's "acknowledge everything up to this tag" flag -
#   confirm against the channel implementation)
# @return whatever +channel.ack+ returns
def ack!(delivery_info, multiple = false)
  target_channel = channel
  target_channel.ack(delivery_info.delivery_tag, multiple)
end
jid()
click to toggle source
# File lib/rabbitek/server/consumer.rb, line 44
# Returns the +:job_id+ entry of the hash stored under
# +Thread.current[:rabbit_context]+ (written by #set_context).
#
# Returns nil instead of raising NoMethodError when no context has been
# stored yet, so the method is safe to call before #set_context has run.
#
# @return the stored job id, or nil when there is no context
def jid
  context = Thread.current[:rabbit_context]
  context[:job_id] if context
end
logger()
click to toggle source
# File lib/rabbitek/server/consumer.rb, line 28
# Convenience accessor: hands back whatever +Rabbitek.logger+ returns so
# consumer code can log without referencing the top-level module.
def logger
  Rabbitek.logger
end
nack!(delivery_info, multiple = false, requeue = true)
click to toggle source
# File lib/rabbitek/server/consumer.rb, line 24
# Calls +nack+ on the consumer's channel for the delivery tag carried by
# +delivery_info+.
#
# @param delivery_info an object responding to +delivery_tag+
# @param multiple forwarded unchanged as the second argument of +channel.nack+
# @param requeue forwarded unchanged as the third argument of +channel.nack+
#   (presumably controls whether the broker re-queues the message - confirm
#   against the channel implementation)
# @return whatever +channel.nack+ returns
def nack!(delivery_info, multiple = false, requeue = true)
  target_channel = channel
  target_channel.nack(delivery_info.delivery_tag, multiple, requeue)
end
opts()
click to toggle source
# File lib/rabbitek/server/consumer.rb, line 55
# Instance-level shortcut to the +opts+ defined on the consumer's class.
#
# @return whatever the class-level +opts+ returns
def opts
  consumer_class = self.class
  consumer_class.opts
end
parse_payload(payload)
click to toggle source
# File lib/rabbitek/server/consumer.rb, line 32
# Decodes a raw message payload by passing it to +Utils::Oj.load+.
#
# @param payload the raw payload (presumably a JSON string - confirm
#   against Utils::Oj)
# @return the value produced by +Utils::Oj.load+
def parse_payload(payload)
  Utils::Oj.load(payload)
end
perform(_message)
click to toggle source
# File lib/rabbitek/server/consumer.rb, line 36
# Placeholder for the consumer's work; this default implementation always
# raises, so including classes are expected to override it.
#
# @param _message ignored by this default implementation
# @raise [NotImplementedError] always; the message names the class that is
#   missing an override so the failure is easy to trace
def perform(_message)
  raise NotImplementedError, "#{self.class} must implement #perform"
end
pop_message_manually()
click to toggle source
# File lib/rabbitek/server/consumer.rb, line 48
# Pops a single entry from the queue with +manual_ack: true+ and wraps it
# in a Message.
#
# @return [Message, nil] a Message built from the popped delivery info,
#   properties and payload, or nil when the pop yields no payload
def pop_message_manually
  info, props, body = queue.pop(manual_ack: true)
  Message.new(delivery_info: info, properties: props, payload: body) if body
end
set_context()
click to toggle source
# File lib/rabbitek/server/consumer.rb, line 40
# Stores a fresh context hash under +Thread.current[:rabbit_context]+,
# replacing any previous value. The hash holds the consumer's class name,
# the name of its queue and a newly generated UUID as +:job_id+ (the value
# #jid reads back).
#
# @return [Hash] the context hash that was stored
def set_context
  context = {
    consumer: self.class.name,
    queue: @queue.name,
    job_id: SecureRandom.uuid
  }
  Thread.current[:rabbit_context] = context
end