class Basquiat::Adapters::RabbitMq

The RabbitMQ adapter for Basquiat

Attributes

procs[R]

Public Class Methods

new(procs: Events.new) click to toggle source

Initializes the superclass using a {Events} object as the procs instance variable

Calls superclass method Basquiat::Adapters::Base::new
# File lib/basquiat/adapters/rabbitmq_adapter.rb, line 23
def initialize(procs: Events.new)
  super(procs: procs)
end

Public Instance Methods

base_options() click to toggle source

Since the RabbitMQ configuration options are quite vast and it's interations with the requeue strategies a bit convoluted it uses a {Configuration} object to handle it all

# File lib/basquiat/adapters/rabbitmq_adapter.rb, line 29
def base_options
  @configuration ||= Configuration.new
  @configuration.merge_user_options(Basquiat.configuration.adapter_options)
end
disconnect()
Alias for: reset_connection
listen(block: true, rescue_proc: Basquiat.configuration.rescue_proc) click to toggle source

Binds the queues and start the event lopp. @param block [Boolean] block the thread

# File lib/basquiat/adapters/rabbitmq_adapter.rb, line 56
def listen(block: true, rescue_proc: Basquiat.configuration.rescue_proc)
  procs.keys.each { |key| session.bind_queue(key) }
  session.subscribe(block: block) do |message|
    strategy.run(message) do
      process_message(message, rescue_proc)
    end
  end
end
process_message(message, rescue_proc) click to toggle source
# File lib/basquiat/adapters/rabbitmq_adapter.rb, line 65
def process_message(message, rescue_proc)
  procs[message.routing_key].call(message)
rescue StandardError => ex
  rescue_proc.call(ex, message)
end
publish(event, message, props: {}) click to toggle source

Publishes the event to the exchange configured. @param event [String] routing key to be used @param message [Hash] the message to be publish @param props [Hash] other properties you wish to publish with the message, such as custom headers etc.

# File lib/basquiat/adapters/rabbitmq_adapter.rb, line 45
def publish(event, message, props: {})
  if options[:publisher][:session_pool]
    session_pool.with { |session| session.publish(event, message, props) }
  else
    session.publish(event, message, props)
  end
  disconnect unless options[:publisher][:persistent]
end
reset_connection() click to toggle source

Reset the connection to RabbitMQ.

# File lib/basquiat/adapters/rabbitmq_adapter.rb, line 72
def reset_connection
  connection.disconnect
  @connection   = nil
  @session      = nil
  @session_pool = nil
  @strategy     = nil
end
Also aliased as: disconnect
session() click to toggle source

Lazy initializes and return the session @return [Session]

# File lib/basquiat/adapters/rabbitmq_adapter.rb, line 90
def session
  @session ||= Session.new(connection.create_channel, @configuration.session_options)
end
session_pool() click to toggle source

Lazy initializes and return the session pool @return [ConnectionPool<Session>]

# File lib/basquiat/adapters/rabbitmq_adapter.rb, line 96
def session_pool
  @session_pool ||= ConnectionPool.new(size: options[:publisher][:session_pool].fetch(:size, 1),
                                       timeout: options[:publisher][:session_pool].fetch(:timeout, 5)) do
    Session.new(connection.create_channel, @configuration.session_options)
  end
end
strategy() click to toggle source

Lazy initializes the requeue strategy configured for the adapter @return [BaseStrategy]

# File lib/basquiat/adapters/rabbitmq_adapter.rb, line 84
def strategy
  @strategy ||= @configuration.strategy.new(session)
end
subscribe_to(event_name, proc) click to toggle source

Adds the subscription and register the proc to the event. @param event_name [String] routing key to be matched (and bound to) when listening @param proc [#call] callable object to be run when a message with the said routing_key is received

# File lib/basquiat/adapters/rabbitmq_adapter.rb, line 37
def subscribe_to(event_name, proc)
  procs[event_name] = proc
end

Private Instance Methods

connection() click to toggle source

Lazy initializes the connection

# File lib/basquiat/adapters/rabbitmq_adapter.rb, line 106
def connection
  @connection ||= Connection.new(@configuration.connection_options)
end