class Fluent::UdpEventInput
Fluentd UDP input main class
Constants
- MAX_BLOCKTIME
Public Class Methods
new()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_udp_event.rb, line 14
# Builds the plugin instance: runs the superclass constructor first, then
# loads the socket helper library.
def initialize
  super
  # Loaded at construction time rather than at file load time.
  require 'fluent/plugin/socket_util'
end
Public Instance Methods
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_udp_event.rb, line 23
# Applies the plugin configuration by delegating entirely to the superclass;
# this class adds no configuration handling of its own here.
#
# @param conf the configuration object supplied by the caller, forwarded to
#   the superclass unchanged
def configure(conf)
  super
end
run()
click to toggle source
# File lib/fluent/plugin/in_udp_event.rb, line 66
# Runs the event loop stored in @loop and logs any error that escapes it.
#
# Only StandardError is rescued. Rescuing Exception here would also swallow
# SystemExit, Interrupt and other process-level exceptions raised inside the
# loop, hiding exit/interrupt requests behind a log line.
#
# @return whatever @loop.run returns, or the result of the logging calls
#   when an error was rescued
def run
  @loop.run
rescue StandardError => e
  $log.error 'unexpected error', error: e.message
  $log.error_backtrace
end
shutdown()
click to toggle source
# File lib/fluent/plugin/in_udp_event.rb, line 42
# Stops the plugin: halts the event loop, closes the UDP handler, detaches
# every watcher and waits for the loop thread to end. The steps run in a
# fixed order and each one is announced at debug level.
def shutdown
  # A repeating timer firing every MAX_BLOCKTIME seconds forces the loop to
  # wake up, preventing the 60 second timeout
  # (see MAX_BLOCKTIME in ext/libev/ev.c).
  @watcher = Cool.io::TimerWatcher.new(MAX_BLOCKTIME, true)
  @loop.attach(@watcher)

  $log.debug 'stopping event loop'
  begin
    @loop.stop
  rescue RuntimeError
    # Intentionally ignored.
    # NOTE(review): presumably raised when the loop is not running - confirm.
  end

  $log.debug "closing udp socket on #{@bind}:#{@port}"
  @handler.close

  $log.debug 'closing watchers'
  @loop.watchers.each(&:detach)

  $log.debug 'waiting for thread to finish'
  @thread.join
  $log.debug 'thread finished'

  $log.debug 'terminating'
end
start()
click to toggle source
# File lib/fluent/plugin/in_udp_event.rb, line 27
# Starts the plugin: creates the event loop, opens and binds a UDP socket on
# @bind/@port, wraps it in a UdpHandler that passes incoming data to
# #receive_data, attaches the handler to the loop and runs the loop (#run)
# on a new thread kept in @thread.
def start
  @loop = Coolio::Loop.new

  $log.debug "listening udp socket on #{@bind}:#{@port}"
  @usock = SocketUtil.create_udp_socket(@bind)
  @usock.bind(@bind, @port)

  # The handler is given the bound #receive_data method as its callback.
  @handler = UdpHandler.new(@usock, @max_message_size, method(:receive_data))
  @loop.attach(@handler)

  @thread = Thread.new { run }
end
Protected Instance Methods
receive_data(data)
click to toggle source
# File lib/fluent/plugin/in_udp_event.rb, line 75
# Handles one received datagram: parses it as JSON, validates that it is a
# `[tag, time, record]` array and emits it through Engine.
#
# Behaviour:
# * Warns when the payload is exactly @max_message_size bytes long, because
#   it may have been truncated.
# * Warns and drops the message when it is not valid JSON, is not a JSON
#   array, or lacks a tag or a record.
# * A missing, null or zero time is replaced with Engine.now. The previous
#   code called `to_i` before its nil check, so a missing time silently
#   became 0 and the `Engine.now` fallback could never run.
# * Any other StandardError is logged together with the dumped payload
#   instead of being raised to the caller.
#
# @param data [String] raw payload of one datagram
# @return [void]
def receive_data(data)
  if data.bytesize == @max_message_size
    $log.warn "message might be too big and truncated to #{@max_message_size}"
  end

  begin
    parsed = JSON.parse(data)
  rescue JSON::ParserError => e
    $log.warn 'invalid json data', error: e.message
    return
  end

  # Only arrays are acceptable. Without this check a JSON string would be
  # indexed character by character and a JSON integer bit by bit, producing
  # a bogus tag/time/record triple that would then be emitted.
  unless parsed.is_a?(Array)
    $log.warn "invalid message supplied #{data}"
    return
  end

  tag, raw_time, record = parsed
  if tag.nil? || record.nil?
    $log.warn "invalid message supplied #{data}"
    return
  end

  # nil.to_i is 0, so a single zero check covers both "absent" and "zero".
  time = raw_time.to_i
  time = Engine.now if time.zero?

  Engine.emit(tag, time, record)
rescue StandardError => e
  $log.warn data.dump, error: e.message
  $log.debug_backtrace
end