class LogStash::Inputs::Udp
Read messages as events over the network via UDP. The only required configuration item is `port`, which specifies the UDP port Logstash will listen on for event streams.
Public Class Methods
new(params)
click to toggle source
Calls superclass method
# File lib/logstash/inputs/udp.rb, line 48
# Builds the input by delegating to the superclass constructor, then switches off
# reverse lookups on BasicSocket. Note that this is a class-level setting on
# BasicSocket, not something scoped to this plugin instance.
#
# @param params the constructor argument, forwarded unchanged to the superclass
def initialize(params)
  super(params)
  BasicSocket.do_not_reverse_lookup = true
end
Public Instance Methods
close()
click to toggle source
# File lib/logstash/inputs/udp.rb, line 94
# Closes the UDP socket when one exists and is still open; otherwise does nothing.
# A StandardError raised by the close call itself is passed to
# ignore_close_and_log instead of being propagated to the caller.
def close
  return if !@udp || @udp.closed?

  begin
    @udp.close
  rescue => close_error
    ignore_close_and_log(close_error)
  end
end
register()
click to toggle source
# File lib/logstash/inputs/udp.rb, line 53
# Prepares plugin state before the input is run:
# * clears the socket reference (@udp),
# * creates the :errors metric namespace (@metric_errors),
# * decides which event field receives the source IP (@field_source_ip).
#
# When source_ip_fieldname is not configured, the field name is chosen through
# ecs_select: "host" when ECS compatibility is disabled, "[host][ip]" for v1.
# When it is configured, the user's value is used as-is, and a warning is logged
# if ECS compatibility is enabled, because the custom name is not checked here.
def register
  @udp = nil
  @metric_errors = metric.namespace(:errors)

  if source_ip_fieldname.nil?
    # define ecs name mapping
    @field_source_ip = ecs_select[disabled: "host", v1: "[host][ip]"]
  else
    @field_source_ip = source_ip_fieldname
    if ecs_compatibility != :disabled
      @logger.warn("'source_ip_fieldname' is user customized, please check it has an ECS compatible name")
    end
  end
end
run(output_queue)
click to toggle source
# File lib/logstash/inputs/udp.rb, line 67
# Main loop of the input. Creates the bounded queue between the listener and the
# workers, publishes the queue size and worker count as gauges, starts @workers
# worker threads (each with its own clone of the codec), and then runs the UDP
# listener on the calling thread.
#
# If the listener raises a StandardError, the error is logged, the :listener error
# counter is incremented, and after Stud.stoppable_sleep(5) the listener is
# restarted unless stop? is true. When the listener section is finally left, one
# [:END, nil] message per worker is queued and every worker thread is joined.
#
# @param output_queue queue that decoded events are pushed onto
def run(output_queue)
  @output_queue = output_queue
  @input_to_worker = SizedQueue.new(@queue_size)

  metric.gauge(:queue_size, @queue_size)
  metric.gauge(:workers, @workers)

  @input_workers = 1.upto(@workers).map do |worker_id|
    @logger.debug("Starting UDP worker thread", :worker => worker_id)
    Thread.new(worker_id, @codec.clone) do |thread_number, thread_codec|
      inputworker(thread_number, thread_codec)
    end
  end

  begin
    # udp server
    udp_listener(output_queue)
  rescue => listener_error
    @logger.error("UDP listener died", :exception => listener_error, :backtrace => listener_error.backtrace)
    @metric_errors.increment(:listener)
    Stud.stoppable_sleep(5) { stop? }
    retry unless stop?
  ensure
    # signal workers to end: one terminator message per worker thread
    @input_workers.each { @input_to_worker.push([:END, nil]) }
    @input_workers.each(&:join)
  end
end
stop()
click to toggle source
# File lib/logstash/inputs/udp.rb, line 100
# Closes the UDP socket if there is one and it has not been closed yet.
# A StandardError raised while closing is forwarded to ignore_close_and_log and
# does not reach the caller.
def stop
  socket_open = @udp && !@udp.closed?
  return unless socket_open

  begin
    @udp.close
  rescue StandardError => close_error
    ignore_close_and_log(close_error)
  end
end
Private Instance Methods
ignore_close_and_log(e)
click to toggle source
# File lib/logstash/inputs/udp.rb, line 195
# Records, at debug level, an exception that was raised while closing the socket.
# The exception is only logged; it is not re-raised.
#
# @param e the exception raised by the close call
def ignore_close_and_log(e)
  @logger.debug(
    "ignoring close exception",
    "exception" => e
  )
