class Fluent::Tcp_mcOutput

Attributes

nodes[R]

Public Class Methods

new() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_tcp_mc.rb, line 26
# Builds the plugin instance with an empty list of destination nodes.
#
# The standard libraries this plugin relies on are required here, at
# instantiation time, after the superclass initializer has run.
def initialize
  super
  %w[socket timeout fileutils].each { |library| require library }
  @nodes = [] # filled with node objects by #configure
end

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_tcp_mc.rb, line 39
# Reads the plugin configuration and registers one RawNode per child
# element named "server".
#
# For every such element the 'host', 'port' and 'name' attributes are read.
# A missing port falls back to DEFAULT_LISTEN_PORT and a missing name falls
# back to "host:port". Each registered node is reported through log.info.
#
# NOTE(review): @formatter is reassigned once per server element and is never
# assigned when the configuration has no server element - confirm intended.
#
# @param conf configuration object; must respond to #elements
def configure(conf)
  super

  compat_parameters_convert(conf, :buffer, :formatter, :inject)

  conf.elements.each do |element|
    next unless element.name == "server"

    host = element['host']
    configured_port = element['port']
    port =
      if configured_port
        configured_port.to_i
      else
        DEFAULT_LISTEN_PORT
      end
    name = element['name'] || "#{host}:#{port}"

    @formatter = formatter_create

    @nodes << RawNode.new(name, host, port)
    log.info("adding forwarding server '#{name}'", host: host, port: port)
  end
end
format(tag, time, record) click to toggle source
# File lib/fluent/plugin/out_tcp_mc.rb, line 75
# Serializes one event as a msgpack-encoded [tag, time, record] triple.
#
# The record is first passed through inject_values_to_record, and the value
# that call returns is what ends up in the serialized triple.
#
# @param tag event tag
# @param time event time
# @param record event record
# @return the result of Array#to_msgpack on the triple
def format(tag, time, record)
  [tag, time, inject_values_to_record(tag, time, record)].to_msgpack
end
multi_workers_ready?() click to toggle source
# File lib/fluent/plugin/out_tcp_mc.rb, line 63
# Predicate that unconditionally answers true.
#
# @return [true]
def multi_workers_ready?
  true
end
shutdown() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_tcp_mc.rb, line 71
# Shutdown hook. Delegates entirely to the superclass implementation and adds
# no behavior of its own.
def shutdown
  super
end
start() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_tcp_mc.rb, line 67
# Start hook. Delegates entirely to the superclass implementation and adds no
# behavior of its own.
def start
  super
end
write(chunk) click to toggle source
# File lib/fluent/plugin/out_tcp_mc.rb, line 80
# Delivers a buffer chunk to the first node that accepts it.
#
# Nodes are tried in the order they appear in @nodes, and the method returns
# as soon as one send_data call succeeds. Every failed attempt is logged as a
# warning so that an unreachable node does not go unnoticed when a later node
# takes over.
#
# @param chunk buffer chunk; must respond to #empty? and is passed to
#   send_data unchanged
# @return [nil]
# @raise [StandardError] the error from the last node tried, when all fail
# @raise [RuntimeError] "No nodes available" when @nodes is empty
def write(chunk)
  return if chunk.empty?

  last_error = nil

  @nodes.each do |node|
    begin
      send_data(node, chunk)
      return
    rescue => e
      # Keep the failure so it can be re-raised if no node succeeds, then
      # fall through to the next node.
      last_error = e
      log.warn "failed to send data to a node", error: e
    end
  end

  raise last_error if last_error
  raise "No nodes available"
end

Private Instance Methods

connect(node) click to toggle source
# File lib/fluent/plugin/out_tcp_mc.rb, line 118
# Opens a TCP connection to the given node.
#
# The connection attempt runs inside Timeout.timeout(@connect_timeout).
#
# @param node object responding to #resolved_host and #port
# @return [TCPSocket] the connected socket
def connect(node)
  Timeout.timeout(@connect_timeout) { TCPSocket.new(node.resolved_host, node.port) }
end
send_data(node, chunk) click to toggle source
# File lib/fluent/plugin/out_tcp_mc.rb, line 99
# Connects to a node and streams the chunk's records to it as JSON lines.
#
# The socket gets SO_LINGER enabled and SO_SNDTIMEO set, both using
# @send_timeout (converted with to_i) as the number of seconds. Each record
# that is a Hash is written as Yajl.dump output followed by a newline; any
# other record is skipped. The socket is closed whether or not sending
# succeeds.
#
# @param node destination node, handed to #connect
# @param chunk buffer chunk; must respond to #msgpack_each
def send_data(node, chunk)
  socket = connect(node)
  begin
    seconds = @send_timeout.to_i

    # struct linger { int l_onoff; int l_linger; }
    linger = [1, seconds].pack('I!I!')
    socket.setsockopt(Socket::SOL_SOCKET, Socket::SO_LINGER, linger)

    # struct timeval with whole seconds and zero microseconds
    send_timeout = [seconds, 0].pack('L!L!')
    socket.setsockopt(Socket::SOL_SOCKET, Socket::SO_SNDTIMEO, send_timeout)

    chunk.msgpack_each do |_tag, _time, record|
      if record.is_a?(Hash)
        socket.write("#{Yajl.dump(record)}\n")
      end
    end
  ensure
    socket.close
  end
end