class Fluent::LogitOutput

Public Class Methods

new() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_logit.rb, line 9
def initialize
  super
  require 'socket'
  require 'timeout'
  require 'fileutils'
end

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_logit.rb, line 29
def configure(conf)
  super
  if /[\w]{8}(-[\w]{4}){3}-[\w]{12}/.match(@stack_id) == nil
    raise "stack_id is required and must be a GUID. See the source wizard"
  end
  if @port == 0
    raise "port is required. See the source wizard."
  end

  if @tls_mode == :mutual
    if ! @tls_ca_certificate || @tls_ca_certificate.empty?
      raise Fluent::ConfigError, 
        "tls_ca_certificate is required when tls_mode is set to mutual"
    end

    if ! @tls_certificate || @tls_certificate.empty?
      raise Fluent::ConfigError,
        "tls_certificate is required when tls_mode is set to mutual"
    end

    if ! @tls_private_key || @tls_private_key.empty?
      raise Fluent::ConfigError,
        "tls_private_key is required when tls_mode is set to mutual"
    end
  end
end
format(tag, time, record) click to toggle source
# File lib/fluent/plugin/out_logit.rb, line 64
def format(tag, time, record)
  [tag, time, record].to_msgpack
end
shutdown() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_logit.rb, line 60
def shutdown
  super
end
start() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_logit.rb, line 56
def start
  super
end
write(chunk) click to toggle source
# File lib/fluent/plugin/out_logit.rb, line 68
def write(chunk)
  return if chunk.empty?

  send_data(chunk)
end

Private Instance Methods

connect() click to toggle source
# File lib/fluent/plugin/out_logit.rb, line 105
def connect()
  Timeout.timeout(@connect_timeout) do
    socket = TCPSocket.open("#{resolved_host()}", @port)
    ssl_context = OpenSSL::SSL::SSLContext.new()
    ssl_context.options |= OpenSSL::SSL::OP_NO_SSLv2
    ssl_context.options |= OpenSSL::SSL::OP_NO_SSLv3
    ssl_context.options |= OpenSSL::SSL::OP_NO_COMPRESSION
    ssl_context.ciphers = "TLSv1.2:!aNULL:!eNULL"
    ssl_context.ssl_version = :TLSv1_2

    if @tls_mode == :mutual
      ssl_context.cert = OpenSSL::X509::Certificate.new(File.open(@tls_certificate))
      ssl_context.key = OpenSSL::PKey::RSA.new(File.open(@tls_private_key))
      ssl_context.ca_file = @tls_ca_certificate
    end

    ssl_socket = OpenSSL::SSL::SSLSocket.new(socket, ssl_context)
    ssl_socket.sync_close = true
    ssl_socket.connect

    return ssl_socket
  end
end
prepare_data_to_send(tag, time, record) click to toggle source
# File lib/fluent/plugin/out_logit.rb, line 93
def prepare_data_to_send(tag, time, record)
  if @output_type == "json"
    new_line_suf = ""
    if @output_append_newline
      new_line_suf = "\n"
    end
    return "#{record.to_json}#{new_line_suf}"
  else
    return [tag, time, record].to_msgpack
  end
end
resolved_host() click to toggle source
# File lib/fluent/plugin/out_logit.rb, line 129
def resolved_host()
  @sockaddr = Socket.pack_sockaddr_in(@port, "#{@stack_id}-ls.logit.io")
  _, rhost = Socket.unpack_sockaddr_in(@sockaddr)
  return rhost
end
send_data(chunk) click to toggle source
# File lib/fluent/plugin/out_logit.rb, line 75
def send_data(chunk)
  sock = connect()
  begin
    opt = [1, @send_timeout.to_i].pack('I!I!')  # { int l_onoff; int l_linger; }
    sock.setsockopt(Socket::SOL_SOCKET, Socket::SO_LINGER, opt)

    opt = [@send_timeout.to_i, 0].pack('L!L!')  # struct timeval
    sock.setsockopt(Socket::SOL_SOCKET, Socket::SO_SNDTIMEO, opt)

    chunk.msgpack_each do |tag, time, record|
      next unless record.is_a? Hash
      sock.write(prepare_data_to_send(tag, time, record))
    end
  ensure
    sock.close
  end
end