module MessageQueue
Constants
- ADAPTERS
- SERIALIZERS
- VERSION
Attributes
Public Class Methods
    # File lib/message_queue/rails.rb, line 2
    def self.hook_rails!
      MessageQueue::Logging.logger = ::Rails.logger
      config_file = ::Rails.root.join("config", "message_queue.yml")
      config = if config_file.exist?
        HashWithIndifferentAccess.new YAML.load_file(config_file)[::Rails.env]
      else
        { :adapter => :memory, :serializer => :json }
      end
      MessageQueue.connect(config)
    end
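As a rough illustration of the lookup hook_rails! performs, the sketch below parses an environment-keyed YAML document and falls back to the in-memory defaults when no configuration is available. The YAML contents and the config_for helper are hypothetical stand-ins, and Rails is not needed to run it.

```ruby
require "yaml"

# Hypothetical contents for config/message_queue.yml, keyed by environment.
config_yaml = <<~YAML
  development:
    adapter: memory
    serializer: json
  production:
    adapter: bunny
    serializer: json
YAML

# Hypothetical helper mirroring the branch in hook_rails!: use the section for
# the given environment when YAML text is available, otherwise the defaults.
def config_for(env, yaml_text = nil)
  return { :adapter => :memory, :serializer => :json } if yaml_text.nil?

  YAML.load(yaml_text)[env]
end

production = config_for("production", config_yaml)
fallback = config_for("test")

puts production["adapter"]
puts fallback[:adapter]
```

YAML parsing yields String keys, which is presumably why the original wraps the result in HashWithIndifferentAccess before handing it to MessageQueue.connect.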
Public Instance Methods
Public: Connect to the message queue.
It reads options either from a Hash or from a YAML settings file at the given path. After connecting, it stores the connection instance locally.
file_or_options - The Hash of options or the String path to the YAML settings file. For details on the Hash options, see the new_connection method.
Returns the connection for the specified message queue.
Raises a RuntimeError if the adapter or serializer can't be found.
    # File lib/message_queue.rb, line 26
    def connect(file_or_options = {})
      if file_or_options.is_a?(String)
        require "yaml"
        file_or_options = YAML.load_file(file_or_options).inject({}){|memo,(k,v)| memo[k.to_sym] = v; memo}
      end

      @settings = file_or_options
      @connection = new_connection(@settings)
      @connection.connect
    end
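The String branch of connect loads the YAML file and converts its top-level keys to Symbols. A minimal sketch of that step in isolation follows; the file contents and the symbolize_keys helper name are assumptions made for illustration.

```ruby
require "yaml"
require "tempfile"

# Hypothetical settings; the keys mirror the :adapter and :serializer options.
yaml_text = <<~YAML
  adapter: memory
  serializer: json
YAML

# Same inject-based conversion that connect applies to the loaded Hash.
def symbolize_keys(hash)
  hash.inject({}) { |memo, (k, v)| memo[k.to_sym] = v; memo }
end

# Write the settings to a temporary file, then load them by path.
settings = Tempfile.create(["message_queue", ".yml"]) do |file|
  file.write(yaml_text)
  file.flush
  symbolize_keys(YAML.load_file(file.path))
end

p settings
```

Only the top-level keys become Symbols; the values stay Strings such as "memory". That is compatible with load_adapter and load_serializer, which compare names through to_s.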
Public: Check if it's connected to the message queue
Returns true if it's connected, or nil if there is no stored connection.
    # File lib/message_queue.rb, line 63
    def connected?
      connection.connected? if connection
    end
Internal: Get the list of consumables.
Returns the list of consumables.
    # File lib/message_queue.rb, line 151
    def consumables
      @consumables ||= []
    end
Public: Disconnect from the message queue if it's connected
It clears out the stored connection.
Returns true if it disconnects successfully, or false if there was no connection to disconnect.
    # File lib/message_queue.rb, line 42
    def disconnect
      if @connection
        @connection.disconnect
        @connection = nil
        return true
      end

      false
    end
Internal: Get the list of error handlers.
Returns the list of error handlers.
    # File lib/message_queue.rb, line 144
    def error_handlers
      @error_handlers ||= []
    end
Internal: Return the class name for name
Returns the class name for specified string
    # File lib/message_queue.rb, line 188
    def klass_name_for(name)
      name.to_s.split("_").map(&:capitalize) * ""
    end
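To make the conversion concrete, here is the same method run standalone on a few sample names. The inputs other than :bunny and :json are made up for illustration.

```ruby
# Copy of the documented method: snake_case name to CamelCase class name.
# Array#* with a String argument behaves like Array#join.
def klass_name_for(name)
  name.to_s.split("_").map(&:capitalize) * ""
end

puts klass_name_for(:bunny)          # Bunny
puts klass_name_for(:json)           # Json
puts klass_name_for("message_pack")  # MessagePack
```

Because String#capitalize lowercases everything after the first character, :json maps to Json rather than JSON, so the constant being looked up has to be named accordingly.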
Internal: Load an adapter by name
Returns the adapter or nil if it can't find it
    # File lib/message_queue.rb, line 158
    def load_adapter(name)
      ADAPTERS.each do |a|
        if a.to_s == name.to_s
          require_relative "message_queue/adapters/#{name}"
          klass_name = klass_name_for(name)
          return MessageQueue::Adapters.const_get(klass_name)
        end
      end

      nil
    end
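The lookup pattern here is: check the name against a fixed list, then resolve a constant from the derived class name. The following sketch reproduces that pattern in a self-contained form. The Demo module, its ADAPTERS list, and the Memory class are hypothetical, and the require_relative step is left out because the class is defined inline.

```ruby
module Demo
  # Hypothetical list of permitted adapter names.
  ADAPTERS = [:memory].freeze

  module Adapters
    # Hypothetical adapter class, defined inline instead of being required.
    class Memory; end
  end

  def self.klass_name_for(name)
    name.to_s.split("_").map(&:capitalize).join
  end

  # Resolve the adapter class only when the name is in the permitted list;
  # unknown names return nil rather than reaching const_get.
  def self.load_adapter(name)
    return nil unless ADAPTERS.any? { |a| a.to_s == name.to_s }

    Adapters.const_get(klass_name_for(name))
  end
end

p Demo.load_adapter(:memory)
p Demo.load_adapter("memory")
p Demo.load_adapter(:unknown)
```

Comparing through to_s means both Symbol and String names are accepted, and checking the list first keeps arbitrary input away from const_get.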
Internal: Load a serializer by name
Returns the serializer or nil if it can't find it
    # File lib/message_queue.rb, line 173
    def load_serializer(name)
      SERIALIZERS.each do |s|
        if s.to_s == name.to_s
          require_relative "message_queue/serializers/#{name}"
          klass_name = klass_name_for(name)
          return MessageQueue::Serializers.const_get(klass_name)
        end
      end

      nil
    end
    # File lib/message_queue.rb, line 119
    def logger
      Logging.logger
    end
Public: Initialize a connection to a message queue.
options - The Hash options used to initialize a connection
  :adapter - The Symbol adapter; currently only :bunny is supported. For detailed options, see the individual adapter implementation.
  :serializer - The Symbol serializer used for serialization.
Returns the connection for the specified message queue.
Raises a RuntimeError if the adapter or serializer can't be found.
    # File lib/message_queue.rb, line 76
    def new_connection(options = {})
      adapter = load_adapter(options[:adapter])
      raise "Missing adapter #{options[:adapter]}" unless adapter

      serializer = load_serializer(options[:serializer])
      raise "Missing serializer #{options[:serializer]}" unless serializer

      adapter.new_connection(serializer, options)
    end
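Calling raise with only a String produces a RuntimeError, which is where the documented error class comes from. The sketch below shows how a caller might rescue it. The new_connection defined here is a simplified stand-in rather than the library method, and KNOWN_ADAPTERS and :carrier_pigeon are invented names.

```ruby
# Hypothetical list standing in for the library's adapter lookup.
KNOWN_ADAPTERS = [:memory].freeze

# Simplified stand-in: fails the same way the documented method does when the
# adapter name is not recognized.
def new_connection(options = {})
  adapter = KNOWN_ADAPTERS.find { |a| a.to_s == options[:adapter].to_s }
  raise "Missing adapter #{options[:adapter]}" unless adapter

  adapter
end

begin
  new_connection(:adapter => :carrier_pigeon)
rescue RuntimeError => e
  message = e.message
end

puts message  # Missing adapter carrier_pigeon
```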
Public: Initialize a consumer using current connection to a message queue.
For details on the options, see the particular adapter.
Returns a new consumer
    # File lib/message_queue.rb, line 115
    def new_consumer(options = {})
      connection.new_consumer(options)
    end
Public: Initialize a producer using current connection to a message queue.
For details on the options, see the particular adapter.
Returns a new producer
    # File lib/message_queue.rb, line 106
    def new_producer(options = {})
      connection.new_producer(options)
    end
Public: Reconnect to the message queue using the previous connection settings, disconnecting first if it's currently connected
Returns the new connection if it reconnects successfully
    # File lib/message_queue.rb, line 55
    def reconnect
      disconnect if connected?
      connect(settings)
    end
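The connect, connected?, disconnect, and reconnect methods together form a small lifecycle. The sketch below imitates it with a stub so the return values can be observed without a broker. StubConnection and DemoQueue are hypothetical, and the connection and settings readers are assumed, since the listings call them without showing their definitions.

```ruby
# Hypothetical stand-in for an adapter connection.
class StubConnection
  def initialize
    @connected = false
  end

  def connect
    @connected = true
    self
  end

  def disconnect
    @connected = false
  end

  def connected?
    @connected
  end
end

# Hypothetical module that follows the documented method bodies.
module DemoQueue
  extend self

  # Assumed readers for the stored connection and settings.
  attr_reader :connection, :settings

  def connect(options = {})
    @settings = options
    @connection = StubConnection.new
    @connection.connect
  end

  def connected?
    connection.connected? if connection
  end

  def disconnect
    if @connection
      @connection.disconnect
      @connection = nil
      return true
    end

    false
  end

  def reconnect
    disconnect if connected?
    connect(settings)
  end
end

DemoQueue.connect(:adapter => :memory, :serializer => :json)
original = DemoQueue.connection
DemoQueue.reconnect
replacement = DemoQueue.connection

puts original.equal?(replacement)  # false
puts original.connected?           # false
puts DemoQueue.connected?          # true
```

In this sketch reconnect discards the old connection object and builds a new one from the stored settings, so references to the old connection held elsewhere would point at a disconnected object.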
Internal: Register a consumable.
Returns the registered consumables.
    # File lib/message_queue.rb, line 130
    def register_consumable(consumable)
      consumables << consumable
    end
Internal: Register an error handler.
Returns the registered error handlers.
    # File lib/message_queue.rb, line 137
    def register_error_handler(error_handler)
      error_handlers << error_handler
    end
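Both registries use the same pattern: a lazily initialized Array plus a register method that appends to it. A minimal sketch of the pattern follows. DemoRegistry is a hypothetical module, and the lambdas are placeholders only, since the interface the library expects from an error handler is not shown here.

```ruby
module DemoRegistry
  extend self

  # Created on first access, then reused for every later call.
  def error_handlers
    @error_handlers ||= []
  end

  # Array#<< returns the Array itself, so the caller gets the full list back.
  def register_error_handler(error_handler)
    error_handlers << error_handler
  end
end

# Placeholder handlers; their signature is an assumption.
first = ->(error) { warn "first: #{error}" }
second = ->(error) { warn "second: #{error}" }

DemoRegistry.register_error_handler(first)
result = DemoRegistry.register_error_handler(second)

puts result.length                               # 2
puts result.equal?(DemoRegistry.error_handlers)  # true
```

Since the returned Array is the registry itself rather than a copy, mutating the return value changes the registered handlers.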
    # File lib/message_queue.rb, line 123
    def run_consumables(options = {})
      MessageQueue::ConsumableRunner.new(consumables).run(options)
    end
Public: Initialize a connection to a message queue, connect and execute code in a block.
options - The Hash options used to initialize a connection
  :adapter - The Symbol adapter; currently only :bunny is supported. For detailed options, see the individual adapter implementation.
  :serializer - The Symbol serializer used for serialization.
block - The code to execute. The connection object will be passed in.
Returns nothing.
Raises a RuntimeError if the adapter or serializer can't be found.
    # File lib/message_queue.rb, line 96
    def with_connection(options = {}, &block)
      connection = new_connection(options)
      connection.with_connection(&block)
    end