end
inputworker(number, codec)
click to toggle source
# File lib/logstash/inputs/udp.rb, line 153
# Body of a worker thread. Repeatedly takes a [payload, client] pair from the
# internal queue, decodes the payload with the given codec, flushes the codec, and
# hands every resulting event to push_decoded_event together with the sender
# address taken from client[3].
#
# The loop ends only when the :END marker is received. A StandardError raised
# while handling a message is logged and counted under the :worker error metric,
# and the worker moves on to the next message.
#
# @param number worker number, used in the thread name
# @param codec  codec instance used by this worker to decode payloads
def inputworker(number, codec)
  LogStash::Util::set_thread_name("<udp.#{number}")

  loop do
    # a worker should never terminate from an exception, only when it receives the :END symbol
    begin
      payload, client = @input_to_worker.pop
      break if payload == :END

      ip_address = client[3]
      forward_event = proc { |event| push_decoded_event(ip_address, event) }

      codec.decode(payload, &forward_event)
      codec.flush(&forward_event)
    rescue => worker_error
      @logger.error("Exception in inputworker", "exception" => worker_error, "backtrace" => worker_error.backtrace)
      @metric_errors.increment(:worker)
    end
  end
end
push_data(payload, client)
click to toggle source
# File lib/logstash/inputs/udp.rb, line 178
# Queues a received datagram for the worker threads. The payload is copied and the
# copy is tagged as UTF-8 (bytes are left untouched, no transcoding), so the
# caller's string is not modified.
#
# @param payload [String] raw datagram contents
# @param client  sender information, queued unchanged alongside the payload
def push_data(payload, client)
  utf8_payload = payload.b
  utf8_payload.force_encoding(Encoding::UTF_8)
  @input_to_worker.push([utf8_payload, client])
end
push_decoded_event(ip_address, event)
click to toggle source
# File lib/logstash/inputs/udp.rb, line 188
# Finishes a decoded event and sends it downstream: decorates it, fills in the
# source-IP field when the event does not already carry a value there, pushes the
# event onto the output queue and bumps the :events counter.
#
# @param ip_address sender address to store in the source-IP field
# @param event      the decoded event
def push_decoded_event(ip_address, event)
  decorate(event)

  # never overwrite a value the event already has in this field
  if event.get(@field_source_ip).nil?
    event.set(@field_source_ip, ip_address)
  end

  @output_queue.push(event)
  metric.increment(:events)
end
udp_listener(output_queue)
click to toggle source
# File lib/logstash/inputs/udp.rb, line 108
# Opens the UDP socket, binds it to @host:@port and feeds received datagrams to
# push_data until stop? becomes true.
#
# Steps:
# 1. Close any socket still open from a previous call.
# 2. Create an IPv6 or IPv4 socket depending on how @host parses as an IP address.
# 3. Apply @receive_buffer_bytes when configured, and warn if the size reported
#    back by the socket differs from the requested one.
# 4. Bind, then loop: wait up to 0.5 seconds for the socket to become readable and
#    read at most @queue_size datagrams per pass without blocking.
#
# On the way out (normal return or exception) both directions of the socket are
# closed; errors from those calls are passed to ignore_close_and_log.
#
# @param output_queue not referenced in this method body
def udp_listener(output_queue)
  @logger.info("Starting UDP listener", :address => "#{@host}:#{@port}")

  @udp.close if @udp && !@udp.closed?

  host_address = IPAddr.new(@host)
  if host_address.ipv6?
    @udp = UDPSocket.new(Socket::AF_INET6)
  elsif host_address.ipv4?
    @udp = UDPSocket.new(Socket::AF_INET)
  end

  # set socket receive buffer size if configured
  @udp.setsockopt(Socket::SOL_SOCKET, Socket::SO_RCVBUF, @receive_buffer_bytes) if @receive_buffer_bytes

  # read the size back: it may differ from the requested value
  rcvbuf = @udp.getsockopt(Socket::SOL_SOCKET, Socket::SO_RCVBUF).unpack("i").first
  if @receive_buffer_bytes && rcvbuf != @receive_buffer_bytes
    @logger.warn("Unable to set receive_buffer_bytes to desired size. Requested #{@receive_buffer_bytes} but obtained #{rcvbuf} bytes.")
  end

  @udp.bind(@host, @port)
  @logger.info("UDP listener started", :address => "#{@host}:#{@port}", :receive_buffer_bytes => "#{rcvbuf}", :queue_size => "#{@queue_size}")

  until stop?
    # the 0.5 second timeout lets the loop re-check stop? even when no data arrives
    ready = IO.select([@udp], [], [], 0.5)
    next if ready.nil?

    # collect datagram messages and add to inputworker queue
    @queue_size.times do
      begin
        payload, client = @udp.recvfrom_nonblock(@buffer_size)
        # an empty payload ends this pass; control returns to the select above
        break if payload.empty?
        push_data(payload, client)
      rescue IO::EAGAINWaitReadable
        # nothing more to read right now
        break
      end
    end
  end
ensure
  if @udp
    begin
      @udp.close_read
    rescue => close_error
      ignore_close_and_log(close_error)
    end

    begin
      @udp.close_write
    rescue => close_error
      ignore_close_and_log(close_error)
    end
  end
end