class HonestPubsub::Server::RabbitMQSubscriber
Attributes
channel[R]
exchange[R]
listener[R]
Public Class Methods
new(routing_key, durable = true, topic="honest")
click to toggle source
# File lib/honest_pubsub/server/rabbit_mq_subscriber.rb, line 9
# Stores the subscription settings and derives the topic routing pattern.
#
# routing_key - optional key segment. When present the pattern becomes
#               "<topic>.<routing_key>.#"; otherwise it is "<topic>.#".
# durable     - kept in @durable for later use (default true).
# topic       - kept in @topic and used as the pattern prefix
#               (default "honest").
def initialize(routing_key, durable = true, topic="honest")
  @initial_key = routing_key
  @durable     = durable
  @topic       = topic

  # Only add the key segment when a key was actually supplied.
  key_segment  = @initial_key.present? ? ".#{@initial_key}" : ""
  @routing_key = "#{@topic}#{key_segment}.#"

  @logger = ::HonestPubsub::Logger.new

  self
end
Public Instance Methods
start(name, blocking=false) { |delivery_info, properties, message| ... }
click to toggle source
name - used to ensure that certain consumers are actually listening to an exchange. Pass in a block for this method to work; it is yielded the delivery info, the properties, and the message. We might only want to expose the content instead of all 3 chunks.
# File lib/honest_pubsub/server/rabbit_mq_subscriber.rb, line 27
# Connects to RabbitMQ, declares the topic exchange and this subscriber's
# queue, binds the queue with the routing pattern, and consumes messages.
#
# name     - used as the consumer tag and as the queue name (prefixed with
#            the initial routing key when one was given).
# blocking - forwarded to Bunny as the :block subscribe option
#            (default false).
#
# Yields delivery_info, properties and the message parsed from the payload
# by ::HonestPubsub::Message. A block is required: the handler calls
# `yield` for every delivery.
#
# Notifies Airbrake and re-raises if the connection cannot be started.
def start(name, blocking=false)
  @connection = Bunny.new(Configuration.configuration[:connection])
  begin
    @connection.start
  rescue => e
    Airbrake.notify("RabbitMQ unreachable!", params: { message: e.message}, environment_name: ENV['RAILS_ENV'] )
    raise
  end

  @channel  = @connection.create_channel
  @exchange = @channel.topic(@topic, :durable=>@durable, :auto_delete=>false)

  # FIXME(thl): need to ensure that the ids for a server will be
  # reproducible in case a server goes down and has to get restarted.
  if @initial_key.present?
    @queue = "#{@initial_key}.#{name}"
  else
    @queue = "#{name}"
  end

  # Only request dead-lettering when a dead-letter exchange is configured.
  queue_arguments = {}
  dead_letter = Configuration.configuration[:dead_letter]
  queue_arguments["x-dead-letter-exchange"] = dead_letter if dead_letter.present?

  @listener = @channel.queue(@queue, :arguments=>queue_arguments ).bind(@exchange, :routing_key => @routing_key, :exclusive=>false)

  # The handler is handed to #subscribe itself rather than attached with
  # #on_delivery afterwards, so it is in place before consumption starts.
  # NOTE(review): with :block=>true Bunny's subscribe is expected to block
  # the calling thread, so @consumer is only assigned once it returns.
  @consumer = @listener.subscribe(:consumer_tag=>name, :block=>blocking) do |delivery_info, properties, contents|
    HonestPubsub.logger.debug( "Message delivery with contents: #{contents}")

    if delivery_info[:redelivered]
      Airbrake.notify("PubSub Message redelivery", params: {info: delivery_info, props: properties, contents: contents}, environment_name: ENV['RAILS_ENV'] )
    end

    message = ::HonestPubsub::Message.new.parse(contents)
    @logger.log_receive(delivery_info[:routing_key], message)
    yield delivery_info, properties, message
    true
  end
end
teardown()
click to toggle source
# File lib/honest_pubsub/server/rabbit_mq_subscriber.rb, line 66
# Cancels the consumer (if any) and closes the connection (if any).
#
# The connection is closed in an ensure clause so that an error raised
# while cancelling the consumer does not leave the connection open; the
# error still propagates to the caller.
#
# Returns the result of closing the connection, or nil when there is no
# connection.
def teardown
  begin
    @consumer.cancel if @consumer.present?
  ensure
    closed = @connection.close if @connection.present?
  end
  closed
end