class Fluent::Plugin::AMQPOutput
AMQPOutput
to be used as a Fluent
MATCHER, sending messages to a RabbitMQ messaging broker
Constants
- DEFAULT_BUFFER_TYPE
Attributes
channel[R]
connection[RW]
exch[R]
Attribute readers to support testing
Public Instance Methods
check_tls_configuration()
click to toggle source
# File lib/fluent/plugin/out_amqp.rb, line 239 def check_tls_configuration() if @tls unless @tls_key && @tls_cert raise Fluent::ConfigError, "'tls_key' and 'tls_cert' must be all specified if tls is enabled." end end end
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_amqp.rb, line 90 def configure(conf) compat_parameters_convert(conf, :buffer) super @conf = conf # Extract the header configuration into a collection @headers = conf.elements.select {|e| e.name == 'header' }.map {|e| he = HeaderElement.new he.configure(e) unless he.source || he.default raise Fluent::ConfigError, "At least 'default' or 'source' must must be defined in a header configuration section." end he } unless @host || @hosts raise Fluent::ConfigError, "'host' or 'hosts' must be specified." end unless @key || @tag_key raise Fluent::ConfigError, "Either 'key' or 'tag_key' must be set." end check_tls_configuration end
format(tag, time, record)
click to toggle source
# File lib/fluent/plugin/out_amqp.rb, line 152 def format(tag, time, record) [tag, time, record].to_msgpack end
formatted_to_msgpack_binary()
click to toggle source
# File lib/fluent/plugin/out_amqp.rb, line 148 def formatted_to_msgpack_binary true end
get_connection_options()
click to toggle source
# File lib/fluent/plugin/out_amqp.rb, line 247 def get_connection_options() hosts = @hosts ||= Array.new(1, @host) opts = { hosts: hosts, port: @port, vhost: @vhost, pass: @pass, user: @user, ssl: @ssl, verify_ssl: @verify_ssl, heartbeat: @heartbeat, tls: @tls || nil, tls_cert: @tls_cert, tls_key: @tls_key, verify_peer: @tls_verify_peer } opts[:tls_ca_certificates] = @tls_ca_certificates if @tls_ca_certificates return opts end
headers( tag, time, data )
click to toggle source
# File lib/fluent/plugin/out_amqp.rb, line 220 def headers( tag, time, data ) h = {} log.debug "Processing Headers: #{@headers}" # A little messy this... # Trying to allow for header overrides where a header defined # earlier will be used if a later header is returning nil (ie not found and no default) h = Hash[ @headers .collect{|v| [v.name, v.getValue(data) ]} .delete_if{|x| x.last.nil?} ] h[@tag_header] = tag if @tag_header h[@time_header] = Time.at(time).utc.to_s if @time_header h end
multi_workers_ready?()
click to toggle source
# File lib/fluent/plugin/out_amqp.rb, line 144 def multi_workers_ready? true end
routing_key( tag )
click to toggle source
# File lib/fluent/plugin/out_amqp.rb, line 212 def routing_key( tag ) if @tag_key tag else @key end end
shutdown()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_amqp.rb, line 139 def shutdown @connection.stop super end
start()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_amqp.rb, line 116 def start super begin log.info "Connecting to RabbitMQ..." @connection = Bunny.new(get_connection_options) unless @connection @connection.start rescue Bunny::TCPConnectionFailed => e log.error "Connection to #{@host} failed" rescue Bunny::PossibleAuthenticationFailureError => e log.error "Could not authenticate as #{@user}" end log.info 'Creating new channel' @channel = @connection.create_channel return if @exchange.to_s =~ CHUNK_KEY_PLACEHOLDER_PATTERN log.info 'Creating new exchange (in start)', exchange: @exchange @exch = @channel.exchange(@exchange, type: @exchange_type.intern, passive: @passive, durable: @durable, auto_delete: @auto_delete) end
write(chunk)
click to toggle source
# File lib/fluent/plugin/out_amqp.rb, line 156 def write(chunk) begin log.debug 'in write', chunk_id: dump_unique_id_hex(chunk.unique_id) log.debug 'raw exchange value is', exchange: @exchange.to_s if @exchange.to_s =~ CHUNK_KEY_PLACEHOLDER_PATTERN exchange_name = extract_placeholders(@exchange, chunk) log.info 'resolved exchange value is', exchange_name: exchange_name @exch = @channel.exchange(exchange_name, type: @exchange_type.intern, passive: @passive, durable: @durable, auto_delete: @auto_delete) end log.debug 'writing data to exchange', chunk_id: dump_unique_id_hex(chunk.unique_id) chunk.msgpack_each do |(tag, time, data)| begin msg_headers = headers(tag,time,data) begin data = JSON.dump( data ) unless data.is_a?( String ) rescue JSON::GeneratorError => e log.warn "Failure converting data object to json string: #{e.message} - sending as raw object" # Debug only - otherwise we may pollute the fluent logs with unparseable events and loop log.debug "JSON.dump failure converting [#{data}]" end log.info "Sending message #{data}, :key => #{routing_key( tag)} :headers => #{headers(tag,time,data)}" @exch.publish( data, key: routing_key( tag ), persistent: @persistent, headers: msg_headers, content_type: @content_type, content_encoding: @content_encoding) # :nocov: # Hard to throw StandardError through test code rescue StandardError => e # This protects against invalid byteranges and other errors at a per-message level log.error "Unexpected error during message publishing: #{e.message}" log.debug "Failure in publishing message [#{data}]" end end rescue MessagePack::MalformedFormatError => e # This has been observed when a server has filled the partition containing # the buffer files, and during replay the chunks were malformed log.error "Malformed msgpack in chunk - Did your server run out of space during buffering? #{e.message}" rescue StandardError => e # Just in case theres any other errors during chunk loading. log.error "Unexpected error during message publishing: #{e.message}" end # :nocov: end