class Lumberjack::Beats::Connection
Constants
- READ_SIZE
Attributes
server[RW]
Public Class Methods
new(fd, server)
click to toggle source
# File lib/lumberjack/beats/server.rb, line 314 def initialize(fd, server) @parser = Parser.new @fd = fd @server = server @ack_handler = nil end
Public Instance Methods
ack_if_needed(sequence, &block)
click to toggle source
# File lib/lumberjack/beats/server.rb, line 383 def ack_if_needed(sequence, &block) block.call send_ack(sequence) if @ack_handler.ack?(sequence) end
close()
click to toggle source
# File lib/lumberjack/beats/server.rb, line 388 def close @fd.close unless @fd.closed? end
closed?()
click to toggle source
# File lib/lumberjack/beats/server.rb, line 392 def closed? @fd.closed? end
data(map, &block)
click to toggle source
# File lib/lumberjack/beats/server.rb, line 396 def data(map, &block) block.call(map) if block_given? end
identity_stream(map)
click to toggle source
# File lib/lumberjack/beats/server.rb, line 413 def identity_stream(map) id = map.fetch("beat", {})["id"] if id && map["resource_id"] identity_values = [id, map["resource_id"]] else identity_values = [map.fetch("beat", {})["name"], map["source"]] end identity_values.compact.join("-") end
peer()
click to toggle source
# File lib/lumberjack/beats/server.rb, line 322 def peer "#{@fd.peeraddr[3]}:#{@fd.peeraddr[1]}" end
read_socket(&block)
click to toggle source
# File lib/lumberjack/beats/server.rb, line 347 def read_socket(&block) # TODO(sissel): Ack on idle. # X: - if any unacked, IO.select # X: - on timeout, ack all. # X: Doing so will prevent slow streams from retransmitting # X: too many events after errors. @parser.feed(@fd.sysread(READ_SIZE)) do |event, *args| case event when :version version(*args) when :window_size reset_next_ack(*args) when :data sequence, map = args ack_if_needed(sequence) { data(map, &block) } when :json # If the payload is an array of items we will emit multiple events # this behavior was moved from the plugin to the library. # see this commit: https://github.com/logstash-plugins/logstash-input-lumberjack/pull/57/files#diff-1b9590423b15f04f215635164e7376ecR158 sequence, map = args ack_if_needed(sequence) do if map.is_a?(Array) map.each { |e| data(e, &block) } else data(map, &block) end end end end end
reset_next_ack(window_size)
click to toggle source
# File lib/lumberjack/beats/server.rb, line 400 def reset_next_ack(window_size) klass = version_1? ? AckingProtocolV1 : AckingProtocolV2 @ack_handler = klass.new(window_size) end
run(&block)
click to toggle source
# File lib/lumberjack/beats/server.rb, line 326 def run(&block) while !server.closed? read_socket(&block) end rescue EOFError, OpenSSL::SSL::SSLError, IOError, Errno::ECONNRESET, Errno::EPIPE, Errno::EBADF, Errno::EAGAIN # EOF or other read errors, only action is to shutdown which we'll do in # 'ensure' rescue # when the server is shutting down we can safely ignore any exceptions # On windows, we can get a `SystemCallErr` raise unless server.closed? ensure close rescue 'Already closed stream' end
send_ack(sequence)
click to toggle source
# File lib/lumberjack/beats/server.rb, line 405 def send_ack(sequence) @fd.syswrite(@ack_handler.ack_frame(sequence)) end
version(version)
click to toggle source
# File lib/lumberjack/beats/server.rb, line 379 def version(version) @version = version end
version_1?()
click to toggle source
# File lib/lumberjack/beats/server.rb, line 409 def version_1? @version == Parser::PROTOCOL_VERSION_1 end