class Fluent::UdpEventInput

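Main class of the Fluentd UDP event input plugin. It listens on a UDP socket, parses each datagram as JSON, and emits the resulting event to the Fluentd engine.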

Constants

MAX_BLOCKTIME

Public Class Methods

new() click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_udp_event.rb, line 14
# Builds the plugin instance.
#
# Runs the superclass initializer first and then loads
# 'fluent/plugin/socket_util' (presumably the source of the SocketUtil
# helper used when the socket is created in #start).
def initialize
  super()
  require 'fluent/plugin/socket_util'
end

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_udp_event.rb, line 23
# Applies the plugin configuration.
#
# conf - configuration object passed in by the caller; it is forwarded
#        unchanged to the superclass implementation. No additional
#        processing happens here.
def configure(conf)
  super(conf)
end
run() click to toggle source
# File lib/fluent/plugin/in_udp_event.rb, line 66
# Entry point of the worker thread created in #start: runs the event loop
# until it stops.
#
# Any StandardError raised while the loop is running is logged (message
# plus backtrace) and swallowed so the thread ends quietly. Only
# StandardError is rescued: process-level exceptions such as SystemExit,
# Interrupt or NoMemoryError are deliberately left to propagate instead of
# being hidden behind a log line.
def run
  @loop.run
rescue StandardError => e
  $log.error 'unexpected error', error: e.message
  $log.error_backtrace
end
shutdown() click to toggle source
# File lib/fluent/plugin/in_udp_event.rb, line 42
# Stops the plugin: halts the event loop, closes the UDP handler, detaches
# the remaining watchers and waits for the worker thread to finish.
#
# The order of the steps matters and is kept as is:
#   1. attach a repeating timer so the loop wakes up regularly,
#   2. ask the loop to stop,
#   3. close the UDP handler,
#   4. detach every watcher still registered on the loop,
#   5. join the worker thread.
def shutdown
  # Force event every MAX_BLOCKTIME seconds to prevent 60 second timeout
  # see ext/libev/ev.c MAX_BLOCKTIME
  @watcher = Cool.io::TimerWatcher.new(MAX_BLOCKTIME, true)
  @loop.attach(@watcher)

  $log.debug 'stopping event loop'
  begin
    @loop.stop
  rescue RuntimeError => e
    # Best effort: a failure to stop (presumably because the loop is not
    # running) must not abort the rest of the shutdown, but it is recorded
    # instead of being dropped silently.
    $log.debug 'event loop could not be stopped', error: e.message
  end

  $log.debug "closing udp socket on #{@bind}:#{@port}"
  @handler.close

  $log.debug 'closing watchers'
  @loop.watchers.each(&:detach)

  $log.debug 'waiting for thread to finish'
  @thread.join
  $log.debug 'thread finished'
  $log.debug 'terminating'
end
start() click to toggle source
# File lib/fluent/plugin/in_udp_event.rb, line 27
# Brings the input up: creates the event loop, opens and binds the UDP
# socket, attaches a UdpHandler for it to the loop and finally spawns the
# thread that executes #run.
#
# Reads @bind, @port and @max_message_size; sets @loop, @usock, @handler
# and @thread. Incoming data is delivered to #receive_data through the
# callback handed to the handler.
def start
  on_data = method(:receive_data)
  @loop = Coolio::Loop.new

  $log.debug "listening udp socket on #{@bind}:#{@port}"
  @usock = SocketUtil.create_udp_socket(@bind)
  @usock.bind(@bind, @port)

  @handler = UdpHandler.new(@usock, @max_message_size, on_data)
  @loop.attach(@handler)

  # The loop runs on its own thread so that start returns immediately.
  @thread = Thread.new { run }
end

Protected Instance Methods

receive_data(data) click to toggle source
# File lib/fluent/plugin/in_udp_event.rb, line 75
# Handles one received datagram and emits it as an event.
#
# data - String payload; expected to be a JSON array of the form
#        [tag, time, record].
#
# Behaviour:
# * A payload whose size equals @max_message_size triggers a warning,
#   since it may have been truncated.
# * Payloads that are not valid JSON, are not a JSON array, or lack a tag
#   or record are logged as warnings and dropped.
# * The time is converted with to_i; when it is missing or zero the
#   current engine time is used instead.
# * Any other StandardError is logged together with the dumped payload and
#   swallowed, so one bad message cannot break the receiver.
#
# Returns nothing meaningful.
def receive_data(data)
  if data.bytesize == @max_message_size
    $log.warn "message might be too big and truncated to #{@max_message_size}"
  end

  begin
    parsed = JSON.parse(data)
  rescue JSON::ParserError => e
    $log.warn 'invalid json data', error: e.message
    return
  end

  # JSON.parse can also return a Hash, String or number. Indexing those
  # with [0]/[1]/[2] would yield bogus tag/record values, so only arrays
  # are destructured; anything else leaves tag and record nil.
  tag, time, record = parsed if parsed.is_a?(Array)

  if tag.nil? || record.nil?
    $log.warn "invalid message supplied #{data}"
    return
  end

  # nil.to_i is 0, so a missing time can only be detected after the
  # conversion; zero is treated as "no time supplied".
  time = time.to_i
  time = Engine.now if time.zero?

  Engine.emit(tag, time, record)
rescue StandardError => e
  $log.warn data.dump, error: e.message
  $log.debug_backtrace
end