class Lumberjack::Beats::Socket

Attributes

host[R]
sequence[R]

Create a new Lumberjack Socket.

  • options is a hash. Valid options are:

  • :port - the port to listen on

  • :address - the host/address to bind to

  • :ssl - enable/disable ssl support

  • :ssl_certificate - the path to the ssl cert to use.

    If ssl_certificate is not set, a plain tcp connection
    will be used.

Public Class Methods

new(opts={}) click to toggle source
# File lib/lumberjack/beats/client.rb, line 71
def initialize(opts={})
  @sequence = 0
  @last_ack = 0
  @opts = {
    :port => 0,
    :address => "127.0.0.1",
    :ssl_certificate_authorities => [], # use the same naming as beats' TLS options
    :ssl_certificate => nil,
    :ssl_certificate_key => nil,
    :ssl_certificate_password => nil,
    :ssl => true,
    :json => false,
  }.merge(opts)
  @host = @opts[:address]

  connection_start
end

Public Instance Methods

write_sync(elements, opts={}) click to toggle source
# File lib/lumberjack/beats/client.rb, line 170
def write_sync(elements, opts={})
  options = {
    :json => @opts[:json],
  }.merge(opts)

  elements = [elements] if elements.is_a?(Hash)
  send_window_size(elements.size)

  encoder = options[:json] ? JsonEncoder : FrameEncoder
  payload = elements.map { |element| encoder.to_frame(element, inc) }.join
  compress = compress_payload(payload)
  send_payload(compress)

  ack(elements.size)
end

Private Instance Methods

ack(size) click to toggle source
# File lib/lumberjack/beats/client.rb, line 193
def ack(size)
  _, type = read_version_and_type
  raise "Whoa we shouldn't get this frame: #{type}" if type != "A"
  @last_ack = read_last_ack
end
certificate() click to toggle source
# File lib/lumberjack/beats/client.rb, line 114
def certificate
  if @opts[:ssl_certificate]
    OpenSSL::X509::Certificate.new(File.open(@opts[:ssl_certificate]))
  end
end
compress_payload(payload) click to toggle source
# File lib/lumberjack/beats/client.rb, line 187
def compress_payload(payload)
  compress = Zlib::Deflate.deflate(payload)
  ["1", "C", compress.bytesize, compress].pack("AANA*")
end
connection_start() click to toggle source
# File lib/lumberjack/beats/client.rb, line 90
def connection_start
  tcp_socket = TCPSocket.new(@opts[:address], @opts[:port])

  if !@opts[:ssl]
    @socket = tcp_socket
  else

    @socket = OpenSSL::SSL::SSLSocket.new(tcp_socket, setup_ssl)
    @socket.connect
  end
end
inc() click to toggle source
# File lib/lumberjack/beats/client.rb, line 149
def inc
  @sequence = 0 if @sequence + 1 > Lumberjack::Beats::SEQUENCE_MAX
  @sequence = @sequence + 1
end
private_key() click to toggle source
# File lib/lumberjack/beats/client.rb, line 121
def private_key
  OpenSSL::PKey::RSA.new(File.read(@opts[:ssl_certificate_key]), @opts[:ssl_certificate_password]) if @opts[:ssl_certificate_key]
end
read_last_ack() click to toggle source
# File lib/lumberjack/beats/client.rb, line 212
def read_last_ack
  @socket.read(4).unpack("N").first
end
read_version_and_type() click to toggle source
# File lib/lumberjack/beats/client.rb, line 205
def read_version_and_type
  version = @socket.read(1)
  type    = @socket.read(1)
  [version, type]
end
send_payload(payload) click to toggle source
# File lib/lumberjack/beats/client.rb, line 160
def send_payload(payload)
  # SSLSocket has a limit of 16k per message
  # execute multiple writes if needed
  bytes_written = 0
  while bytes_written < payload.bytesize
    bytes_written += @socket.syswrite(payload.byteslice(bytes_written..-1))
  end
end
send_window_size(size) click to toggle source
# File lib/lumberjack/beats/client.rb, line 155
def send_window_size(size)
  @socket.syswrite(["1", "W", size].pack("AAN"))
end
setup_ssl() click to toggle source
# File lib/lumberjack/beats/client.rb, line 103
def setup_ssl
  ssl_context = OpenSSL::SSL::SSLContext.new

  ssl_context.cert = certificate
  ssl_context.key = private_key 
  ssl_context.verify_mode = OpenSSL::SSL::VERIFY_PEER
  ssl_context.cert_store = trust_store
  ssl_context
end
trust_store() click to toggle source
# File lib/lumberjack/beats/client.rb, line 126
def trust_store
  store = OpenSSL::X509::Store.new

  Array(@opts[:ssl_certificate_authorities]).each do |certificate_authority|
    if File.file?(certificate_authority)
      store.add_file(certificate_authority)
    else
      # add_path is no implemented under jruby
      # so recursively try to load all the certificate from this directory
      # https://github.com/jruby/jruby-openssl/blob/master/src/main/java/org/jruby/ext/openssl/X509Store.java#L159
      if !!(RUBY_PLATFORM == "java") 
        Dir.glob(File.join(certificate_authority, "**", "*")).each { |f| store.add_file(f) }
      else
        store.add_path(certificate_authority)
      end
    end
  end

  store
end
unacked_sequence_size() click to toggle source
# File lib/lumberjack/beats/client.rb, line 200
def unacked_sequence_size
  sequence - (@last_ack + 1)
end