class Qwrapper::Queues::RabbitMQ

Attributes

connection[R]

Public Instance Methods

connect!() click to toggle source
# File lib/qwrapper/queues/rabbitmq.rb, line 66
# Starts the underlying connection when one is set.
#
# Returns whatever +connection.start+ returns, or +nil+ when there is no
# connection to start.
def connect!
  return unless connection

  connection.start
end
disconnect!() click to toggle source
# File lib/qwrapper/queues/rabbitmq.rb, line 70
# Closes the underlying connection when one is set.
#
# Returns whatever +connection.close+ returns, or +nil+ when there is no
# connection to close.
def disconnect!
  return unless connection

  connection.close
end
publish(queue_name, messages, options={}) click to toggle source
# File lib/qwrapper/queues/rabbitmq.rb, line 43
# Publishes one or more messages to the queue named +queue_name+.
#
# queue_name - name of the queue to declare and publish to.
# messages   - a single message or an (optionally nested) Array of messages.
#              Nested arrays are flattened into a new list; the caller's
#              Array is never modified. Each message is sent as +message.to_s+.
# options    - Hash merged with <tt>durable: true</tt> and passed to the
#              channel's +queue+ call. Two keys are also read here:
#              :prefetch   - channel prefetch count (defaults to 1).
#              :expiration - when not nil, forwarded as the per-message
#                            +:expiration+ publish option.
#
# Every message is published with <tt>persistent: true</tt>. No channel is
# opened when there is nothing to publish.
#
# Any StandardError raised while publishing is logged through
# +logger.error+ and not re-raised. Non-StandardError exceptions (for
# example Interrupt or SystemExit) are deliberately allowed to propagate so
# the process can still be stopped. The channel, if one was opened, is always
# closed.
def publish(queue_name, messages, options={})
  ch = nil
  begin
    logger.info "Publishing to '#{queue_name}'"
    # Array#flatten (non-bang) builds a new list, so a caller-owned or frozen
    # Array is left untouched.
    payloads = (messages.is_a?(Array) ? messages : [messages]).flatten
    unless payloads.empty?
      ch = connection.create_channel
      ch.prefetch(options[:prefetch] || 1)
      queue = ch.queue(queue_name, options.merge(durable: true))
      payloads.each do |message|
        # A fresh options hash per message, in case the client mutates it.
        message_options = {persistent: true}
        message_options[:expiration] = options[:expiration] unless options[:expiration].nil?
        queue.publish(message.to_s, message_options)
      end
    end
  rescue StandardError => ex
    logger.error ex
  ensure
    ch.close if ch
  end
end
subscribe(queue_name, options={}, &block) click to toggle source
# File lib/qwrapper/queues/rabbitmq.rb, line 12
# Subscribes to the queue named +queue_name+ and hands every payload to
# +block+.
#
# queue_name - name of the queue to declare and consume from.
# options    - Hash merged with <tt>durable: true</tt> and passed to the
#              channel's +queue+ call. +:prefetch+ is also read here as the
#              channel prefetch count (defaults to 1).
# block      - called with the payload and a logger for each delivery.
#
# The subscription is made with <tt>manual_ack: true, block: true</tt>
# (presumably this blocks the calling thread while consuming - confirm against
# the client library). Each delivery is processed as follows:
#
# * When +logger+ responds to +wrap+ the block runs through
#   +logger_wrapped_block_execution+, otherwise through +block_execution+.
# * An error matching +requeue_errors+ is passed to +requeue_lambda+ together
#   with the queue name and payload; the delivery is then acknowledged.
# * When no +requeue_lambda+ is set, that is logged and the error re-raised,
#   so the delivery is not acknowledged.
#
# Any StandardError reaching this method is logged through +logger.error+ and
# not re-raised. Non-StandardError exceptions (for example Interrupt or
# SystemExit) are deliberately allowed to propagate so a blocked subscriber
# can still be stopped. The channel, if one was opened, is always closed.
def subscribe(queue_name, options={}, &block)
  ch = nil
  begin
    logger.info "Subscribing to '#{queue_name}'"
    ch = connection.create_channel
    ch.prefetch(options[:prefetch] || 1)
    queue = ch.queue(queue_name, options.merge(durable: true))
    queue.subscribe(manual_ack: true, block: true) do |delivery_info, _metadata, payload|
      begin
        if logger.respond_to?(:wrap)
          logger_wrapped_block_execution(payload, &block)
        else
          block_execution(payload, &block)
        end
      rescue *requeue_errors => ex
        if requeue_lambda
          requeue_lambda.call(queue_name, payload, ex)
        else
          logger.error "No requeue_lambda provided"
          raise
        end
      end
      # Reached only when the block succeeded or the requeue handler took the
      # message over; a re-raised error skips the acknowledgement.
      queue.channel.ack(delivery_info.delivery_tag)
    end
  rescue StandardError => ex
    logger.error ex
  ensure
    ch.close if ch
  end
end

Private Instance Methods

block_execution(payload, &block) click to toggle source
# File lib/qwrapper/queues/rabbitmq.rb, line 97
# Invokes +block+ with the payload and the current +logger+, returning the
# block's result.
def block_execution(payload, &block)
  block.(payload, logger)
end
dup_logger() click to toggle source
# File lib/qwrapper/queues/rabbitmq.rb, line 101
# Returns +logger.duplicate("bunny")+ when the logger supports +duplicate+;
# otherwise returns the logger itself.
def dup_logger
  return logger unless logger.respond_to?(:duplicate)

  logger.duplicate("bunny")
end
logger_wrapped_block_execution(payload, &block) click to toggle source
# File lib/qwrapper/queues/rabbitmq.rb, line 91
# Runs +block+ inside <tt>logger.wrap("SubscribedMessage")</tt>, passing it
# the payload and the logger yielded by +wrap+ instead of the outer logger.
#
# Returns whatever +logger.wrap+ returns.
def logger_wrapped_block_execution(payload, &block)
  logger.wrap("SubscribedMessage") { |scoped_logger| block.call(payload, scoped_logger) }
end