class LogStash::Outputs::Fluentd
Constants
- VERSION
Public Instance Methods
close()
click to toggle source
# File lib/logstash/outputs/fluentd.rb, line 87 def close buffer_flush(final: true) end
convert_from_event_to_msgpack(event)
click to toggle source
# File lib/logstash/outputs/fluentd.rb, line 36 def convert_from_event_to_msgpack(event) entry = [(event.timestamp.to_i || Time.now.to_i), event.to_hash] begin entry.to_msgpack rescue ArgumentError, NoMethodError LogStash::Json.load(LogStash::Json.dump(entry)).to_msgpack end end
flush(events, teardown = false)
click to toggle source
# File lib/logstash/outputs/fluentd.rb, line 64 def flush(events, teardown = false) @logger.debug "flushing #{events} events" return if events.size < 1 data = [@tag, events.join].to_msgpack @logger.debug "sending chunk #{data.bytesize} bytes" connect.write data @logger.debug "done" end
on_flush_error(e)
click to toggle source
# File lib/logstash/outputs/fluentd.rb, line 77 def on_flush_error(e) @logger.warn "flush error #{e.class}: #{e.message}" end
on_full_buffer_error(opts={})
click to toggle source
# File lib/logstash/outputs/fluentd.rb, line 82 def on_full_buffer_error(opts={}) @logger.warn "buffer exceeds limits: pending:#{opts[:pending_count]}, outgoing:#{opts[:outgoing_count]}" end
receive(event)
click to toggle source
# File lib/logstash/outputs/fluentd.rb, line 55 def receive(event) @logger.debug "receive a event" buffer_receive(convert_from_event_to_msgpack(event)) @logger.debug "buffered a event" end
register()
click to toggle source
# File lib/logstash/outputs/fluentd.rb, line 46 def register buffer_initialize( max_items: @flush_size, max_interval: @flush_interval, logger: @logger ) end
Private Instance Methods
connect()
click to toggle source
# File lib/logstash/outputs/fluentd.rb, line 92 def connect Stud::try do return TCPSocket.new(@host, @port) end end