class RabbitJobs::Publisher::Amqp
AMQP publisher implementation.
Public Class Methods
cleanup()
click to toggle source
# File lib/rabbit_jobs/publisher/amqp.rb, line 11 def cleanup amqp_cleanup end
direct_publish_to(routing_key, payload, ex = {})
click to toggle source
# File lib/rabbit_jobs/publisher/amqp.rb, line 22 def direct_publish_to(routing_key, payload, ex = {}) exchange_name, exchange_opts = build_exchange(ex) publisher_channel.basic_publish(payload, exchange_name, routing_key, exchange_opts) fail "Disconnected from #{RJ.config.server}." unless amqp_connection.connected? true rescue RabbitJobs.logger.error $!.message raise $! end
publish_to(routing_key, klass, *params)
click to toggle source
# File lib/rabbit_jobs/publisher/amqp.rb, line 15 def publish_to(routing_key, klass, *params) check_amqp_publishing_params(routing_key, klass) payload = Job.serialize(klass, *params) direct_publish_to(routing_key.to_sym, payload) end
purge_queue(*routing_keys)
click to toggle source
# File lib/rabbit_jobs/publisher/amqp.rb, line 33 def purge_queue(*routing_keys) fail ArgumentError unless routing_keys.present? routing_keys.map(&:to_sym).each do |routing_key| publisher_channel.queue_purge(routing_key) end end
queue_status(routing_key)
click to toggle source
# File lib/rabbit_jobs/publisher/amqp.rb, line 41 def queue_status(routing_key) check_queue_status_params(routing_key) publisher_channel.queue(routing_key, RabbitJobs.config[:queues][routing_key.to_sym]).status end
Private Class Methods
build_exchange(ex)
click to toggle source
# File lib/rabbit_jobs/publisher/amqp.rb, line 48 def build_exchange(ex) ex = { name: ex.to_s } unless ex.is_a?(Hash) exchange_opts = Configuration::DEFAULT_MESSAGE_PARAMS.merge(ex || {}) exchange_name = exchange_opts.delete(:name).to_s exchange = publisher_channel.exchange(exchange_name, passive: true) exchange_on_return_policy(exchange) [exchange_name, exchange_opts] end
exchange_on_return_policy(exchange)
click to toggle source
# File lib/rabbit_jobs/publisher/amqp.rb, line 58 def exchange_on_return_policy(exchange) exchange.on_return do |basic_deliver, properties, returned_payload| RJ.logger.error full_message: caller.join("\r\n"), short_message: "AMQP ERROR: (#{basic_deliver[:reply_code]}) " \ "#{basic_deliver[:reply_text]}. " \ "exchange: #{basic_deliver[:exchange]}, " \ "key: #{basic_deliver[:routing_key]}.", _basic_deliver: basic_deliver.inspect, _properties: properties.inspect, _payload: returned_payload.inspect true end end