class LogStash::Inputs::Delf
This input will read GELF messages as events over the network, making it a good choice if you already use Graylog2 today.
The main use case for this input is to leverage existing GELF logging libraries such as the GELF log4j appender. A library used by this plugin has a bug which prevents it from parsing uncompressed data. If you use the log4j appender you need to configure it like this to force gzip compression even for small messages:
<Socket name="logstash" protocol="udp" host="logstash.example.com" port="5001"> <GelfLayout compressionType="GZIP" compressionThreshold="1" /> </Socket>
Constants
- MESSAGE_FIELD
- PARSE_FAILURE_LOG_MESSAGE
- PARSE_FAILURE_TAG
- RECONNECT_BACKOFF_SLEEP
- SOURCE_HOST_FIELD
- TAGS_FIELD
- TIMESTAMP_GELF_FIELD
Public Class Methods
# File lib/logstash/inputs/delf.rb, line 55 def initialize(params) super BasicSocket.do_not_reverse_lookup = true @incomplete_events = {} end
Private Class Methods
transform a given timestamp value into a proper LogStash::Timestamp, preserving microsecond precision and working around a JRuby issue where Time.at loses the fractional part of a BigDecimal. @param timestamp [Numeric] a Numeric (integer, float or bigdecimal) timestamp representation @return [LogStash::Timestamp] the proper LogStash::Timestamp representation
# File lib/logstash/inputs/delf.rb, line 143
# Convert a numeric timestamp into a LogStash::Timestamp while preserving
# sub-second precision.
#
# @param timestamp [Numeric] integer, float or BigDecimal epoch timestamp
# @return [LogStash::Timestamp]
def self.coerce_timestamp(timestamp)
  if timestamp.is_a?(BigDecimal)
    # A JRuby bug prevents correctly parsing a BigDecimal fractional part,
    # see https://github.com/elastic/logstash/issues/4565 — so split the
    # value into whole seconds and microseconds ourselves.
    LogStash::Timestamp.at(timestamp.to_i, timestamp.frac * 1000000)
  else
    LogStash::Timestamp.at(timestamp)
  end
end
from_json_parse
uses the Event#from_json method to deserialize and directly produce events
# File lib/logstash/inputs/delf.rb, line 149
# Deserialize a JSON payload directly into a LogStash::Event via
# Event#from_json. On a parse failure, emit a fallback event carrying the
# raw payload plus failure tags instead of raising.
#
# @param json [String] the raw JSON payload
# @return [LogStash::Event]
def self.from_json_parse(json)
  begin
    # Event.from_json always yields an array of events; a GELF payload is a
    # single document, so only the first element is meaningful.
    LogStash::Event.from_json(json).first
  rescue LogStash::Json::ParserError => e
    logger.error(PARSE_FAILURE_LOG_MESSAGE, :error => e, :data => json)
    LogStash::Event.new(MESSAGE_FIELD => json, TAGS_FIELD => [PARSE_FAILURE_TAG, '_fromjsonparser'])
  end
end
legacy_parse
uses the LogStash::Json class to deserialize json
# File lib/logstash/inputs/delf.rb, line 159
# Deserialize a JSON payload through LogStash::Json and wrap the resulting
# hash in a new LogStash::Event. On a parse failure, emit a fallback event
# carrying the raw payload plus failure tags instead of raising.
#
# @param json [String] the raw JSON payload
# @return [LogStash::Event]
def self.legacy_parse(json)
  begin
    parsed = LogStash::Json.load(json)
    LogStash::Event.new(parsed)
  rescue LogStash::Json::ParserError => e
    logger.error(PARSE_FAILURE_LOG_MESSAGE, :error => e, :data => json)
    LogStash::Event.new(MESSAGE_FIELD => json, TAGS_FIELD => [PARSE_FAILURE_TAG, '_legacyjsonparser'])
  end
end
generate a new LogStash::Event from json input and assign host to source_host event field. @param json_gelf [String] GELF json data @param host [String] source host of GELF data @return [LogStash::Event] new event with parsed json gelf, assigned source host and coerced timestamp
# File lib/logstash/inputs/delf.rb, line 125
# Build a new LogStash::Event from GELF JSON, stamping the source host and
# coercing the GELF timestamp field (when numeric) into the event timestamp.
#
# @param json_gelf [String] GELF json data
# @param host [String] source host of GELF data
# @return [LogStash::Event, nil] the parsed event, or nil when parsing produced none
def self.new_event(json_gelf, host)
  event = parse(json_gelf)
  return nil if event.nil?

  event.set(SOURCE_HOST_FIELD, host)

  gelf_timestamp = event.get(TIMESTAMP_GELF_FIELD)
  if gelf_timestamp.is_a?(Numeric)
    # Promote the GELF timestamp to the canonical event timestamp and drop
    # the now-redundant GELF field.
    event.timestamp = coerce_timestamp(gelf_timestamp)
    event.remove(TIMESTAMP_GELF_FIELD)
  end

  event
