class Fluent::Plugin::AMQPInput
AMQPInput
is an input plugin to be used as a Fluent
SOURCE; it reads messages from a RabbitMQ message broker.
Attributes
connection[RW]
Bunny connection handle
- Allows mocking for test purposes
Public Instance Methods
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_amqp.rb, line 48
# Applies and validates the plugin configuration.
#
# Copies the legacy 'payload_format' value into 'format' when 'format' is
# unset, runs the parser compat conversion, then defers to the superclass.
# A parser is created only when the configuration has a 'parse' element.
#
# conf - the configuration object passed in by the caller; it is also kept
#        in @conf.
#
# Raises Fluent::ConfigError when neither @host nor @hosts is set, or when
# @queue is missing. The TLS settings are validated last by
# check_tls_configuration.
def configure(conf)
  conf['format'] ||= conf['payload_format'] # legacy
  compat_parameters_convert(conf, :parser)
  super

  parse_section = conf.elements('parse').first
  @parser = parser_create(conf: parse_section) if parse_section
  @conf = conf

  endpoint = @host || @hosts
  raise Fluent::ConfigError, "'host(s)' and 'queue' must be all specified." unless endpoint && @queue

  check_tls_configuration
end
multi_workers_ready?()
click to toggle source
# File lib/fluent/plugin/in_amqp.rb, line 99
# Always answers true.
# NOTE(review): presumably this advertises that the plugin may run under
# multiple Fluentd workers — confirm against the plugin API.
def multi_workers_ready?
  true
end
shutdown()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_amqp.rb, line 93
# Stops the broker connection, when one exists, and then runs the
# superclass shutdown.
#
# Returns whatever the superclass shutdown returns.
def shutdown
  log.info "Closing connection"
  # @connection is only assigned in start (or injected by a test); without
  # this guard a nil connection raises NoMethodError and the superclass
  # shutdown below is never reached.
  @connection.stop if @connection
  super
end
start()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_amqp.rb, line 66
# Opens the broker connection, declares the queue, optionally binds it to
# the configured exchange, and subscribes a handler that parses every
# delivered message and emits it through the router.
#
# When @exclusive is set and fluentd_worker_id is greater than zero, the
# worker id is appended to @queue before the queue is declared.
#
# Returns whatever q.subscribe returns.
def start
  super

  # Create a new connection, unless it has already been provided to us
  @connection ||= Bunny.new(get_connection_options)
  @connection.start
  @channel = @connection.create_channel

  if @exclusive && fluentd_worker_id > 0
    log.info 'Config requested exclusive queue with multiple workers'
    @queue += ".#{fluentd_worker_id}"
    log.info "Renamed queue name to include worker id: #{@queue}"
  end

  q = @channel.queue(@queue,
                     passive: @passive,
                     durable: @durable,
                     exclusive: @exclusive,
                     auto_delete: @auto_delete)

  if @bind_exchange
    log.info "Binding #{@queue} to #{@exchange}, :routing_key => #{@routing_key}"
    q.bind(@exchange, routing_key: @routing_key)
  end

  q.subscribe do |delivery, meta, msg|
    # Log the block parameter: there is no @msg instance variable, so the
    # previous interpolation always printed an empty payload.
    log.debug "Recieved message #{msg}"
    payload = parse_payload(msg)
    router.emit(parse_tag(delivery, meta), parse_time(meta), payload)
  end
end
Private Instance Methods
check_tls_configuration()
click to toggle source
# File lib/fluent/plugin/in_amqp.rb, line 139
# Verifies that a TLS key and a TLS certificate are both configured
# whenever TLS is switched on.
#
# Returns nil when TLS is off or both values are present.
# Raises Fluent::ConfigError when @tls is set but @tls_key or @tls_cert is
# missing.
def check_tls_configuration
  return unless @tls
  return if @tls_key && @tls_cert

  raise Fluent::ConfigError, "'tls_key' and 'tls_cert' must be all specified if tls is enabled."
end
get_connection_options()
click to toggle source
# File lib/fluent/plugin/in_amqp.rb, line 147
# Builds the options hash that start hands to Bunny.new.
#
# When @hosts is unset it is initialised to a one-element array holding
# @host, and that array is used for the :hosts entry. The
# :tls_ca_certificates entry is added only when @tls_ca_certificates is set.
#
# Returns the options Hash.
def get_connection_options
  @hosts ||= [@host]

  connection_opts = {
    hosts: @hosts,
    port: @port,
    vhost: @vhost,
    pass: @pass,
    user: @user,
    ssl: @ssl,
    verify_ssl: @verify_ssl,
    heartbeat: @heartbeat,
    tls: @tls,
    tls_cert: @tls_cert,
    tls_key: @tls_key,
    verify_peer: @tls_verify_peer
  }
  connection_opts[:tls_ca_certificates] = @tls_ca_certificates if @tls_ca_certificates
  connection_opts
end
parse_payload(msg)
click to toggle source
# File lib/fluent/plugin/in_amqp.rb, line 104
# Turns a raw message body into the record to emit.
#
# Without a configured parser the body is wrapped as { "message" => msg }.
# With a parser, the record it yields is returned; a nil record is logged
# as a parse failure and replaced by the same wrapper. When the parser
# yields more than once, the last yielded record wins.
#
# msg - the raw message body.
#
# Returns the parsed record, or { "message" => msg } as the fallback.
def parse_payload(msg)
  return { "message" => msg } unless @parser

  parsed = nil
  # The first yielded value is ignored here; only the record is used.
  @parser.parse(msg) do |_time, record|
    if record.nil?
      log.warn "failed to parse #{msg}"
      parsed = { "message" => msg }
    else
      parsed = record
    end
  end

  # If the parser never yielded, parsed is still nil; fall back to the raw
  # wrapper instead of handing a nil record to the caller.
  parsed.nil? ? { "message" => msg } : parsed
end
parse_tag( delivery, meta )
click to toggle source
# File lib/fluent/plugin/in_amqp.rb, line 121
# Chooses the tag for an incoming message.
#
# Precedence: the delivery's routing key (when @tag_key is set and the key
# is not the empty string), then the header named by @tag_header, then the
# configured @tag.
#
# delivery - object responding to routing_key.
# meta     - object indexable with :headers.
#
# Returns the selected tag.
def parse_tag(delivery, meta)
  return delivery.routing_key if @tag_key && delivery.routing_key != ''

  # meta[:headers] can be nil (presumably a message published without
  # headers — confirm against the broker client); indexing nil used to
  # raise NoMethodError instead of falling back to @tag.
  headers = meta[:headers] || {}
  header_tag = @tag_header && headers[@tag_header]
  header_tag || @tag
end
parse_time( meta )
click to toggle source
# File lib/fluent/plugin/in_amqp.rb, line 131
# Chooses the event time for an incoming message.
#
# When @time_header is set and the message carries that header, its value
# is parsed with Time.parse and converted with Fluent::EventTime.from_time.
# Otherwise Fluent::Engine.now is used.
#
# meta - object indexable with :headers.
#
# Returns the event time.
# Raises whatever Time.parse raises for an unparseable header value.
def parse_time(meta)
  # meta[:headers] can be nil (presumably a message published without
  # headers — confirm against the broker client); indexing nil used to
  # raise NoMethodError instead of falling back to the current time.
  headers = meta[:headers] || {}
  raw_time = @time_header && headers[@time_header]
  return Fluent::Engine.now unless raw_time

  Fluent::EventTime.from_time(Time.parse(raw_time))
end