class LogStash::Inputs::BeatsSupport::ConnectionHandler

Handles the data coming from a connection. Decides which process should be used to decode the data coming from the Beats library.

Public Class Methods

new(connection, input, queue) click to toggle source
# File lib/logstash/inputs/beats_support/connection_handler.rb, line 15
# Builds a handler bound to one connection.
#
# @param connection the connection whose data this handler consumes
# @param input the owning input; supplies the logger and the codec
# @param queue the queue that events are pushed to
def initialize(connection, input, queue)
  @connection, @input, @queue = connection, input, queue
  @logger = input.logger

  # Every connection works on its own copy of the codec, which lets us
  # flush exactly that copy once the connection goes away.
  @codec = input.codec.dup

  @nocodec_transformer = RawEventTransform.new(input)
  @codec_transformer = DecodedEventTransform.new(input)
end

Public Instance Methods

accept() click to toggle source
# File lib/logstash/inputs/beats_support/connection_handler.rb, line 30
# Starts the connection via `run` and forwards every hash / identity stream
# pair it yields to #process.
#
# The debug statement is guarded with `@logger.debug?`, like the other
# logging calls in this class, so that the log payload (including
# `@connection.peer`) is only built when debug logging is enabled.
#
# @return whatever `@connection.run` returns
def accept
  @logger.debug? && @logger.debug("Beats input: waiting from new events from remote host",
                                  :peer => @connection.peer)

  @connection.run { |hash, identity_stream| process(hash, identity_stream) }
end
flush(&block) click to toggle source

Out-of-band (OOB) call to flush the codec buffer.

Deciding when this method should be called is a bit tricky. In the current implementation it is called on any exception raised, whether it comes from the circuit breaker or from the remote host closing the connection. It is better to make sure we flush the buffered data and risk creating duplicates than to lose the data.

# File lib/logstash/inputs/beats_support/connection_handler.rb, line 77
# Out-of-band flush of this connection's codec.
#
# Logs a debug line (only when debug logging is enabled) and then delegates
# to the codec's `flush`, passing the caller's block along.
#
# @param block forwarded untouched to `@codec.flush`
# @return whatever `@codec.flush` returns
def flush(&block)
  if @logger.debug?
    @logger.debug("Beats input, out of band call for flushing the content of this connection",
                  :peer => @connection.peer)
  end

  @codec.flush(&block)
end
process(hash, identity_stream) click to toggle source
# File lib/logstash/inputs/beats_support/connection_handler.rb, line 37
# Handles one hash received from the connection.
#
# When the hash carries a Filebeat or logstash-forwarder log line, that
# value is removed from the hash and handed to the codec through a
# CodecCallbackListener. Otherwise the hash is wrapped in an event, run
# through the no-codec transformer and offered to the queue.
#
# @param hash the received data; the log line key is deleted from it when found
# @param identity_stream passed through to the CodecCallbackListener
# @raise [LogStash::Inputs::Beats::InsertingToQueueTakeTooLong] when the
#   queue refuses the event
def process(hash, identity_stream)
  if @logger.debug?
    @logger.debug("Beats input: new event received",
                  :event_hash => hash,
                  :identity_stream => identity_stream,
                  :peer => @connection.peer)
  end

  # Filebeat stores the log line under `message`, LSF under `line`.
  codec_payload =
    if from_filebeat?(hash)
      hash.delete(Lumberjack::Beats::FILEBEAT_LOG_LINE_FIELD)
    elsif from_logstash_forwarder?(hash)
      hash.delete(Lumberjack::Beats::LSF_LOG_LINE_FIELD)
    end

  unless codec_payload.nil?
    if @logger.debug?
      @logger.debug("Beats input: decoding this event with the codec",
                    :target_field_value => codec_payload)
    end

    listener = CodecCallbackListener.new(codec_payload, hash, identity_stream, @codec_transformer, @queue)
    return @codec.accept(listener)
  end

  # No log line to decode: ship the hash as a plain event.
  if @logger.debug?
    @logger.debug("Beats input: not using the codec for this event, can't find the codec target field",
                  :target_field_for_codec => @input.target_field_for_codec,
                  :event_hash => hash)
  end

  event = LogStash::Event.new(hash)
  @nocodec_transformer.transform(event)

  raise LogStash::Inputs::Beats::InsertingToQueueTakeTooLong unless @queue.offer(event)
end

Private Instance Methods

from_filebeat?(hash) click to toggle source
# File lib/logstash/inputs/beats_support/connection_handler.rb, line 85
# Tells whether the hash holds a non-nil value under the Filebeat log line key.
#
# @param hash the received data
# @return [Boolean]
def from_filebeat?(hash)
  log_line = hash[Lumberjack::Beats::FILEBEAT_LOG_LINE_FIELD]
  !log_line.nil?
end
from_logstash_forwarder?(hash) click to toggle source
# File lib/logstash/inputs/beats_support/connection_handler.rb, line 89
# Tells whether the hash holds a non-nil value under the logstash-forwarder
# log line key.
#
# @param hash the received data
# @return [Boolean]
def from_logstash_forwarder?(hash)
  log_line = hash[Lumberjack::Beats::LSF_LOG_LINE_FIELD]
  !log_line.nil?
end