module Rabbitek::Consumer

Consumer helpers

Attributes

channel[R]
queue[R]
retry_or_delayed_exchange[R]
retry_or_delayed_queue[R]

Public Class Methods

included(base) click to toggle source
# File lib/rabbitek/server/consumer.rb, line 7
# Module hook: Ruby calls this with the including class/module whenever
# this module is included somewhere.
#
# Extends +base+ with ClassMethods so that those methods become available
# at the class level of the includer. ClassMethods is defined outside this
# excerpt (presumably it provides the class-level +opts+ used by #opts —
# TODO confirm).
def self.included(base)
  base.extend(ClassMethods)
end
new(channel, queue, retry_or_delayed_queue, retry_or_delayed_exchange) click to toggle source
# File lib/rabbitek/server/consumer.rb, line 13
# Stores the collaborators this consumer works with; each one is exposed
# through the read-only attribute of the same name.
#
# @param channel the channel used by #ack! and #nack!
# @param queue the queue read by #pop_message_manually and #set_context
# @param retry_or_delayed_queue stored as-is (not used in this excerpt)
# @param retry_or_delayed_exchange stored as-is (not used in this excerpt)
def initialize(channel, queue, retry_or_delayed_queue, retry_or_delayed_exchange)
  @channel, @queue = channel, queue
  @retry_or_delayed_queue, @retry_or_delayed_exchange =
    retry_or_delayed_queue, retry_or_delayed_exchange
end

Public Instance Methods

ack!(delivery_info, multiple = false) click to toggle source
# File lib/rabbitek/server/consumer.rb, line 20
# Acknowledges a delivery on this consumer's channel.
#
# @param delivery_info object responding to +delivery_tag+
# @param multiple forwarded unchanged as the second argument of
#   +channel.ack+ (presumably the AMQP "multiple" flag — confirm against
#   the channel implementation)
# @return whatever +channel.ack+ returns
def ack!(delivery_info, multiple = false)
  target = channel
  tag = delivery_info.delivery_tag
  target.ack(tag, multiple)
end
jid() click to toggle source
# File lib/rabbitek/server/consumer.rb, line 44
# Returns the job id recorded under the current thread's :rabbit_context
# entry (the hash written by #set_context).
#
# @return the stored :job_id, or nil when no context has been set on this
#   thread yet
def jid
  context = Thread.current[:rabbit_context]
  # Thread#[] yields nil for a key that was never assigned; guard so that
  # callers get nil instead of a NoMethodError on nil.
  context[:job_id] if context
end
logger() click to toggle source
# File lib/rabbitek/server/consumer.rb, line 28
# Convenience accessor so consumer code can log without naming the
# top-level module.
#
# @return whatever +Rabbitek.logger+ returns
def logger
  Rabbitek.logger
end
nack!(delivery_info, multiple = false, requeue = true) click to toggle source
# File lib/rabbitek/server/consumer.rb, line 24
# Negatively acknowledges a delivery on this consumer's channel.
#
# @param delivery_info object responding to +delivery_tag+
# @param multiple forwarded unchanged as the second argument of
#   +channel.nack+
# @param requeue forwarded unchanged as the third argument of
#   +channel.nack+ (presumably asks the broker to requeue the message —
#   confirm against the channel implementation)
# @return whatever +channel.nack+ returns
def nack!(delivery_info, multiple = false, requeue = true)
  target = channel
  tag = delivery_info.delivery_tag
  target.nack(tag, multiple, requeue)
end
opts() click to toggle source
# File lib/rabbitek/server/consumer.rb, line 55
# Instance-level shortcut to the options held on the consumer's class.
#
# The class-level +opts+ is not defined in this excerpt (presumably it
# comes from ClassMethods, which is mixed in by the +included+ hook —
# TODO confirm).
#
# @return whatever +self.class.opts+ returns
def opts
  self.class.opts
end
parse_payload(payload) click to toggle source
# File lib/rabbitek/server/consumer.rb, line 32
# Deserializes a raw message payload by handing it to +Utils::Oj.load+.
#
# Utils::Oj is a project helper not shown here (presumably a thin wrapper
# around the Oj JSON parser — verify the load options and error behaviour
# there).
#
# @param payload the raw payload to decode
# @return whatever +Utils::Oj.load+ returns for +payload+
def parse_payload(payload)
  Utils::Oj.load(payload)
end
perform(_message) click to toggle source
# File lib/rabbitek/server/consumer.rb, line 36
# Processes a single message. This default body only signals that the
# including class has not supplied its own implementation.
#
# @param _message the message to process (unused by the default body)
# @raise [NotImplementedError] always, with a message naming the class
#   that is missing the override
def perform(_message)
  # NOTE: NotImplementedError descends from ScriptError rather than
  # StandardError, so a bare `rescue` does not catch it.
  raise NotImplementedError, "#{self.class} must implement #perform"
end
pop_message_manually() click to toggle source
# File lib/rabbitek/server/consumer.rb, line 48
# Pulls one message off the queue with manual acknowledgement enabled and
# wraps it in a Message.
#
# @return [Message, nil] the wrapped message, or nil when the pop yielded
#   no payload
def pop_message_manually
  info, props, body = queue.pop(manual_ack: true)
  Message.new(delivery_info: info, properties: props, payload: body) if body
end
set_context() click to toggle source
# File lib/rabbitek/server/consumer.rb, line 40
# Records per-job context under the current thread's :rabbit_context key:
# the consumer class name, the queue name and a freshly generated UUID as
# the job id (read back by #jid).
#
# @return [Hash] the context hash that was stored
def set_context
  context = {
    consumer: self.class.name,
    queue: @queue.name,
    job_id: SecureRandom.uuid
  }
  Thread.current[:rabbit_context] = context
end