class Fluent::Plugin::AMQPOutput

AMQPOutput to be used as a Fluent MATCHER, sending messages to a RabbitMQ messaging broker

Constants

DEFAULT_BUFFER_TYPE

Attributes

channel[R]
connection[RW]
exch[R]

Attribute readers to support testing

Public Instance Methods

check_tls_configuration() click to toggle source
# File lib/fluent/plugin/out_amqp.rb, line 239
def check_tls_configuration()
  if @tls
    unless @tls_key && @tls_cert
      raise Fluent::ConfigError, "'tls_key' and 'tls_cert' must be all specified if tls is enabled."
    end
  end
end
configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_amqp.rb, line 90
def configure(conf)
  compat_parameters_convert(conf, :buffer)
  super
  @conf = conf

  # Extract the header configuration into a collection
  @headers = conf.elements.select {|e|
    e.name == 'header'
  }.map {|e|
    he = HeaderElement.new
    he.configure(e)
    unless he.source || he.default
      raise Fluent::ConfigError, "At least 'default' or 'source' must must be defined in a header configuration section."
    end
    he
  }

  unless @host || @hosts
    raise Fluent::ConfigError, "'host' or 'hosts' must be specified."
  end
  unless @key || @tag_key
    raise Fluent::ConfigError, "Either 'key' or 'tag_key' must be set."
  end
  check_tls_configuration
end
format(tag, time, record) click to toggle source
# File lib/fluent/plugin/out_amqp.rb, line 152
def format(tag, time, record)
  [tag, time, record].to_msgpack
end
formatted_to_msgpack_binary() click to toggle source
# File lib/fluent/plugin/out_amqp.rb, line 148
def formatted_to_msgpack_binary
  true
end
get_connection_options() click to toggle source
# File lib/fluent/plugin/out_amqp.rb, line 247
def get_connection_options()
  hosts = @hosts ||= Array.new(1, @host)
  opts = {
    hosts: hosts, port: @port, vhost: @vhost,
    pass: @pass, user: @user, ssl: @ssl,
    verify_ssl: @verify_ssl, heartbeat: @heartbeat,
    tls: @tls || nil,
    tls_cert: @tls_cert,
    tls_key: @tls_key,
    verify_peer: @tls_verify_peer
  }
  opts[:tls_ca_certificates] = @tls_ca_certificates if @tls_ca_certificates
  return opts
end
headers( tag, time, data ) click to toggle source
# File lib/fluent/plugin/out_amqp.rb, line 220
def headers( tag, time, data )
  h = {}

  log.debug "Processing Headers: #{@headers}"
  # A little messy this...
  # Trying to allow for header overrides where a header defined
  # earlier will be used if a later header is returning nil (ie not found and no default)
  h = Hash[ @headers
              .collect{|v| [v.name, v.getValue(data) ]}
              .delete_if{|x| x.last.nil?}
      ]

  h[@tag_header] = tag if @tag_header
  h[@time_header] = Time.at(time).utc.to_s if @time_header

  h
end
multi_workers_ready?() click to toggle source
# File lib/fluent/plugin/out_amqp.rb, line 144
def multi_workers_ready?
  true
end
routing_key( tag ) click to toggle source
# File lib/fluent/plugin/out_amqp.rb, line 212
def routing_key( tag )
  if @tag_key
    tag
  else
    @key
  end
end
shutdown() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_amqp.rb, line 139
def shutdown
  @connection.stop
  super
end
start() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_amqp.rb, line 116
def start
  super
  begin
    log.info "Connecting to RabbitMQ..."
    @connection = Bunny.new(get_connection_options) unless @connection
    @connection.start
  rescue Bunny::TCPConnectionFailed => e
    log.error "Connection to #{@host} failed"
  rescue Bunny::PossibleAuthenticationFailureError => e
    log.error "Could not authenticate as #{@user}"
  end

  log.info 'Creating new channel'
  @channel = @connection.create_channel

  return if @exchange.to_s =~ CHUNK_KEY_PLACEHOLDER_PATTERN

  log.info 'Creating new exchange (in start)', exchange: @exchange
  @exch = @channel.exchange(@exchange, type: @exchange_type.intern,
                          passive: @passive, durable: @durable,
                          auto_delete: @auto_delete)
end
write(chunk) click to toggle source
# File lib/fluent/plugin/out_amqp.rb, line 156
  def write(chunk)
    begin
      log.debug 'in write', chunk_id: dump_unique_id_hex(chunk.unique_id)
      log.debug 'raw exchange value is', exchange: @exchange.to_s

      if @exchange.to_s =~ CHUNK_KEY_PLACEHOLDER_PATTERN
        exchange_name = extract_placeholders(@exchange, chunk)
        log.info 'resolved exchange value is', exchange_name: exchange_name
        @exch = @channel.exchange(exchange_name, type: @exchange_type.intern,
                                  passive: @passive, durable: @durable,
                                  auto_delete: @auto_delete)
      end

      log.debug 'writing data to exchange', chunk_id: dump_unique_id_hex(chunk.unique_id)

      chunk.msgpack_each do |(tag, time, data)|
        begin
          msg_headers = headers(tag,time,data)

          begin
            data = JSON.dump( data ) unless data.is_a?( String )
          rescue JSON::GeneratorError => e
            log.warn "Failure converting data object to json string: #{e.message} - sending as raw object"
            # Debug only - otherwise we may pollute the fluent logs with unparseable events and loop
            log.debug "JSON.dump failure converting [#{data}]"
          end

          log.info "Sending message #{data}, :key => #{routing_key( tag)} :headers => #{headers(tag,time,data)}"
          @exch.publish(
            data,
            key: routing_key( tag ),
            persistent: @persistent,
            headers: msg_headers,
            content_type: @content_type,
            content_encoding: @content_encoding)

# :nocov:
#  Hard to throw StandardError through test code
        rescue StandardError => e
          # This protects against invalid byteranges and other errors at a per-message level
          log.error "Unexpected error during message publishing: #{e.message}"
          log.debug "Failure in publishing message [#{data}]"
        end
      end
    rescue MessagePack::MalformedFormatError => e
      # This has been observed when a server has filled the partition containing
      # the buffer files, and during replay the chunks were malformed
      log.error "Malformed msgpack in chunk - Did your server run out of space during buffering? #{e.message}"
    rescue StandardError => e
      # Just in case theres any other errors during chunk loading.
      log.error "Unexpected error during message publishing: #{e.message}"
    end
    # :nocov:
  end