class LogStash::Inputs::BeatsSupport::ConnectionHandler
Handle the data coming from a connection Decide which Process should be used to decode the data coming from the beat library.
-
Should we use a codec on specific field?
-
Should we just take the raw content of the parsed json frame
Public Class Methods
new(connection, input, queue)
click to toggle source
# File lib/logstash/inputs/beats_support/connection_handler.rb, line 15 def initialize(connection, input, queue) @connection = connection @input = input @queue = queue @logger = input.logger # We need to clone the codec per connection, so we can flush a specific # codec when a connection is closed. @codec = input.codec.dup @nocodec_transformer = RawEventTransform.new(@input) @codec_transformer = DecodedEventTransform.new(@input) end
Public Instance Methods
accept()
click to toggle source
# File lib/logstash/inputs/beats_support/connection_handler.rb, line 30 def accept @logger.debug("Beats input: waiting from new events from remote host", :peer => @connection.peer) @connection.run { |hash, identity_stream| process(hash, identity_stream) } end
flush(&block)
click to toggle source
OOB call to flush the codec buffer,
This method is a bit tricky to decide when to be called, in the current case, this will be call on any exception raised, either is a circuit breaker or the remote host closed the connection, its better to make sure we clear their data and create duplicates then losing the data.
# File lib/logstash/inputs/beats_support/connection_handler.rb, line 77 def flush(&block) @logger.debug? && @logger.debug("Beats input, out of band call for flushing the content of this connection", :peer => @connection.peer) @codec.flush(&block) end
process(hash, identity_stream)
click to toggle source
# File lib/logstash/inputs/beats_support/connection_handler.rb, line 37 def process(hash, identity_stream) @logger.debug? && @logger.debug("Beats input: new event received", :event_hash => hash, :identity_stream => identity_stream, :peer => @connection.peer) # Filebeats uses the `message` key and LSF `line` target_field = if from_filebeat?(hash) hash.delete(Lumberjack::Beats::FILEBEAT_LOG_LINE_FIELD) elsif from_logstash_forwarder?(hash) hash.delete(Lumberjack::Beats::LSF_LOG_LINE_FIELD) end if target_field.nil? @logger.debug? && @logger.debug("Beats input: not using the codec for this event, can't find the codec target field", :target_field_for_codec => @input.target_field_for_codec, :event_hash => hash) event = LogStash::Event.new(hash) @nocodec_transformer.transform(event) raise LogStash::Inputs::Beats::InsertingToQueueTakeTooLong if !@queue.offer(event) else @logger.debug? && @logger.debug("Beats input: decoding this event with the codec", :target_field_value => target_field) @codec.accept(CodecCallbackListener.new(target_field, hash, identity_stream, @codec_transformer, @queue)) end end
Private Instance Methods
from_filebeat?(hash)
click to toggle source
# File lib/logstash/inputs/beats_support/connection_handler.rb, line 85 def from_filebeat?(hash) !hash[Lumberjack::Beats::FILEBEAT_LOG_LINE_FIELD].nil? end
from_logstash_forwarder?(hash)
click to toggle source
# File lib/logstash/inputs/beats_support/connection_handler.rb, line 89 def from_logstash_forwarder?(hash) !hash[Lumberjack::Beats::LSF_LOG_LINE_FIELD].nil? end