module ActivePublisher

Constants

VERSION

Public Class Methods

configuration() click to toggle source
# File lib/active_publisher.rb, line 23
def self.configuration
  @configuration ||= ::ActivePublisher::Configuration.new
end
configure() { |configuration| ... } click to toggle source
# File lib/active_publisher.rb, line 27
def self.configure
  yield(configuration) if block_given?
end
publish(route, payload, exchange_name, options = {}) click to toggle source

Publish a message to RabbitMQ

@param [String] route The routing key to use for this message. @param [String] payload The message you are sending. Should already be encoded as a string. @param [String] exchange The exchange you want to publish to. @param [Hash] options hash to set message parameters (e.g. headers)

# File lib/active_publisher.rb, line 37
def self.publish(route, payload, exchange_name, options = {})
  with_exchange(exchange_name) do |exchange|
    ::ActiveSupport::Notifications.instrument "message_published.active_publisher", :route => route do
      exchange.publish(payload, publishing_options(route, options))
    end
  end
end
publish_all(exchange_name, messages) click to toggle source
# File lib/active_publisher.rb, line 45
def self.publish_all(exchange_name, messages)
  with_exchange(exchange_name) do |exchange|
    loop do
      break if messages.empty?
      message = messages.shift

      fail ActivePublisher::UnknownMessageClassError, "bulk publish messages must be ActivePublisher::Message" unless message.is_a?(ActivePublisher::Message)
      fail ActivePublisher::ExchangeMismatchError, "bulk publish messages must match publish_all exchange_name" if message.exchange_name != exchange_name

      begin
        ::ActiveSupport::Notifications.instrument "message_published.active_publisher", :route => message.route do
          exchange.publish(message.payload, publishing_options(message.route, message.options || {}))
        end
      rescue
        messages << message
        raise
      end
    end
  end
end
publish_async(route, payload, exchange_name, options = {}) click to toggle source

Publish a message asynchronously to RabbitMQ.

Asynchronous is designed to do two things:

  1. Introduce the idea of a durable retry should the RabbitMQ connection disconnect.

  2. Provide a higher-level pattern for fire-and-forget publishing.

@param [String] route The routing key to use for this message. @param [String] payload The message you are sending. Should already be encoded as a string. @param [String] exchange The exchange you want to publish to. @param [Hash] options hash to set message parameters (e.g. headers).

# File lib/active_publisher/async.rb, line 12
def self.publish_async(route, payload, exchange_name, options = {})
  ::ActivePublisher::Async.publisher_adapter.publish(route, payload, exchange_name, options)
end
publishing_options(route, in_options = {}) click to toggle source
# File lib/active_publisher.rb, line 66
def self.publishing_options(route, in_options = {})
  options = {
    :mandatory => false,
    :persistent => false,
    :routing_key => route,
  }.merge(in_options)

  if ::RUBY_PLATFORM == "java"
    java_options = {}
    java_options[:mandatory]   = options.delete(:mandatory)
    java_options[:routing_key] = options.delete(:routing_key)
    java_options[:properties]  = options
    java_options
  else
    options
  end
end
with_exchange(exchange_name) { |exchange| ... } click to toggle source
# File lib/active_publisher.rb, line 84
def self.with_exchange(exchange_name)
  connection = ::ActivePublisher::Connection.connection
  channel = connection.create_channel
  begin
    channel.confirm_select if configuration.publisher_confirms
    exchange = channel.topic(exchange_name)
    yield(exchange)
    channel.wait_for_confirms if configuration.publisher_confirms
  ensure
    channel.close rescue nil
  end
end