class Basquiat::Adapters::RabbitMq
The RabbitMQ adapter for Basquiat
Attributes
Public Class Methods
Initializes the superclass using an {Events} object as the procs instance variable.
Calls superclass method Basquiat::Adapters::Base::new
# File lib/basquiat/adapters/rabbitmq_adapter.rb, line 23
def initialize(procs: Events.new)
  super(procs: procs)
end
Public Instance Methods
Because the RabbitMQ configuration options are extensive and their interactions with the requeue strategies are somewhat convoluted, the adapter uses a {Configuration} object to handle them all.
# File lib/basquiat/adapters/rabbitmq_adapter.rb, line 29
def base_options
  @configuration ||= Configuration.new
  @configuration.merge_user_options(Basquiat.configuration.adapter_options)
end
Binds the queues and starts the event loop.
@param block [Boolean] block the thread
@param rescue_proc [#call] callable invoked with the exception and the message when a handler raises
# File lib/basquiat/adapters/rabbitmq_adapter.rb, line 56
def listen(block: true, rescue_proc: Basquiat.configuration.rescue_proc)
  procs.keys.each { |key| session.bind_queue(key) }
  session.subscribe(block: block) do |message|
    strategy.run(message) do
      process_message(message, rescue_proc)
    end
  end
end
# File lib/basquiat/adapters/rabbitmq_adapter.rb, line 65
def process_message(message, rescue_proc)
  procs[message.routing_key].call(message)
rescue StandardError => ex
  rescue_proc.call(ex, message)
end
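The dispatch step in process_message can be tried in isolation. The following is a minimal sketch, not the adapter itself: `SketchMessage`, `dispatch`, and the routing keys are hypothetical stand-ins, and the registry is a plain Hash rather than an {Events} object. It shows that any StandardError raised by a handler is handed to the rescue proc, including the NoMethodError raised when a routing key has no registered proc.

```ruby
# Stand-in for a delivered message; the adapter's real message class is not shown here.
SketchMessage = Struct.new(:routing_key, :payload)

# Same shape as process_message above, with the registry passed in explicitly.
def dispatch(procs, message, rescue_proc)
  procs[message.routing_key].call(message)
rescue StandardError => e
  rescue_proc.call(e, message)
end

procs = {
  'user.created' => ->(message) { "welcome #{message.payload[:name]}" },
  'user.deleted' => ->(_message) { raise ArgumentError, 'handler failed' }
}

# Record failures instead of letting them escape the dispatch call.
errors = []
rescue_proc = ->(exception, message) { errors << [exception.class, message.routing_key] }

greeting = dispatch(procs, SketchMessage.new('user.created', { name: 'Ana' }), rescue_proc)
dispatch(procs, SketchMessage.new('user.deleted', {}), rescue_proc)
# No proc is registered for this key, so the lookup returns nil and `.call` raises NoMethodError.
dispatch(procs, SketchMessage.new('user.unknown', {}), rescue_proc)
```

Because the rescue proc receives both the exception and the message, it is a natural place for logging or error reporting that needs the routing key.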
Publishes the event to the configured exchange.
@param event [String] routing key to be used
@param message [Hash] the message to be published
@param props [Hash] other properties you wish to publish with the message, such as custom headers
# File lib/basquiat/adapters/rabbitmq_adapter.rb, line 45
def publish(event, message, props: {})
  if options[:publisher][:session_pool]
    session_pool.with { |session| session.publish(event, message, props) }
  else
    session.publish(event, message, props)
  end
  disconnect unless options[:publisher][:persistent]
end
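The control flow of publish depends on two publisher options. As a rough sketch only, the hypothetical `SketchPublisher` below replaces the real session, pool, and disconnect calls with log entries so the branching can be observed without a broker; the option keys are the ones read by the method above, and everything else is an assumption.

```ruby
# Hypothetical stand-in that records which path publish would take.
class SketchPublisher
  attr_reader :log

  def initialize(options)
    @options = options
    @log = []
  end

  def publish(event, message, props: {})
    if @options[:publisher][:session_pool]
      @log << [:pooled, event, message, props]   # would go through session_pool.with
    else
      @log << [:single, event, message, props]   # would go through the memoized session
    end
    @log << [:disconnect] unless @options[:publisher][:persistent]
  end
end

pooled = SketchPublisher.new(publisher: { persistent: true, session_pool: { size: 2 } })
pooled.publish('user.created', { id: 1 })

one_shot = SketchPublisher.new(publisher: { persistent: false })
one_shot.publish('user.created', { id: 1 }, props: { headers: { source: 'sketch' } })
```

With `persistent` unset or false, every publish is followed by a disconnect, so the connection is rebuilt on the next call.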
Reset the connection to RabbitMQ.
# File lib/basquiat/adapters/rabbitmq_adapter.rb, line 72
def reset_connection
  connection.disconnect
  @connection = nil
  @session = nil
  @session_pool = nil
  @strategy = nil
end
Lazily initializes and returns the session.
@return [Session]
# File lib/basquiat/adapters/rabbitmq_adapter.rb, line 90
def session
  @session ||= Session.new(connection.create_channel, @configuration.session_options)
end
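The `||=` memoization used here, together with the nil assignments in reset_connection, can be illustrated on its own. This is a minimal sketch with a hypothetical `SketchAdapter` whose "session" is just a plain object; it only demonstrates that the value is built once, reused, and rebuilt after a reset.

```ruby
# Hypothetical adapter that counts how often its lazily built session is created.
class SketchAdapter
  attr_reader :builds

  def initialize
    @builds = 0
  end

  # Built on first access, then reused until the instance variable is cleared.
  def session
    @session ||= build_session
  end

  # Clearing the variable forces the next call to `session` to build a new object.
  def reset_connection
    @session = nil
  end

  private

  def build_session
    @builds += 1
    Object.new
  end
end

adapter = SketchAdapter.new
first = adapter.session
same = adapter.session
adapter.reset_connection
fresh = adapter.session
```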
Lazily initializes and returns the session pool.
@return [ConnectionPool<Session>]
# File lib/basquiat/adapters/rabbitmq_adapter.rb, line 96
def session_pool
  @session_pool ||= ConnectionPool.new(size: options[:publisher][:session_pool].fetch(:size, 1),
                                       timeout: options[:publisher][:session_pool].fetch(:timeout, 5)) do
    Session.new(connection.create_channel, @configuration.session_options)
  end
end
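The pool size and timeout fall back to 1 and 5 when they are not configured. The sketch below isolates just that lookup; `pool_settings` is a hypothetical helper, not part of the adapter, and the option hashes are illustrative. Note that an empty `session_pool` hash is still truthy in Ruby, so under this reading it would enable pooling with the default values.

```ruby
# Hypothetical helper: resolves the pool settings the way session_pool reads them.
def pool_settings(options)
  pool_options = options[:publisher][:session_pool]
  return nil unless pool_options # no :session_pool key means no pooling

  {
    size: pool_options.fetch(:size, 1),      # default of 1, as in session_pool
    timeout: pool_options.fetch(:timeout, 5) # default of 5, as in session_pool
  }
end

partial  = pool_settings(publisher: { session_pool: { size: 4 } })
defaults = pool_settings(publisher: { session_pool: {} })
disabled = pool_settings(publisher: { persistent: true })
```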
Lazily initializes the requeue strategy configured for the adapter.
@return [BaseStrategy]
# File lib/basquiat/adapters/rabbitmq_adapter.rb, line 84
def strategy
  @strategy ||= @configuration.strategy.new(session)
end
Adds the subscription and registers the proc for the event.
@param event_name [String] routing key to be matched (and bound to) when listening
@param proc [#call] callable object to be run when a message with that routing key is received
# File lib/basquiat/adapters/rabbitmq_adapter.rb, line 37
def subscribe_to(event_name, proc)
  procs[event_name] = proc
end
Private Instance Methods
Lazily initializes the connection.
# File lib/basquiat/adapters/rabbitmq_adapter.rb, line 106
def connection
  @connection ||= Connection.new(@configuration.connection_options)
end