class Fluent::TCPSocketClientInput

Public Class Methods

new() click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_tcp_socket_client.rb, line 18
# Sets up the plugin instance by delegating to the superclass initializer
# and then loading Ruby's 'socket' library, which provides the TCPSocket
# class used by #read_socket_messages.
def initialize
  super
  # Loaded here rather than at file load time; presumably deliberate lazy
  # loading so the dependency is only pulled in when the plugin is built.
  require 'socket'
end

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_tcp_socket_client.rb, line 23
# Applies the plugin configuration and loads the parser library that
# matches the configured @format.
#
# conf - configuration object, passed through unchanged to the superclass.
#
# Logs the configured server endpoint, then requires 'oj', 'ltsv' or
# 'msgpack' for the 'json', 'ltsv' and 'msgpack' formats respectively.
# Any other format needs no extra library and nothing is required.
#
# Returns the result of `require` when a library is loaded, nil otherwise.
def configure(conf)
  super

  log.info "server has been set : #{@server}:#{@port}"

  # Maps each format that needs an external parser to its library name.
  parser_library = {
    'json' => 'oj',
    'ltsv' => 'ltsv',
    'msgpack' => 'msgpack'
  }[@format]

  require parser_library if parser_library
end
multi_workers_ready?() click to toggle source
# File lib/fluent/plugin/in_tcp_socket_client.rb, line 38
# Always returns true.
# NOTE(review): presumably this tells Fluentd the plugin may run under
# multiple workers — confirm against the Fluentd plugin API.
def multi_workers_ready?
  true
end
parse_msg(record) click to toggle source
# File lib/fluent/plugin/in_tcp_socket_client.rb, line 72
# Converts one raw message into a record according to @format.
#
# record - the raw message String (delimiter already stripped by the caller).
#
# Returns:
#   'json'    - whatever Oj.load produces for the message
#   'ltsv'    - whatever LTSV.parse produces for the message
#   'msgpack' - whatever MessagePack.unpack produces for the message
#   'text'    - a Hash with the raw message under the "message" key
#   otherwise - an empty Hash
def parse_msg(record)
  case @format
  when 'json'    then Oj.load(record)
  when 'ltsv'    then LTSV.parse(record)
  when 'msgpack' then MessagePack.unpack(record)
  when 'text'    then { "message" => record }
  else {}
  end
end
read_socket_messages() click to toggle source
# File lib/fluent/plugin/in_tcp_socket_client.rb, line 47
# Connects to @server:@port, reads messages separated by @delimiter until
# the peer closes the connection, and emits them to the router under `tag`.
#
# Each message has its trailing delimiter removed, is parsed with
# #parse_msg and is stamped with the current wall-clock time in seconds.
# Events are emitted in batches of @emit_messages; whatever is left over
# when reading stops (end of stream or an error) is emitted at the end.
#
# Errors raised while connecting, reading or parsing are logged through
# the plugin logger instead of being raised, so a failed attempt does not
# propagate to the caller. The socket is always closed before returning.
#
# Returns nil when there is no leftover batch, otherwise the result of the
# final router.emit_stream call.
def read_socket_messages
  es = MultiEventStream.new
  socket = nil
  log.trace "Creating socket to #{@server}:#{@port}"

  begin
    socket = TCPSocket.open(@server, @port)
    count = 0
    while (message = socket.gets(@delimiter))
      es.add(Time.now.to_i, parse_msg(message.chomp(@delimiter)))
      count += 1
      next unless (count % @emit_messages).zero?

      # A full batch has been collected: flush it and start a fresh stream.
      router.emit_stream(tag, es)
      es = MultiEventStream.new
    end
  rescue StandardError => e
    # StandardError only: signals and SystemExit must not be swallowed here.
    log.error e
  ensure
    # This method opens a new connection on every call, so the socket must
    # be released here or file descriptors leak.
    socket.close if socket
  end

  # Flush the partial batch gathered before end of stream or an error.
  router.emit_stream(tag, es) unless es.empty?
end
start() click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_tcp_socket_client.rb, line 42
# Starts the plugin: runs the superclass start logic, then registers a
# repeating timer named :read_socket_run that invokes
# #read_socket_messages every @interval.
def start
  super
  poll_socket = method(:read_socket_messages)
  timer_execute(:read_socket_run, @interval, repeat: true, &poll_socket)
end