end
Public Instance Methods
# File lib/logstash/inputs/delf.rb, line 62 def register require 'gelfd' end
# File lib/logstash/inputs/delf.rb, line 67
# Main input loop: run the UDP listener until the plugin is stopped. If the
# listener dies unexpectedly, log the failure, back off for
# RECONNECT_BACKOFF_SLEEP (interruptible by stop), and restart it.
#
# @param output_queue [Queue] pipeline queue that receives decoded events
def run(output_queue)
  udp_listener(output_queue)
rescue => e
  unless stop?
    @logger.warn("delf listener died", :exception => e, :backtrace => e.backtrace)
    Stud.stoppable_sleep(RECONNECT_BACKOFF_SLEEP) { stop? }
    # Restart the listener unless a shutdown was requested during the backoff.
    retry unless stop?
  end
end
# File lib/logstash/inputs/delf.rb, line 81 def stop @udp.close rescue IOError # the plugin is currently shutting down, so its safe to ignore theses errors end
Private Instance Methods
# File lib/logstash/inputs/delf.rb, line 198 def handle_multiline(event) # Ignore if no track found track_id = event.get(@track) return event unless track_id.kind_of?(String) # Ignore if no message found message = event.get("message") return event unless message.kind_of?(String) # strip right message = message.rstrip # Fetch last event last_event = @incomplete_events[track_id] if message.end_with?(@continue_mark) # remove the continue_mark message = message.slice(0, message.length - @continue_mark.length) # If it's an incomplete event if last_event.nil? # update the message event.set("message", message) # cache it as a pending event @incomplete_events[track_id] = event return nil else # append content to pending event last_event.set("message", last_event.get("message") + "\r\n" + message) # limit message length to 5000 if last_event.get("message").length > @max_length @incomplete_events[track_id] = nil return last_event else return nil end end else # If it's not an incomplete event if last_event.nil? # just return if no pending incomplete event return event else # append content to pending incomplete event and return it last_event.set("message", last_event.get("message") + "\r\n" + message) # clear the pending incomplete event @incomplete_events[track_id] = nil return last_event end end end
# File lib/logstash/inputs/delf.rb, line 174
# Promote the GELF "full_message" (preferred) or "short_message" field into
# the standard "message" field, removing the originals once copied. A
# "short_message" that differs from the chosen "full_message" is kept.
#
# @param event [LogStash::Event] event to remap in place
def remap_gelf(event)
  full = event.get("full_message")
  short = event.get("short_message")
  if full && !full.empty?
    event.set("message", full.dup)
    event.remove("full_message")
    # Drop short_message only when it duplicates the promoted message.
    event.remove("short_message") if event.get("short_message") == event.get("message")
  elsif short && !short.empty?
    event.set("message", short.dup)
    event.remove("short_message")
  end
end
# File lib/logstash/inputs/delf.rb, line 188
# Rename GELF additional fields by dropping their leading underscore:
# '_foo' becomes 'foo'. Mutates the event in place.
#
# @param event [LogStash::Event] event whose fields are renamed
def strip_leading_underscore(event)
  prefixed = event.to_hash.keys.select { |field| field.start_with?("_") }
  prefixed.each do |field|
    event.set(field[1..-1], event.get(field))
    event.remove(field)
  end
end
# File lib/logstash/inputs/delf.rb, line 87 def udp_listener(output_queue) @logger.info("Starting delf listener", :address => "#{@host}:#{@port}") @continue_mark = Base64.urlsafe_decode64(@continue_mark_base64) @udp = UDPSocket.new(Socket::AF_INET) @udp.bind(@host, @port) while !stop? line, client = @udp.recvfrom(8192) begin data = Gelfd::Parser.parse(line) rescue => ex @logger.warn("Gelfd failed to parse a message skipping", :exception => ex, :backtrace => ex.backtrace) next end # Gelfd parser outputs null if it received and cached a non-final chunk next if data.nil? event = self.class.new_event(data, client[3]) next if event.nil? remap_gelf(event) strip_leading_underscore(event) decorate(event) event = handle_multiline(event) next if event.nil? output_queue << event end end