class Hutch::Publisher

Attributes

channel[R]
config[R]
connection[R]
exchange[R]

Public Class Methods

new(connection, channel, exchange, config = Hutch::Config) click to toggle source
# File lib/hutch/publisher.rb, line 10
# Builds a publisher around an already-established broker session.
#
# connection - object stored and exposed through the +connection+ reader
# channel    - object stored and exposed through the +channel+ reader
# exchange   - object stored and exposed through the +exchange+ reader
# config     - configuration lookup; defaults to Hutch::Config
def initialize(connection, channel, exchange, config = Hutch::Config)
  @connection, @channel = connection, channel
  @exchange, @config = exchange, config
end

Public Instance Methods

publish(routing_key, message, properties = {}, options = {}) click to toggle source
# File lib/hutch/publisher.rb, line 17
# Serializes +message+ and publishes it to the exchange under +routing_key+.
#
# routing_key - routing key passed to the exchange
# message     - object handed to the serializer's +encode+
# properties  - optional message properties; a :message_id is generated
#               when none is supplied. The hash passed in is NOT modified.
# options     - optional hash; options[:serializer] overrides
#               config[:serializer]
#
# Property precedence, lowest to highest: the +persistent: true+ default,
# caller +properties+, +global_properties+, then the non-overridable
# routing_key / timestamp / content_type.
#
# Returns whatever +exchange.publish+ returns. Raises through
# +ensure_connection!+ when there is no usable connection.
def publish(routing_key, message, properties = {}, options = {})
  ensure_connection!(routing_key, message)

  serializer = options[:serializer] || config[:serializer]

  non_overridable_properties = {
    routing_key:  routing_key,
    timestamp:    connection.current_timestamp,
    content_type: serializer.content_type,
  }

  # Build the property set on a copy: writing the generated id back into the
  # caller's hash would leak it into later publishes that reuse the same
  # hash (every message would share one message_id) and would raise on a
  # frozen hash.
  message_properties = { persistent: true }.merge(properties)
  message_properties[:message_id] ||= generate_id

  payload = serializer.encode(message)

  log_publication(serializer, payload, routing_key)

  final_properties = message_properties
    .merge(global_properties)
    .merge(non_overridable_properties)
  response = exchange.publish(payload, final_properties)

  # Only block on broker confirmation when explicitly configured to do so.
  channel.wait_for_confirms if config[:force_publisher_confirms]
  response
end

Private Instance Methods

ensure_connection!(routing_key, message) click to toggle source
# File lib/hutch/publisher.rb, line 62
# Guards a publish attempt: reports a publish error (via
# +raise_publish_error+) when there is no connection object, or when the
# connection does not report itself as open. Does nothing otherwise.
def ensure_connection!(routing_key, message)
  if !connection
    raise_publish_error('no connection to broker', routing_key, message)
  end
  return if connection.open?

  raise_publish_error('connection is closed', routing_key, message)
end
generate_id() click to toggle source
# File lib/hutch/publisher.rb, line 67
# Produces the identifier used as a message_id when the caller supplied none.
# Delegates to SecureRandom.uuid.
def generate_id
  return SecureRandom.uuid
end
global_properties() click to toggle source
# File lib/hutch/publisher.rb, line 71
# Resolves the globally configured message properties. When
# Hutch.global_properties responds to +call+ it is invoked and its result
# is used; otherwise the value itself is returned.
def global_properties
  if Hutch.global_properties.respond_to?(:call)
    Hutch.global_properties.call
  else
    Hutch.global_properties
  end
end
log_publication(serializer, payload, routing_key) click to toggle source
# File lib/hutch/publisher.rb, line 44
# Emits a debug-level log line describing the outgoing message. The text is
# built inside the block handed to +logger.debug+. For serializers that
# report +binary?+, only the payload size in bytes is logged instead of the
# payload itself.
def log_publication(serializer, payload, routing_key)
  logger.debug do
    description = serializer.binary? ? "#{payload.bytesize} bytes message" : "message '#{payload}'"
    "publishing #{description} to #{routing_key}"
  end
end
raise_publish_error(reason, routing_key, message) click to toggle source
# File lib/hutch/publisher.rb, line 56
# Logs a publish failure at error level and raises PublishError with the
# same text.
#
# reason      - short human-readable cause, interpolated into the message
# routing_key - routing key of the message that could not be published
# message     - the unpublished message; rendered with JSON.dump when
#               possible
#
# Raises PublishError always.
def raise_publish_error(reason, routing_key, message)
  # JSON.dump can itself raise (for example on strings that are not valid
  # UTF-8, which is plausible for binary payloads). Fall back to #inspect so
  # the caller still gets the PublishError and the failure is still logged,
  # instead of an unrelated serialization error.
  rendered_message =
    begin
      JSON.dump(message)
    rescue StandardError
      message.inspect
    end

  msg = "unable to publish - #{reason}. Message: #{rendered_message}, Routing key: #{routing_key}."
  logger.error(msg)
  raise PublishError, msg
end