class Fluent::TCPSocketClientInput
Public Class Methods
new()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_tcp_socket_client.rb, line 18
# Sets up the plugin instance by delegating to the superclass constructor,
# then loads the standard-library `socket` extension that provides
# TCPSocket (loaded here rather than at file load time).
def initialize
  super
  require 'socket'
end
Public Instance Methods
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_tcp_socket_client.rb, line 23
# Applies the configuration through the superclass, logs the configured
# server endpoint, and loads the parser library matching @format.
#
# @param conf the configuration object, passed through to the superclass
def configure(conf)
  super
  log.info "server has been set : #{@server}:#{@port}"

  # Format name => library to load on demand. Formats that are not listed
  # (e.g. 'text') need no extra library.
  parser_libraries = {
    'json' => 'oj',
    'ltsv' => 'ltsv',
    'msgpack' => 'msgpack'
  }
  library = parser_libraries[@format]
  require library if library
end
multi_workers_ready?()
click to toggle source
# File lib/fluent/plugin/in_tcp_socket_client.rb, line 38
# Always answers true.
# NOTE(review): presumably this tells the host framework that the plugin may
# run under multiple workers -- confirm against the plugin base class.
#
# @return [Boolean] always true
def multi_workers_ready?
  true
end
parse_msg(record)
click to toggle source
# File lib/fluent/plugin/in_tcp_socket_client.rb, line 72
# Decodes one raw message according to @format.
#
# @param record [String] a single message with its delimiter already removed
# @return the decoded record; for 'text' a Hash holding the raw string under
#   the "message" key; for any other unrecognised format an empty Hash
def parse_msg(record)
  case @format
  when 'json'    then Oj.load(record)
  when 'ltsv'    then LTSV.parse(record)
  when 'msgpack' then MessagePack.unpack(record)
  when 'text'    then { "message" => record }
  else {}
  end
end
read_socket_messages()
click to toggle source
# File lib/fluent/plugin/in_tcp_socket_client.rb, line 47
# Opens a TCP connection to @server:@port, reads @delimiter-terminated
# messages until the peer stops sending, and emits them under `tag` in
# batches of @emit_messages events. Events still buffered when reading stops
# (end of stream or an error) are emitted in one final batch.
#
# Errors raised while connecting, reading or parsing are logged through the
# plugin logger and do not propagate. Only StandardError is rescued so that
# signals and process-exit requests are not swallowed.
#
# @return the result of the final emit, or nil when nothing was left to emit
def read_socket_messages
  es = MultiEventStream.new
  socket = nil
  log.trace "Creating socket to #{@server}:#{@port}"
  begin
    socket = TCPSocket.open(@server, @port)
    count = 0
    while (message = socket.gets(@delimiter))
      es.add(Time.now.to_i, parse_msg(message.chomp(@delimiter)))
      count += 1
      next unless (count % @emit_messages).zero?

      # A full batch has been collected (es is non-empty here: an event was
      # just added), so flush it and start a new stream.
      router.emit_stream(tag, es)
      es = MultiEventStream.new
    end
  rescue StandardError => e
    log.error e
  ensure
    # This method opens a new connection on every call, so the socket must
    # be released here or each call leaks a file descriptor.
    socket.close if socket
  end
  router.emit_stream(tag, es) unless es.empty?
end
start()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_tcp_socket_client.rb, line 42
# Runs the superclass start logic, then registers a repeating timer named
# :read_socket_run that invokes #read_socket_messages every @interval.
def start
  super
  reader = method(:read_socket_messages)
  timer_execute(:read_socket_run, @interval, repeat: true, &reader)
end