class Qwrapper::Queues::RabbitMQ
Attributes
connection[R]
Public Instance Methods
connect!()
click to toggle source
# File lib/qwrapper/queues/rabbitmq.rb, line 66
# Starts the underlying connection.
#
# Does nothing (and returns nil) when no connection object is set;
# otherwise returns whatever +connection.start+ returns.
def connect!
  return unless connection

  connection.start
end
disconnect!()
click to toggle source
# File lib/qwrapper/queues/rabbitmq.rb, line 70
# Closes the underlying connection.
#
# Does nothing (and returns nil) when no connection object is set;
# otherwise returns whatever +connection.close+ returns.
def disconnect!
  return unless connection

  connection.close
end
publish(queue_name, messages, options={})
click to toggle source
# File lib/qwrapper/queues/rabbitmq.rb, line 43
# Publishes one or more messages to the durable queue +queue_name+.
#
# queue_name - name of the queue to declare and publish to.
# messages   - a single message or an (optionally nested) Array of messages.
#              Each element is sent as +message.to_s+. The caller's Array is
#              never modified, so frozen arrays are accepted.
# options    - Hash of settings:
#              :prefetch   - channel prefetch count (defaults to 1).
#              :expiration - when not nil, added to every message's publish
#                            options.
#              The whole hash, merged with durable: true, is also passed to
#              the queue declaration.
#
# No channel is opened when there is nothing to publish.
#
# StandardError failures are logged through +logger.error+ and swallowed.
# Process-level exceptions (Interrupt, SystemExit, NoMemoryError, ...) are
# allowed to propagate. The channel, if one was opened, is closed either way.
def publish(queue_name, messages, options={})
  ch = nil
  logger.info "Publishing to '#{queue_name}'"

  # Wrapping then flattening yields a fresh Array for both scalars and
  # (nested) arrays, so the argument itself is left untouched.
  batch = [messages].flatten

  unless batch.empty?
    ch = connection.create_channel
    ch.prefetch(options[:prefetch] || 1)
    queue = ch.queue(queue_name, options.merge(durable: true))

    batch.each do |message|
      message_options = {persistent: true}
      message_options[:expiration] = options[:expiration] unless options[:expiration].nil?
      queue.publish(message.to_s, message_options)
    end
  end
rescue StandardError => ex
  # Best-effort publish: report the failure instead of raising to the caller.
  logger.error ex
ensure
  ch.close if ch
end
subscribe(queue_name, options={}, &block)
click to toggle source
# File lib/qwrapper/queues/rabbitmq.rb, line 12
# Consumes messages from the durable queue +queue_name+ and hands each
# payload to +block+.
#
# queue_name - name of the queue to declare and consume from.
# options    - Hash; :prefetch (defaults to 1) is applied to the channel, and
#              the whole hash, merged with durable: true, is passed to the
#              queue declaration.
# block      - handler invoked per message through
#              logger_wrapped_block_execution when the logger responds to
#              :wrap, otherwise through block_execution.
#
# The subscription is opened with manual_ack: true and block: true. A message
# is acknowledged after its handler returns, or after requeue_lambda has been
# called for an error listed in requeue_errors. When such an error occurs and
# no requeue_lambda is set, it is logged and re-raised, so the ack is skipped.
#
# Any exception reaching the method body is logged and swallowed; the channel
# is closed in every case.
#
# NOTE(review): the outer rescue catches Exception, which also covers
# Interrupt and SystemExit - confirm whether that is the intended shutdown
# path before narrowing it.
def subscribe(queue_name, options={}, &block)
  ch = nil
  logger.info "Subscribing to '#{queue_name}'"

  ch = connection.create_channel
  ch.prefetch(options[:prefetch] || 1)
  queue = ch.queue(queue_name, options.merge(durable: true))

  queue.subscribe(manual_ack: true, block: true) do |delivery_info, _metadata, payload|
    begin
      if logger.respond_to?(:wrap)
        logger_wrapped_block_execution(payload, &block)
      else
        block_execution(payload, &block)
      end
    rescue *requeue_errors => error
      unless requeue_lambda
        logger.error "No requeue_lambda provided"
        raise error
      end
      requeue_lambda.call(queue_name, payload, error)
    end

    queue.channel.ack(delivery_info.delivery_tag)
  end
rescue Exception => failure
  logger.error failure
ensure
  ch.close if ch
end
Private Instance Methods
block_execution(payload, &block)
click to toggle source
# File lib/qwrapper/queues/rabbitmq.rb, line 97
# Runs the subscriber's handler for one message.
#
# payload - the message body to hand to the handler.
# block   - the handler; called with the payload and the instance's logger.
#
# Returns whatever the handler returns.
def block_execution(payload, &block)
  block.(payload, logger)
end
dup_logger()
click to toggle source
# File lib/qwrapper/queues/rabbitmq.rb, line 101
# Returns a logger derived from the instance's logger.
#
# When the logger responds to :duplicate, the result of
# +logger.duplicate("bunny")+ is returned; otherwise the logger itself.
def dup_logger
  return logger unless logger.respond_to?(:duplicate)

  logger.duplicate("bunny")
end
logger_wrapped_block_execution(payload, &block)
click to toggle source
# File lib/qwrapper/queues/rabbitmq.rb, line 91
# Runs the subscriber's handler for one message inside +logger.wrap+.
#
# payload - the message body to hand to the handler.
# block   - the handler; called with the payload and the logger that
#           +logger.wrap("SubscribedMessage")+ yields.
#
# Returns whatever +logger.wrap+ returns.
def logger_wrapped_block_execution(payload, &block)
  logger.wrap("SubscribedMessage") { |scoped_logger| block.call(payload, scoped_logger) }
end