class MessageBrokerAdapter::RabbitMQ

Adapter that implements publishing and subscribing on top of RabbitMQ exchanges and queues.

Attributes

pub_channel_locker[R]
pub_conn_locker[R]
sub_channel_locker[R]
sub_conn_locker[R]

Public Class Methods

connection_configs() click to toggle source
# File lib/messaging_adapter/adapters/rabbitmq.rb, line 50
# Assembles the connection options passed to Bunny.new.
#
# Each option is read from its MessageBroker_* environment variable; when the
# variable is not set the local default listed below is used instead. Values
# are returned as strings exactly as found (the port is not converted).
#
# Returns a Hash with the keys :host, :port, :username, :password and :vhost.
def self.connection_configs
  settings = [
    [:host, 'MessageBroker_Host', 'localhost'],
    [:port, 'MessageBroker_Port', '5672'],
    [:username, 'MessageBroker_User', 'guest'],
    [:password, 'MessageBroker_Pass', 'guest'],
    [:vhost, 'MessageBroker_RabbitMQ_Vhost', '/']
  ]

  settings.each_with_object({}) do |(option, env_name, fallback), configs|
    configs[option] = ENV.fetch(env_name, fallback)
  end
end
debug_mode?() click to toggle source
# File lib/messaging_adapter/adapters/rabbitmq.rb, line 46
# Tells whether debug logging is switched on.
#
# Returns true only when the MessageBroker_Debug environment variable holds
# exactly the string 'true'; any other value, or no value, gives false.
def self.debug_mode?
  flag = ENV.fetch('MessageBroker_Debug', nil)
  'true'.eql?(flag)
end
publish(topic, payload, options = {}) click to toggle source
# File lib/messaging_adapter/adapters/rabbitmq.rb, line 23
# Publishes +payload+, serialised with #to_json, through the publisher channel.
#
# topic   - name of the fanout exchange to publish to, or of the queue when
#           options[:direct_to_q] is true. The target is looked up with
#           passive: true.
# payload - object responding to #to_json.
# options - Hash; only :direct_to_q is read, and only the literal +true+
#           selects the queue path.
#
# When debug_mode? is on, a line describing the publication is written with
# puts. Returns nil.
def self.publish(topic, payload, options = {})
  to_queue = options[:direct_to_q] == true
  channel = publisher_channel

  target = to_queue ? channel.queue(topic, passive: true) : channel.fanout(topic, passive: true)
  target.publish(payload.to_json)

  return unless debug_mode?

  puts "- RMQ: Published #{payload.to_json} on #{topic}"
end
publisher_channel() click to toggle source
# File lib/messaging_adapter/adapters/rabbitmq.rb, line 75
# Returns the publishing channel cached on the current thread.
#
# The channel lives in Thread.current[:rmq_pub_channel]. When nothing is
# cached yet, one is created from publisher_connection while holding
# pub_channel_locker, stored for the thread, and returned.
def self.publisher_channel
  cached = Thread.current[:rmq_pub_channel]
  return cached unless cached.nil?

  pub_channel_locker.synchronize do
    Thread.current[:rmq_pub_channel] ||= publisher_connection.create_channel
  end
end
publisher_connection() click to toggle source
# File lib/messaging_adapter/adapters/rabbitmq.rb, line 64
# Returns the shared publisher connection kept in @pub_connection.
#
# On first use a connection is built with Bunny.new(connection_configs),
# started, and memoised while holding pub_conn_locker. On later calls the
# existing connection is passed to recover_connection before being returned.
def self.publisher_connection
  unless @pub_connection.nil?
    recover_connection(@pub_connection, pub_conn_locker)
    return @pub_connection
  end

  pub_conn_locker.synchronize do
    @pub_connection ||= begin
      session = Bunny.new(connection_configs)
      session.start
      session
    end
  end
end
recover_connection(conn, locker) click to toggle source
# File lib/messaging_adapter/adapters/rabbitmq.rb, line 60
# Restarts +conn+ when it is no longer open.
#
# conn   - connection object responding to #open? and #start.
# locker - object responding to #synchronize (the callers in this file pass
#          the mutex that guards the connection).
#
# The open check is done twice: once without the lock so the common
# "already open" case stays cheap, and once more after the lock is acquired,
# so a caller that waited while another thread restarted the connection does
# not call #start a second time.
#
# Returns nil when no restart was needed, otherwise whatever conn.start
# returns.
def self.recover_connection(conn, locker)
  return if conn.open?

  locker.synchronize { conn.start unless conn.open? }
end
subscribe(queue, options = {}) { |parse| ... } click to toggle source
# File lib/messaging_adapter/adapters/rabbitmq.rb, line 35
# Subscribes to +queue+ and yields every received message, parsed from JSON,
# to the given block.
#
# queue   - name of the queue; it is looked up on the subscriber channel with
#           passive: true.
# options - Hash; only :block is read. It is forwarded as the :block option
#           of the queue subscription and defaults to true when missing or
#           nil. The hash itself is never modified, so frozen or shared
#           hashes are safe to pass.
#
# Yields the result of JSON.parse for each message payload.
#
# When debug_mode? is on, a confirmation line is written with puts once the
# subscription call has returned.
# NOTE(review): with block: true the underlying subscribe call presumably
# does not return while consuming, so the debug line would only appear for
# non-blocking subscriptions - confirm against the Bunny documentation.
#
# Returns nil.
def self.subscribe(queue, options = {})
  q = subscriber_channel.queue(queue, passive: true)

  # Keep the decision in a local instead of writing it back into the caller's
  # hash; an explicit false must survive, so `||` alone is not enough.
  blocking = options[:block].nil? ? true : options[:block]
  q.subscribe(block: blocking) do |_del_info, _props, payload|
    yield(JSON.parse(payload))
  end

  puts "- RMQ: Subscribed successfully to the topic #{queue}" if debug_mode?
end
subscriber_channel() click to toggle source
# File lib/messaging_adapter/adapters/rabbitmq.rb, line 97
# Returns the subscribing channel cached on the current thread.
#
# The channel lives in Thread.current[:rmq_sub_channel]. When nothing is
# cached yet, one is created from subscriber_connection while holding
# sub_channel_locker, stored for the thread, and returned.
def self.subscriber_channel
  cached = Thread.current[:rmq_sub_channel]
  return cached unless cached.nil?

  sub_channel_locker.synchronize do
    Thread.current[:rmq_sub_channel] ||= subscriber_connection.create_channel
  end
end
subscriber_connection() click to toggle source
# File lib/messaging_adapter/adapters/rabbitmq.rb, line 86
# Returns the shared subscriber connection kept in @sub_connection.
#
# On first use a connection is built with Bunny.new(connection_configs),
# started, and memoised while holding sub_conn_locker. On later calls the
# existing connection is passed to recover_connection before being returned.
def self.subscriber_connection
  unless @sub_connection.nil?
    recover_connection(@sub_connection, sub_conn_locker)
    return @sub_connection
  end

  sub_conn_locker.synchronize do
    @sub_connection ||= begin
      session = Bunny.new(connection_configs)
      session.start
      session
    end
  end
end