module ActivePublisher
Constants
- VERSION
Public Class Methods
# File lib/active_publisher.rb, line 23 def self.configuration @configuration ||= ::ActivePublisher::Configuration.new end
# File lib/active_publisher.rb, line 27 def self.configure yield(configuration) if block_given? end
Publish a message to RabbitMQ
@param [String] route The routing key to use for this message. @param [String] payload The message you are sending. Should already be encoded as a string. @param [String] exchange The exchange you want to publish to. @param [Hash] options hash to set message parameters (e.g. headers)
# File lib/active_publisher.rb, line 37 def self.publish(route, payload, exchange_name, options = {}) with_exchange(exchange_name) do |exchange| ::ActiveSupport::Notifications.instrument "message_published.active_publisher", :route => route do exchange.publish(payload, publishing_options(route, options)) end end end
# File lib/active_publisher.rb, line 45 def self.publish_all(exchange_name, messages) with_exchange(exchange_name) do |exchange| loop do break if messages.empty? message = messages.shift fail ActivePublisher::UnknownMessageClassError, "bulk publish messages must be ActivePublisher::Message" unless message.is_a?(ActivePublisher::Message) fail ActivePublisher::ExchangeMismatchError, "bulk publish messages must match publish_all exchange_name" if message.exchange_name != exchange_name begin ::ActiveSupport::Notifications.instrument "message_published.active_publisher", :route => message.route do exchange.publish(message.payload, publishing_options(message.route, message.options || {})) end rescue messages << message raise end end end end
Publish a message asynchronously to RabbitMQ.
Asynchronous is designed to do two things:
-
Introduce the idea of a durable retry should the RabbitMQ connection disconnect.
-
Provide a higher-level pattern for fire-and-forget publishing.
@param [String] route The routing key to use for this message. @param [String] payload The message you are sending. Should already be encoded as a string. @param [String] exchange The exchange you want to publish to. @param [Hash] options hash to set message parameters (e.g. headers).
# File lib/active_publisher/async.rb, line 12 def self.publish_async(route, payload, exchange_name, options = {}) ::ActivePublisher::Async.publisher_adapter.publish(route, payload, exchange_name, options) end
# File lib/active_publisher.rb, line 66 def self.publishing_options(route, in_options = {}) options = { :mandatory => false, :persistent => false, :routing_key => route, }.merge(in_options) if ::RUBY_PLATFORM == "java" java_options = {} java_options[:mandatory] = options.delete(:mandatory) java_options[:routing_key] = options.delete(:routing_key) java_options[:properties] = options java_options else options end end
# File lib/active_publisher.rb, line 84 def self.with_exchange(exchange_name) connection = ::ActivePublisher::Connection.connection channel = connection.create_channel begin channel.confirm_select if configuration.publisher_confirms exchange = channel.topic(exchange_name) yield(exchange) channel.wait_for_confirms if configuration.publisher_confirms ensure channel.close rescue nil end end