class MessageBrokerAdapter::RabbitMQ
Adapter implementation for the RabbitMQ exchange.
Attributes
pub_channel_locker[R]
pub_conn_locker[R]
sub_channel_locker[R]
sub_conn_locker[R]
Public Class Methods
connection_configs()
click to toggle source
# File lib/messaging_adapter/adapters/rabbitmq.rb, line 50
# Builds the settings hash that the connection helpers pass to Bunny.new.
#
# Every entry comes from a MessageBroker_* environment variable; when the
# variable is not set, the literal listed next to it is used instead.
#
# @return [Hash{Symbol => String}] :host, :port, :username, :password, :vhost
def self.connection_configs
  {
    host: ENV.fetch('MessageBroker_Host', 'localhost'),
    port: ENV.fetch('MessageBroker_Port', '5672'),
    username: ENV.fetch('MessageBroker_User', 'guest'),
    password: ENV.fetch('MessageBroker_Pass', 'guest'),
    vhost: ENV.fetch('MessageBroker_RabbitMQ_Vhost', '/')
  }
end
debug_mode?()
click to toggle source
# File lib/messaging_adapter/adapters/rabbitmq.rb, line 46
# Reports whether debug output is switched on.
#
# @return [Boolean] true only when ENV['MessageBroker_Debug'] is exactly the
#   string 'true'; any other value, or an unset variable, gives false
def self.debug_mode?
  flag = ENV['MessageBroker_Debug']
  flag == 'true'
end
publish(topic, payload, options = {})
click to toggle source
# File lib/messaging_adapter/adapters/rabbitmq.rb, line 23
# Publishes +payload+, serialized with #to_json, on the publisher channel.
#
# @param topic [String] name of the fanout exchange to publish on, or the
#   name of the queue when options[:direct_to_q] is true
# @param payload [#to_json] message body; serialized exactly once per call
# @param options [Hash] only :direct_to_q is read, and only the literal
#   +true+ selects queue publishing; every other value uses the exchange
# @return [nil]
def self.publish(topic, payload, options = {})
  # Serialize once: the same string is sent and, in debug mode, logged.
  message = payload.to_json
  channel = publisher_channel

  # Both lookups pass passive: true, so the exchange/queue is expected to
  # exist already rather than being declared from here.
  target = if options[:direct_to_q] == true
             channel.queue(topic, passive: true)
           else
             channel.fanout(topic, passive: true)
           end
  target.publish(message)

  puts "- RMQ: Published #{message} on #{topic}" if debug_mode?
end
publisher_channel()
click to toggle source
# File lib/messaging_adapter/adapters/rabbitmq.rb, line 75
# Returns the calling thread's publisher channel, opening one when needed.
#
# The channel is cached in Thread.current[:rmq_pub_channel]. A cached channel
# that no longer reports open? is replaced instead of being handed back:
# publish looks entities up with passive: true, and per the Bunny error
# handling guide a failed lookup closes the channel, which would otherwise
# leave this thread with an unusable channel for every later call.
#
# @return [Object] the open channel created by publisher_connection.create_channel
def self.publisher_channel
  cached = Thread.current[:rmq_pub_channel]
  return cached if cached && cached.open?

  # Channel creation on the shared connection stays serialized by the lock.
  pub_channel_locker.synchronize do
    Thread.current[:rmq_pub_channel] = publisher_connection.create_channel
  end
end
publisher_connection()
click to toggle source
# File lib/messaging_adapter/adapters/rabbitmq.rb, line 64
# Returns the publisher connection held in @pub_connection.
#
# On the first call a Bunny connection is built from connection_configs and
# started while holding pub_conn_locker. On later calls the stored connection
# is passed through recover_connection before being returned.
#
# @return [Object] the started connection
def self.publisher_connection
  unless @pub_connection.nil?
    recover_connection(@pub_connection, pub_conn_locker)
    return @pub_connection
  end

  pub_conn_locker.synchronize do
    # ||= keeps the connection another thread stored while this one waited.
    @pub_connection ||= begin
      connection = Bunny.new(connection_configs)
      connection.start
      connection
    end
  end
end
recover_connection(conn, locker)
click to toggle source
# File lib/messaging_adapter/adapters/rabbitmq.rb, line 60
# Starts +conn+ again when it does not report itself open.
#
# The open? check is repeated after the lock is acquired. Callers that were
# queued on +locker+ while another caller restarted the connection therefore
# see it open and skip a second start.
#
# @param conn [#open?, #start] connection to check
# @param locker [#synchronize] lock guarding the restart
# @return [Object, nil] whatever conn.start returns, or nil when no restart
#   was needed
def self.recover_connection(conn, locker)
  # Lock-free fast path for the common case of a healthy connection.
  return if conn.open?

  locker.synchronize { conn.start unless conn.open? }
end
subscribe(queue, options = {}) { |parse| ... }
click to toggle source
# File lib/messaging_adapter/adapters/rabbitmq.rb, line 35
# Subscribes to +queue+ and yields every delivered message, parsed from JSON.
#
# The caller's +options+ hash is only read, never written, so a frozen or
# shared hash can be passed safely.
#
# @param queue [String] name of the queue; looked up with passive: true, so
#   it is expected to exist already
# @param options [Hash] only :block is read; it is forwarded to the queue's
#   subscribe call and defaults to true when missing or nil
# @yieldparam message [Object] result of JSON.parse on the raw payload
# @return [nil]
def self.subscribe(queue, options = {})
  q = subscriber_channel.queue(queue, passive: true)

  # An explicit false must survive, so test for nil rather than using ||.
  blocking = options[:block].nil? ? true : options[:block]

  q.subscribe(block: blocking) do |_delivery_info, _properties, payload|
    yield(JSON.parse(payload))
  end

  # NOTE(review): with a blocking subscription the call above presumably does
  # not return until consuming stops, so this line would print only then —
  # confirm against Bunny's Queue#subscribe.
  puts "- RMQ: Subscribed successfully to the topic #{queue}" if debug_mode?
end
subscriber_channel()
click to toggle source
# File lib/messaging_adapter/adapters/rabbitmq.rb, line 97
# Returns the calling thread's subscriber channel, opening one when needed.
#
# The channel is cached in Thread.current[:rmq_sub_channel]. A cached channel
# that no longer reports open? is replaced instead of being handed back:
# subscribe looks its queue up with passive: true, and per the Bunny error
# handling guide a failed lookup closes the channel, which would otherwise
# leave this thread with an unusable channel for every later call.
#
# @return [Object] the open channel created by subscriber_connection.create_channel
def self.subscriber_channel
  cached = Thread.current[:rmq_sub_channel]
  return cached if cached && cached.open?

  # Channel creation on the shared connection stays serialized by the lock.
  sub_channel_locker.synchronize do
    Thread.current[:rmq_sub_channel] = subscriber_connection.create_channel
  end
end
subscriber_connection()
click to toggle source
# File lib/messaging_adapter/adapters/rabbitmq.rb, line 86
# Returns the subscriber connection held in @sub_connection.
#
# On the first call a Bunny connection is built from connection_configs and
# started while holding sub_conn_locker. On later calls the stored connection
# is passed through recover_connection before being returned.
#
# @return [Object] the started connection
def self.subscriber_connection
  unless @sub_connection.nil?
    recover_connection(@sub_connection, sub_conn_locker)
    return @sub_connection
  end

  sub_conn_locker.synchronize do
    # ||= keeps the connection another thread stored while this one waited.
    @sub_connection ||= begin
      connection = Bunny.new(connection_configs)
      connection.start
      connection
    end
  end
end