class Fluent::RawTcpOutput
Attributes
nodes[R]
Public Class Methods
new()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_rawtcp.rb, line 6
# Sets up a new output instance.
#
# Runs the superclass initializer, loads the standard libraries this plugin
# relies on, and starts with an empty list of sockets.
def initialize
  super
  # Loaded here (after super), in the same order as before.
  %w[socket timeout fileutils].each { |lib| require lib }
  @sockets = [] # one entry per socket appended by #configure
end
Public Instance Methods
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_rawtcp.rb, line 20
# Applies the plugin configuration and opens one socket per "server" element.
#
# For every child element of +conf+ whose name is "server", reads its
# 'host', 'port' and 'name' attributes, connects to it via #connect, sets
# SO_LINGER and SO_SNDTIMEO on the socket from @send_timeout, and appends the
# socket to @sockets. Elements with any other name are skipped.
#
# conf - configuration object; must respond to #elements, and each element
#        must respond to #name and #[].
def configure(conf)
  super

  conf.elements.each do |element|
    next unless element.name == "server"

    host = element['host']
    raw_port = element['port']
    # Fall back to the default port when the element does not specify one.
    port = raw_port ? raw_port.to_i : DEFAULT_LISTEN_PORT
    # An unnamed server is labelled "host:port".
    name = element['name'] || "#{host}:#{port}"

    sock = connect(RawNode.new(name, host, port))

    # struct linger { int l_onoff; int l_linger; }
    linger = [1, @send_timeout.to_i].pack('I!I!')
    sock.setsockopt(Socket::SOL_SOCKET, Socket::SO_LINGER, linger)

    # struct timeval { seconds; microseconds }
    send_timeout = [@send_timeout.to_i, 0].pack('L!L!')
    sock.setsockopt(Socket::SOL_SOCKET, Socket::SO_SNDTIMEO, send_timeout)

    @sockets << sock
    log.info "adding forwarding server '#{name}'", :host => host, :port => port
  end
end
format(tag, time, record)
click to toggle source
# File lib/fluent/plugin/out_rawtcp.rb, line 52
# Serialises a single event for buffering.
#
# Returns the msgpack encoding of the three-element array
# [tag, time, record].
def format(tag, time, record)
  event = [tag, time, record]
  event.to_msgpack
end
shutdown()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_rawtcp.rb, line 48
# Stops the plugin and releases its network connections.
#
# Runs the superclass shutdown first, then closes every socket held in
# @sockets and resets the list, so the connections do not stay open after
# the plugin has stopped. The sockets are closed even when the superclass
# shutdown raises.
#
# Returns whatever the superclass shutdown returns.
def shutdown
  super
ensure
  Array(@sockets).each do |sock|
    begin
      sock.close unless sock.closed?
    rescue IOError, SystemCallError
      # Best-effort cleanup: a socket that fails to close must not prevent
      # the remaining sockets from being closed.
    end
  end
  @sockets = []
end
start()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_rawtcp.rb, line 44
# Starts the plugin.
#
# Adds nothing of its own: it only delegates to the superclass
# implementation and returns its result.
def start
  super
end
write(chunk)
click to toggle source
# File lib/fluent/plugin/out_rawtcp.rb, line 56
# Delivers a buffered chunk to the first socket that accepts it.
#
# Returns immediately when +chunk+ is empty. Otherwise tries each socket in
# @sockets in order via #send_data and stops at the first one that does not
# raise.
#
# Raises the error from the last failing socket when every socket failed,
# or a RuntimeError ("No nodes available") when there were no sockets to
# try.
def write(chunk)
  return if chunk.empty?

  last_failure = nil
  @sockets.each do |sock|
    begin
      send_data(sock, chunk)
      return # delivered; the remaining sockets are not tried
    rescue => e
      # Remember the failure and fall through to the next socket.
      last_failure = e
    end
  end

  raise(last_failure || "No nodes available")
end
Private Instance Methods
connect(node)
click to toggle source
# File lib/fluent/plugin/out_rawtcp.rb, line 96
# Opens a TCP connection to +node+.
#
# node - object responding to #resolved_host and #port.
#
# The connection attempt is wrapped in Timeout.timeout with
# @connect_timeout. Returns the new TCPSocket.
def connect(node)
  Timeout.timeout(@connect_timeout) do
    TCPSocket.new(node.resolved_host, node.port)
  end
end
prepare_data_to_send(tag, time, record)
click to toggle source
# File lib/fluent/plugin/out_rawtcp.rb, line 84
# Builds the payload written to the socket for one event.
#
# When @output_type is "json", returns the record as a JSON string, followed
# by "\n" if @output_append_newline is set. For any other output type,
# returns the msgpack encoding of [tag, time, <record as JSON string>].
def prepare_data_to_send(tag, time, record)
  unless @output_type == "json"
    return [tag, time, record.to_json].to_msgpack
  end

  line_ending = @output_append_newline ? "\n" : ""
  "#{record.to_json}#{line_ending}"
end
send_data(sock, chunk)
click to toggle source
# File lib/fluent/plugin/out_rawtcp.rb, line 75
# Writes every event in +chunk+ to +sock+.
#
# Iterates the chunk with #msgpack_each and, for each event whose record is
# a Hash, writes the payload built by #prepare_data_to_send to the socket.
# Events whose record is not a Hash are skipped.
def send_data(sock, chunk)
  chunk.msgpack_each do |tag, time, record|
    sock.write(prepare_data_to_send(tag, time, record)) if record.is_a?(Hash)
  end
end