class Lumberjack::Beats::Socket
Attributes
host[R]
sequence[R]
Create a new Lumberjack
Socket
.
-
options is a hash. Valid options are:
-
:port - the port to listen on
-
:address - the host/address to bind to
-
:ssl - enable/disable ssl support
-
:ssl_certificate - the path to the ssl cert to use.
If ssl_certificate is not set, a plain tcp connection will be used.
Public Class Methods
new(opts={})
click to toggle source
# File lib/lumberjack/beats/client.rb, line 71 def initialize(opts={}) @sequence = 0 @last_ack = 0 @opts = { :port => 0, :address => "127.0.0.1", :ssl_certificate_authorities => [], # use the same naming as beats' TLS options :ssl_certificate => nil, :ssl_certificate_key => nil, :ssl_certificate_password => nil, :ssl => true, :json => false, }.merge(opts) @host = @opts[:address] connection_start end
Public Instance Methods
write_sync(elements, opts={})
click to toggle source
# File lib/lumberjack/beats/client.rb, line 170 def write_sync(elements, opts={}) options = { :json => @opts[:json], }.merge(opts) elements = [elements] if elements.is_a?(Hash) send_window_size(elements.size) encoder = options[:json] ? JsonEncoder : FrameEncoder payload = elements.map { |element| encoder.to_frame(element, inc) }.join compress = compress_payload(payload) send_payload(compress) ack(elements.size) end
Private Instance Methods
ack(size)
click to toggle source
# File lib/lumberjack/beats/client.rb, line 193 def ack(size) _, type = read_version_and_type raise "Whoa we shouldn't get this frame: #{type}" if type != "A" @last_ack = read_last_ack end
certificate()
click to toggle source
# File lib/lumberjack/beats/client.rb, line 114 def certificate if @opts[:ssl_certificate] OpenSSL::X509::Certificate.new(File.open(@opts[:ssl_certificate])) end end
compress_payload(payload)
click to toggle source
# File lib/lumberjack/beats/client.rb, line 187 def compress_payload(payload) compress = Zlib::Deflate.deflate(payload) ["1", "C", compress.bytesize, compress].pack("AANA*") end
connection_start()
click to toggle source
# File lib/lumberjack/beats/client.rb, line 90 def connection_start tcp_socket = TCPSocket.new(@opts[:address], @opts[:port]) if !@opts[:ssl] @socket = tcp_socket else @socket = OpenSSL::SSL::SSLSocket.new(tcp_socket, setup_ssl) @socket.connect end end
inc()
click to toggle source
# File lib/lumberjack/beats/client.rb, line 149 def inc @sequence = 0 if @sequence + 1 > Lumberjack::Beats::SEQUENCE_MAX @sequence = @sequence + 1 end
private_key()
click to toggle source
# File lib/lumberjack/beats/client.rb, line 121 def private_key OpenSSL::PKey::RSA.new(File.read(@opts[:ssl_certificate_key]), @opts[:ssl_certificate_password]) if @opts[:ssl_certificate_key] end
read_last_ack()
click to toggle source
# File lib/lumberjack/beats/client.rb, line 212 def read_last_ack @socket.read(4).unpack("N").first end
read_version_and_type()
click to toggle source
# File lib/lumberjack/beats/client.rb, line 205 def read_version_and_type version = @socket.read(1) type = @socket.read(1) [version, type] end
send_payload(payload)
click to toggle source
# File lib/lumberjack/beats/client.rb, line 160 def send_payload(payload) # SSLSocket has a limit of 16k per message # execute multiple writes if needed bytes_written = 0 while bytes_written < payload.bytesize bytes_written += @socket.syswrite(payload.byteslice(bytes_written..-1)) end end
send_window_size(size)
click to toggle source
# File lib/lumberjack/beats/client.rb, line 155 def send_window_size(size) @socket.syswrite(["1", "W", size].pack("AAN")) end
setup_ssl()
click to toggle source
# File lib/lumberjack/beats/client.rb, line 103 def setup_ssl ssl_context = OpenSSL::SSL::SSLContext.new ssl_context.cert = certificate ssl_context.key = private_key ssl_context.verify_mode = OpenSSL::SSL::VERIFY_PEER ssl_context.cert_store = trust_store ssl_context end
trust_store()
click to toggle source
# File lib/lumberjack/beats/client.rb, line 126 def trust_store store = OpenSSL::X509::Store.new Array(@opts[:ssl_certificate_authorities]).each do |certificate_authority| if File.file?(certificate_authority) store.add_file(certificate_authority) else # add_path is no implemented under jruby # so recursively try to load all the certificate from this directory # https://github.com/jruby/jruby-openssl/blob/master/src/main/java/org/jruby/ext/openssl/X509Store.java#L159 if !!(RUBY_PLATFORM == "java") Dir.glob(File.join(certificate_authority, "**", "*")).each { |f| store.add_file(f) } else store.add_path(certificate_authority) end end end store end
unacked_sequence_size()
click to toggle source
# File lib/lumberjack/beats/client.rb, line 200 def unacked_sequence_size sequence - (@last_ack + 1) end