class Fluent::Plugin::AMQPInput

AMQPInput to be used as a Fluent SOURCE, reading messages from a RabbitMQ message broker

Attributes

connection[RW]

Bunny connection handle

- Allows mocking for test purposes

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_amqp.rb, line 48
def configure(conf)
  conf['format'] ||= conf['payload_format'] # legacy
  compat_parameters_convert(conf, :parser)

  super

  parser_config = conf.elements('parse').first
  if parser_config
    @parser = parser_create(conf: parser_config)
  end

  @conf = conf
  unless (@host || @hosts) && @queue
    raise Fluent::ConfigError, "'host(s)' and 'queue' must be all specified."
  end
  check_tls_configuration
end
multi_workers_ready?() click to toggle source
# File lib/fluent/plugin/in_amqp.rb, line 99
def multi_workers_ready?
  true
end
shutdown() click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_amqp.rb, line 93
def shutdown
  log.info "Closing connection"
  @connection.stop
  super
end
start() click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_amqp.rb, line 66
def start
  super
  # Create a new connection, unless its already been provided to us
  @connection = Bunny.new get_connection_options unless @connection
  @connection.start
  @channel = @connection.create_channel

  if @exclusive && fluentd_worker_id > 0
    log.info 'Config requested exclusive queue with multiple workers'
    @queue += ".#{fluentd_worker_id}"
    log.info "Renamed queue name to include worker id: #{@queue}"
  end

  q = @channel.queue(@queue, passive: @passive, durable: @durable,
                   exclusive: @exclusive, auto_delete: @auto_delete)
  if @bind_exchange
    log.info "Binding #{@queue} to #{@exchange}, :routing_key => #{@routing_key}"
    q.bind(exchange=@exchange, routing_key: @routing_key)
  end

  q.subscribe do |delivery, meta, msg|
    log.debug "Recieved message #{@msg}"
    payload = parse_payload(msg)
    router.emit(parse_tag(delivery, meta), parse_time(meta), payload)
  end
end

Private Instance Methods

check_tls_configuration() click to toggle source
# File lib/fluent/plugin/in_amqp.rb, line 139
def check_tls_configuration()
  if @tls
    unless @tls_key && @tls_cert
        raise Fluent::ConfigError, "'tls_key' and 'tls_cert' must be all specified if tls is enabled."
    end
  end
end
get_connection_options() click to toggle source
# File lib/fluent/plugin/in_amqp.rb, line 147
def get_connection_options()
  hosts = @hosts ||= Array.new(1, @host)
  opts = {
    hosts: hosts, port: @port, vhost: @vhost,
    pass: @pass, user: @user, ssl: @ssl,
    verify_ssl: @verify_ssl, heartbeat: @heartbeat,
    tls: @tls,
    tls_cert: @tls_cert,
    tls_key: @tls_key,
    verify_peer: @tls_verify_peer
  }
  opts[:tls_ca_certificates] = @tls_ca_certificates if @tls_ca_certificates
  return opts
end
parse_payload(msg) click to toggle source
# File lib/fluent/plugin/in_amqp.rb, line 104
def parse_payload(msg)
  if @parser
    parsed = nil
    @parser.parse msg do |_, payload|
      if payload.nil?
        log.warn "failed to parse #{msg}"
        parsed = { "message" => msg }
      else
        parsed = payload
      end
    end
    parsed
  else
    { "message" => msg }
  end
end
parse_tag( delivery, meta ) click to toggle source
# File lib/fluent/plugin/in_amqp.rb, line 121
def parse_tag( delivery, meta )
  if @tag_key && delivery.routing_key != ''
    delivery.routing_key
  elsif @tag_header && meta[:headers][@tag_header]
    meta[:headers][@tag_header]
  else
    @tag
  end
end
parse_time( meta ) click to toggle source
# File lib/fluent/plugin/in_amqp.rb, line 131
def parse_time( meta )
  if @time_header && meta[:headers][@time_header]
    Fluent::EventTime.from_time(Time.parse( meta[:headers][@time_header] ))
  else
    Fluent::Engine.now
  end
end