class Services::RabbitWrapper::Base
Public Class Methods
new(connection_string: nil, exchange_name: nil, config: nil)
click to toggle source
# File lib/services/rabbit_wrapper/base.rb, line 11
# Opens a Bunny connection and resolves the wrapper configuration.
#
# @param connection_string passed to Bunny.new and forwarded to setup_config
# @param exchange_name forwarded to setup_config only; it is not stored on
#   the instance
# @param config forwarded to setup_config
def initialize(connection_string: nil, exchange_name: nil, config: nil)
  # The connection is started right away, before the configuration is built.
  @conn = Bunny.new(connection_string, log_level: :info).tap(&:start)
  @config = setup_config(connection_string, exchange_name, config)
end
Public Instance Methods
close()
click to toggle source
# File lib/services/rabbit_wrapper/base.rb, line 58
# Closes the connection held in @conn by delegating to its #close.
#
# @return whatever the connection's #close returns
def close
  @conn.close
end
publish(queue_name: nil, message: nil)
click to toggle source
# File lib/services/rabbit_wrapper/base.rb, line 47
# Publishes +message+ through a direct exchange named after the resolved queue.
#
# A channel is opened for this call only. The exchange and the queue are both
# declared under the name returned by pub_queue_name and bound with that same
# name as the routing key before the message is published.
#
# @param queue_name value handed to pub_queue_name to resolve the queue name
# @param message payload passed to the exchange's #publish
# @return the result of the exchange's #publish call
def publish(queue_name: nil, message: nil)
  ch = @conn.create_channel
  begin
    q_name = pub_queue_name(queue_name)
    exchange = ch.direct(q_name, durable: @config.durable)
    puts "Sending message: queue=#{q_name} - message=#{message}"
    q = ch.queue(q_name, durable: @config.durable)
    q.bind(exchange, routing_key: q_name)
    exchange.publish(message, routing_key: q_name)
  ensure
    # Each call opens its own channel; release it so repeated publishes do not
    # pile up open channels on the connection. Skip the close when the channel
    # is no longer open (e.g. after a failed declaration) so the original
    # error is the one that reaches the caller.
    ch.close if ch.open?
  end
end
subscribe(queue_name = nil) { |body| ... }
click to toggle source
# File lib/services/rabbit_wrapper/base.rb, line 18
# Registers a consumer on the resolved queue and yields each message body.
#
# The exchange and the queue are both declared under the name returned by
# sub_queue_name and bound with that same name as the routing key. Messages
# are consumed with manual acknowledgement: the ack is sent only after the
# caller's block has returned, so a block that raises leaves the message
# unacknowledged.
#
# @param queue_name value handed to sub_queue_name to resolve the queue name
# @yield [body] the body of each delivered message
# @return the result of the queue's #subscribe call
def subscribe(queue_name = nil)
  ch = @conn.create_channel
  q_name = sub_queue_name(queue_name)
  exchange = ch.direct(q_name, durable: @config.durable)
  q = ch.queue(q_name, durable: @config.durable)
  q.bind(exchange, routing_key: q_name)
  ch.prefetch(1)
  # Log the resolved name: queue_name is nil whenever the caller relies on the
  # default, which would print empty parentheses.
  puts " [*] Waiting for messages (#{q_name}). To exit press CTRL+C"
  begin
    q.subscribe(manual_ack: true) do |delivery_info, _properties, body|
      puts " [x] Received (redelivery: #{redelivery?(delivery_info)})"
      yield(body)
      ch.ack(delivery_info.delivery_tag)
      puts " - Message acknowledged"
      puts " [x] Done"
    end
  rescue Interrupt
    # NOTE(review): this only fires if the subscribe call itself is
    # interrupted; confirm whether a blocking consumer was intended.
    exit(0)
  end
end
Private Instance Methods
redelivery?(delivery_info)
click to toggle source
# File lib/services/rabbit_wrapper/base.rb, line 64
# Reads the redelivered flag from a delivery, when the object exposes one.
#
# Only a missing #redelivered method is treated as "unknown"; an error raised
# by the method itself propagates instead of being reported as a missing
# method.
#
# @param delivery_info object expected to respond to #redelivered
# @return the value of delivery_info.redelivered, or nil (after printing a
#   notice) when the object does not respond to that method
def redelivery?(delivery_info)
  return delivery_info.redelivered if delivery_info.respond_to?(:redelivered)

  puts "Redelivered method does not exist"
  nil
end