class LogStash::Inputs::Udp

Read messages as events over the network via udp. The only required configuration item is `port`, which specifies the udp port logstash will listen on for event streams.

Public Class Methods

new(params) click to toggle source
Calls superclass method
# File lib/logstash/inputs/udp.rb, line 48
def initialize(params)
  super
  BasicSocket.do_not_reverse_lookup = true
end

Public Instance Methods

close() click to toggle source
# File lib/logstash/inputs/udp.rb, line 94
def close
  if @udp && !@udp.closed?
    @udp.close rescue ignore_close_and_log($!)
  end
end
register() click to toggle source
# File lib/logstash/inputs/udp.rb, line 53
def register
  @udp = nil
  @metric_errors = metric.namespace(:errors)
  if source_ip_fieldname.nil?
    # define ecs name mapping
    @field_source_ip = ecs_select[disabled: "host", v1: "[host][ip]"]
  else
    @field_source_ip = source_ip_fieldname
    if (ecs_compatibility != :disabled)
      @logger.warn("'source_ip_fieldname' is user customized, please check is has an ECS compatible name ")
    end
  end
end
run(output_queue) click to toggle source
# File lib/logstash/inputs/udp.rb, line 67
def run(output_queue)
  @output_queue = output_queue

  @input_to_worker = SizedQueue.new(@queue_size)
  metric.gauge(:queue_size, @queue_size)
  metric.gauge(:workers, @workers)

  @input_workers = (1..@workers).to_a.map do |i|
    @logger.debug("Starting UDP worker thread", :worker => i)
    Thread.new(i, @codec.clone) { |i, codec| inputworker(i, codec) }
  end

  begin
    # udp server
    udp_listener(output_queue)
  rescue => e
    @logger.error("UDP listener died", :exception => e, :backtrace => e.backtrace)
    @metric_errors.increment(:listener)
    Stud.stoppable_sleep(5) { stop? }
    retry unless stop?
  ensure
    # signal workers to end
    @input_workers.size.times { @input_to_worker.push([:END, nil]) }
    @input_workers.each { |thread| thread.join }
  end
end
stop() click to toggle source
# File lib/logstash/inputs/udp.rb, line 100
def stop
  if @udp && !@udp.closed?
    @udp.close rescue ignore_close_and_log($!)
  end
end

Private Instance Methods

ignore_close_and_log(e) click to toggle source
# File lib/logstash/inputs/udp.rb, line 195
def ignore_close_and_log(e)
  @logger.debug("ignoring close exception", "exception" => e)
end
inputworker(number, codec) click to toggle source
# File lib/logstash/inputs/udp.rb, line 153
def inputworker(number, codec)
  LogStash::Util::set_thread_name("<udp.#{number}")

  while true
    # a worker should never terminate from an exception, only when it receives the :END symbol
    begin
      payload, client = @input_to_worker.pop
      break if payload == :END

      ip_address = client[3]

      codec.decode(payload) { |event| push_decoded_event(ip_address, event) }
      codec.flush { |event| push_decoded_event(ip_address, event) }
    rescue => e
      @logger.error("Exception in inputworker", "exception" => e, "backtrace" => e.backtrace)
      @metric_errors.increment(:worker)
    end
  end
end
push_data(payload, client) click to toggle source
# File lib/logstash/inputs/udp.rb, line 178
def push_data(payload, client)
  payload = payload.b.force_encoding(Encoding::UTF_8)
  @input_to_worker.push([payload, client])
end
push_decoded_event(ip_address, event) click to toggle source
# File lib/logstash/inputs/udp.rb, line 188
def push_decoded_event(ip_address, event)
  decorate(event)
  event.set(@field_source_ip, ip_address) if event.get(@field_source_ip).nil?
  @output_queue.push(event)
  metric.increment(:events)
end
udp_listener(output_queue) click to toggle source
# File lib/logstash/inputs/udp.rb, line 108
def udp_listener(output_queue)
  @logger.info("Starting UDP listener", :address => "#{@host}:#{@port}")

  if @udp && !@udp.closed?
    @udp.close
  end

  if IPAddr.new(@host).ipv6?
    @udp = UDPSocket.new(Socket::AF_INET6)
  elsif IPAddr.new(@host).ipv4?
    @udp = UDPSocket.new(Socket::AF_INET)
  end
  # set socket receive buffer size if configured
  if @receive_buffer_bytes
    @udp.setsockopt(Socket::SOL_SOCKET, Socket::SO_RCVBUF, @receive_buffer_bytes)
  end
  rcvbuf = @udp.getsockopt(Socket::SOL_SOCKET, Socket::SO_RCVBUF).unpack("i")[0]
  if @receive_buffer_bytes && rcvbuf != @receive_buffer_bytes
    @logger.warn("Unable to set receive_buffer_bytes to desired size. Requested #{@receive_buffer_bytes} but obtained #{rcvbuf} bytes.")
  end

  @udp.bind(@host, @port)
  @logger.info("UDP listener started", :address => "#{@host}:#{@port}", :receive_buffer_bytes => "#{rcvbuf}", :queue_size => "#{@queue_size}")


  while !stop?
    next if IO.select([@udp], [], [], 0.5).nil?
    # collect datagram messages and add to inputworker queue
    @queue_size.times do
      begin
        payload, client = @udp.recvfrom_nonblock(@buffer_size)
        break if payload.empty?
        push_data(payload, client)
      rescue IO::EAGAINWaitReadable
        break
      end
    end
  end
ensure
  if @udp
    @udp.close_read rescue ignore_close_and_log($!)
    @udp.close_write rescue ignore_close_and_log($!)
  end
end