class Fluent::RawTcpOutput

Attributes

nodes[R]

Public Class Methods

new()
Calls superclass method
# File lib/fluent/plugin/out_rawtcp.rb, line 6
# Builds a new output instance.
#
# Runs the superclass initializer, loads the standard libraries this plugin
# requires, and starts with an empty list of sockets (filled in by
# #configure).
def initialize
  super
  %w[socket timeout fileutils].each { |lib| require lib }
  @sockets = []  # connected sockets, one per configured <server>
end

Public Instance Methods

configure(conf)
Calls superclass method
# File lib/fluent/plugin/out_rawtcp.rb, line 20
# Reads the plugin configuration and opens one TCP connection per
# <server> element.
#
# For every child element named "server" this reads its 'host', 'port'
# (falling back to DEFAULT_LISTEN_PORT when absent) and 'name' (falling back
# to "host:port"), connects via #connect, applies SO_LINGER and SO_SNDTIMEO
# based on @send_timeout, and appends the socket to @sockets.
#
# conf - configuration object; must respond to #elements, each element
#        responding to #name and #[].
#
# Raises whatever #connect or the socket option calls raise. If a socket
# option cannot be applied, the freshly opened socket is closed before the
# error propagates, so it is not leaked.
def configure(conf)
  super

  conf.elements.each do |e|
    next if e.name != "server"

    host = e['host']
    port = e['port']
    port = port ? port.to_i : DEFAULT_LISTEN_PORT
    name = e['name'] || "#{host}:#{port}"

    sock = connect(RawNode.new(name, host, port))
    begin
      opt = [1, @send_timeout.to_i].pack('I!I!')  # { int l_onoff; int l_linger; }
      sock.setsockopt(Socket::SOL_SOCKET, Socket::SO_LINGER, opt)
      opt = [@send_timeout.to_i, 0].pack('L!L!')  # struct timeval
      sock.setsockopt(Socket::SOL_SOCKET, Socket::SO_SNDTIMEO, opt)
    rescue
      # The socket is not tracked in @sockets yet, so nobody else would
      # ever close it; release it here and let the error surface.
      sock.close
      raise
    end

    @sockets << sock
    log.info "adding forwarding server '#{name}'", :host=>host, :port=>port
  end
end
format(tag, time, record)
# File lib/fluent/plugin/out_rawtcp.rb, line 52
# Serializes a single event for buffering.
#
# Returns the msgpack encoding of the three-element array
# [tag, time, record].
def format(tag, time, record)
  event = [tag, time, record]
  event.to_msgpack
end
shutdown()
Calls superclass method
# File lib/fluent/plugin/out_rawtcp.rb, line 48
# Shuts the plugin down and releases the TCP connections opened by
# #configure.
#
# The superclass shutdown runs first so that any work it performs
# (presumably flushing buffered chunks through #write — confirm against the
# Fluentd version in use) can still use the sockets. Afterwards every socket
# that is still open is closed and the list is emptied. A failure to close
# one socket is logged and does not prevent the others from being closed.
def shutdown
  super

  @sockets.each do |sock|
    begin
      sock.close unless sock.closed?
    rescue IOError, SystemCallError => e
      log.warn "failed to close socket", :error=>e.to_s
    end
  end
  @sockets.clear
end
start()
Calls superclass method
# File lib/fluent/plugin/out_rawtcp.rb, line 44
# Starts the plugin. Adds no behaviour of its own: it only delegates to the
# superclass implementation.
def start
  super
end
write(chunk)
# File lib/fluent/plugin/out_rawtcp.rb, line 56
# Delivers a buffered chunk to the first socket that accepts it.
#
# Sockets in @sockets are tried in order via #send_data; the method returns
# as soon as one attempt completes without raising. An empty chunk is a
# no-op.
#
# Raises the error from the last failed attempt when every socket failed,
# or a RuntimeError ("No nodes available") when there are no sockets at all.
def write(chunk)
  return if chunk.empty?

  last_failure = nil

  @sockets.each do |socket|
    begin
      send_data(socket, chunk)
      return
    rescue => e
      # Remember the failure and fall through to the next socket.
      last_failure = e
    end
  end

  raise(last_failure || "No nodes available")
end

Private Instance Methods

connect(node)
# File lib/fluent/plugin/out_rawtcp.rb, line 96
# Opens a TCP connection to the given node.
#
# node - object responding to #resolved_host and #port.
#
# The connection attempt is wrapped in Timeout.timeout(@connect_timeout).
# Returns the new TCPSocket.
def connect(node)
  Timeout.timeout(@connect_timeout) { TCPSocket.new(node.resolved_host, node.port) }
end
prepare_data_to_send(tag, time, record)
# File lib/fluent/plugin/out_rawtcp.rb, line 84
# Builds the payload written to the socket for one event.
#
# When @output_type is "json" the result is the record as a JSON string,
# followed by "\n" if @output_append_newline is set. For any other output
# type the result is the msgpack encoding of [tag, time, <record as JSON>].
def prepare_data_to_send(tag, time, record)
  json = record.to_json
  return [tag, time, json].to_msgpack unless @output_type == "json"

  suffix = @output_append_newline ? "\n" : ""
  "#{json}#{suffix}"
end
send_data(sock, chunk)
# File lib/fluent/plugin/out_rawtcp.rb, line 75
# Writes every event of a chunk to the given socket.
#
# Iterates the chunk with #msgpack_each and, for each event whose record is
# a Hash, writes the payload built by #prepare_data_to_send. Events with any
# other kind of record are skipped.
def send_data(sock, chunk)
  chunk.msgpack_each do |tag, time, record|
    sock.write(prepare_data_to_send(tag, time, record)) if record.is_a?(Hash)
  end
end