class Lumberjack::Beats::Connection
Constants
- PEER_INFORMATION_NOT_AVAILABLE
- READ_SIZE
- RESCUED_CONNECTION_EXCEPTIONS
Attributes
peer[R]
server[RW]
Public Class Methods
new(fd, server)
click to toggle source
# File lib/lumberjack/beats/server.rb, line 422 def initialize(fd, server) @parser = Parser.new @fd = fd @server = server @ack_handler = nil # Fetch the details of the host before reading anything from the socket # se we can use that information when debugging connection issues with # remote hosts. begin @peer = "#{@fd.peeraddr[3]}:#{@fd.peeraddr[1]}" rescue IOError # This can happen if the connection is drop or close before # fetching the host details, lets return a generic string. @peer = PEER_INFORMATION_NOT_AVAILABLE end end
Public Instance Methods
ack_if_needed(sequence, &block)
click to toggle source
# File lib/lumberjack/beats/server.rb, line 502 def ack_if_needed(sequence, &block) block.call send_ack(sequence) if @ack_handler.ack?(sequence) end
close()
click to toggle source
# File lib/lumberjack/beats/server.rb, line 507 def close @fd.close unless @fd.closed? end
data(map, &block)
click to toggle source
# File lib/lumberjack/beats/server.rb, line 511 def data(map, &block) block.call(map, identity_stream(map)) if block_given? end
identity_stream(map)
click to toggle source
# File lib/lumberjack/beats/server.rb, line 528 def identity_stream(map) id = map.fetch("beat", {})["id"] if id && map["resource_id"] identity_values = [id, map["resource_id"]] else identity_values = [map.fetch("beat", {})["name"], map["source"]] end identity_values.compact.join("-") end
normalize_v1_metadata_encoding(map)
click to toggle source
# File lib/lumberjack/beats/server.rb, line 489 def normalize_v1_metadata_encoding(map) # lets normalize the metadata of the v1 frame to make # sure everything is in utf-8 format, because LSF don't enforce the encoding when he send # the data to the server. Path, offset can be in another encoding, when the data is assigned to the event. # the event will validate it and crash when the encoding is in the wrong format. map.each { |k, v| map[k].force_encoding(Encoding::UTF_8) unless k == Lumberjack::Beats::LSF_LOG_LINE_FIELD } map end
read_socket(&block)
click to toggle source
# File lib/lumberjack/beats/server.rb, line 457 def read_socket(&block) # TODO(sissel): Ack on idle. # X: - if any unacked, IO.select # X: - on timeout, ack all. # X: Doing so will prevent slow streams from retransmitting # X: too many events after errors. @parser.feed(@fd.sysread(READ_SIZE)) do |event, *args| case event when :version version(*args) when :window_size reset_next_ack(*args) when :data sequence, map = args ack_if_needed(sequence) { data(normalize_v1_metadata_encoding(map), &block) } when :json # If the payload is an array of items we will emit multiple events # this behavior was moved from the plugin to the library. # see this commit: https://github.com/logstash-plugins/logstash-input-lumberjack/pull/57/files#diff-1b9590423b15f04f215635164e7376ecR158 sequence, map = args ack_if_needed(sequence) do if map.is_a?(Array) map.each { |e| data(e, &block) } else data(map, &block) end end end end end
reset_next_ack(window_size)
click to toggle source
# File lib/lumberjack/beats/server.rb, line 515 def reset_next_ack(window_size) klass = version_1? ? AckingProtocolV1 : AckingProtocolV2 @ack_handler = klass.new(window_size) end
run(&block)
click to toggle source
# File lib/lumberjack/beats/server.rb, line 441 def run(&block) while !server.closed? read_socket(&block) end rescue *RESCUED_CONNECTION_EXCEPTIONS => e # EOF or other read errors, only action is to shutdown which we'll do in # 'ensure' raise ConnectionClosed.new(e) rescue # when the server is shutting down we can safely ignore any exceptions # On windows, we can get a `SystemCallErr` raise unless server.closed? ensure close rescue 'Already closed stream' end
send_ack(sequence)
click to toggle source
# File lib/lumberjack/beats/server.rb, line 520 def send_ack(sequence) @fd.syswrite(@ack_handler.ack_frame(sequence)) end
version(version)
click to toggle source
# File lib/lumberjack/beats/server.rb, line 498 def version(version) @version = version end
version_1?()
click to toggle source
# File lib/lumberjack/beats/server.rb, line 524 def version_1? @version == Parser::PROTOCOL_VERSION_1 end