module MessageQueue

Constants

ADAPTERS
SERIALIZERS
VERSION

Attributes

connection[R]
settings[R]

Public Class Methods

hook_rails!() click to toggle source
# File lib/message_queue/rails.rb, line 2
def self.hook_rails!
  MessageQueue::Logging.logger = ::Rails.logger

  config_file = ::Rails.root.join("config", "message_queue.yml")
  config = if config_file.exist?
             HashWithIndifferentAccess.new YAML.load_file(config_file)[::Rails.env]
           else
             { :adapter => :memory, :serializer => :json }
           end
  MessageQueue.connect(config)
end

Public Instance Methods

connect(file_or_options = {}) click to toggle source

Public: Connect to the message queue.

It either reads options from a Hash or the path to the Yaml settings file. After connecting, it stores the connection instance locally.

file_or_options - The Hash options or the String Yaml settings file

Detail Hash options see the new_connection method.

Returns the connection for the specified message queue. Raises a RuntimeError if an adapter can't be found.

# File lib/message_queue.rb, line 26
def connect(file_or_options = {})
  if file_or_options.is_a?(String)
    require "yaml"
    file_or_options = YAML.load_file(file_or_options).inject({}){|memo,(k,v)| memo[k.to_sym] = v; memo}
  end

  @settings = file_or_options
  @connection = new_connection(@settings)
  @connection.connect
end
connected?() click to toggle source

Public: Check if it's connected to the message queue

Returns true if it's connected

# File lib/message_queue.rb, line 63
def connected?
  connection.connected? if connection
end
consumables() click to toggle source

Internal: Get the list of consumables.

Returns the list of consumables.

# File lib/message_queue.rb, line 151
def consumables
  @consumables ||= []
end
disconnect() click to toggle source

Public: Disconnect from the message queue if it's connected

It clears out the stored connection.

Returns true if it disconnects successfully

# File lib/message_queue.rb, line 42
def disconnect
  if @connection
    @connection.disconnect
    @connection = nil
    return true
  end

  false
end
error_handlers() click to toggle source

Internal: Get the list of error handlers.

Returns the list of error handlers.

# File lib/message_queue.rb, line 144
def error_handlers
  @error_handlers ||= []
end
klass_name_for(name) click to toggle source

Internal: Return the class name for name

Returns the class name for specified string

# File lib/message_queue.rb, line 188
def klass_name_for(name)
  name.to_s.split("_").map(&:capitalize) * ""
end
load_adapter(name) click to toggle source

Internal: Load an adapter by name

Returns the adapter or nil if it can't find it

# File lib/message_queue.rb, line 158
def load_adapter(name)
  ADAPTERS.each do |a|
    if a.to_s == name.to_s
      require_relative "message_queue/adapters/#{name}"
      klass_name = klass_name_for(name)
      return MessageQueue::Adapters.const_get(klass_name)
    end
  end

  nil
end
load_serializer(name) click to toggle source

Internal: Load a serializer by name

Returns the serializer or nil if it can't find it

# File lib/message_queue.rb, line 173
def load_serializer(name)
  SERIALIZERS.each do |s|
    if s.to_s == name.to_s
      require_relative "message_queue/serializers/#{name}"
      klass_name = klass_name_for(name)
      return MessageQueue::Serializers.const_get(klass_name)
    end
  end

  nil
end
logger() click to toggle source
# File lib/message_queue.rb, line 119
def logger
  Logging.logger
end
new_connection(options = {}) click to toggle source

Public: Initialize a connection to a message queue.

options - The Hash options used to initialize a connection

:adapter - The Symbol adapter, currently only :bunny is supported.
           Detailed options see individual adapter implementation.
:serializer - The Symbol serializer for serialization.

Returns the connection for the specified message queue. Raises a RuntimeError if an adapter can't be found.

# File lib/message_queue.rb, line 76
def new_connection(options = {})
  adapter = load_adapter(options[:adapter])
  raise "Missing adapter #{options[:adapter]}" unless adapter

  serializer = load_serializer(options[:serializer])
  raise "Missing serializer #{options[:serializer]}" unless serializer

  adapter.new_connection(serializer, options)
end
new_consumer(options = {}) click to toggle source

Public: Initialize a consumer using current connection to a message queue.

Details options see a particular adapter.

Returns a new consumer

# File lib/message_queue.rb, line 115
def new_consumer(options = {})
  connection.new_consumer(options)
end
new_producer(options = {}) click to toggle source

Public: Initialize a producer using current connection to a message queue.

Details options see a particular adapter.

Returns a new producer

# File lib/message_queue.rb, line 106
def new_producer(options = {})
  connection.new_producer(options)
end
reconnect() click to toggle source

Public: Reconnect to the message queue if it's disconnected by using the previous connection settings

Returns the new connection if it reconnects successfully

# File lib/message_queue.rb, line 55
def reconnect
  disconnect if connected?
  connect(settings)
end
register_consumable(consumable) click to toggle source

Internal: Register a consumable.

Returns the registered consumables.

# File lib/message_queue.rb, line 130
def register_consumable(consumable)
  consumables << consumable
end
register_error_handler(error_handler) click to toggle source

Internal: Register a error handler.

Returns the registered error handlers.

# File lib/message_queue.rb, line 137
def register_error_handler(error_handler)
  error_handlers << error_handler
end
run_consumables(options = {}) click to toggle source
# File lib/message_queue.rb, line 123
def run_consumables(options = {})
  MessageQueue::ConsumableRunner.new(consumables).run(options)
end
with_connection(options = {}, &block) click to toggle source

Public: Initialize a connection to a message queue, connect and execute code in a block.

options - The Hash options used to initialize a connection

:adapter - The Symbol adapter, currently only :bunny is supported.
           Detailed options see individual adapter implementation.
:serializer - The Symbol serializer for serialization.

block - The code to execute. The connection object with be passed in.

Returns nothing Raises a RuntimeError if an adapter can't be found.

# File lib/message_queue.rb, line 96
def with_connection(options = {}, &block)
  connection = new_connection(options)
  connection.with_connection(&block)
end