class LogStash::Inputs::Syslog
Read syslog messages as events over the network.
This input is a good choice if you already use syslog today. It is also a good choice if you want to receive logs from appliances and network devices where you cannot run your own log collector.
Of course, 'syslog' is a very muddy term. This input only supports `RFC3164` syslog with some small modifications. The date format is allowed to be `RFC3164` style or `ISO8601`. Otherwise the rest of `RFC3164` must be obeyed. If you do not use `RFC3164`, do not use this input.
For more information see the www.ietf.org/rfc/rfc3164.txt[RFC3164 page].
Note: This input will start listeners on both TCP and UDP.
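For orientation, here are two invented example lines of the kind this input is meant to parse, one with an RFC3164-style timestamp and one with an ISO8601 timestamp (the priority, host, and message text are assumptions for illustration only):

~~~ ruby
# Assumed sample payloads; only the two accepted date styles are the point here.
rfc3164_style = "<13>Feb  5 17:32:18 10.0.0.99 sshd[1234]: Accepted publickey for deploy"
iso8601_style = "<13>2024-02-05T17:32:18Z 10.0.0.99 sshd[1234]: Accepted publickey for deploy"
~~~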
Public Class Methods
# File lib/logstash/inputs/syslog.rb, line 86
def initialize(*params)
  super

  @priority_key = ecs_select[disabled:'priority', v1:'[log][syslog][priority]']
  @facility_key = ecs_select[disabled:'facility', v1:'[log][syslog][facility][code]']
  @severity_key = ecs_select[disabled:'severity', v1:'[log][syslog][severity][code]']

  @facility_label_key = ecs_select[disabled:'facility_label', v1:'[log][syslog][facility][name]']
  @severity_label_key = ecs_select[disabled:'severity_label', v1:'[log][syslog][severity][name]']

  @host_key = ecs_select[disabled:'host', v1:'[host][ip]']

  @grok_pattern ||= ecs_select[
      disabled:"<%{POSINT:#{@priority_key}}>%{SYSLOGLINE}",
      v1:"<%{POSINT:#{@priority_key}:int}>%{SYSLOGLINE}"
  ]

  @grok_filter = LogStash::Filters::Grok.new(
      "overwrite" => @syslog_field,
      "match" => { @syslog_field => @grok_pattern },
      "tag_on_failure" => ["_grokparsefailure_sysloginput"],
      "ecs_compatibility" => ecs_compatibility # use ecs-compliant patterns
  )

  @grok_filter_exec = ecs_select[
      disabled: -> (event) { @grok_filter.filter(event) },
      v1: -> (event) {
        event.set('[event][original]', event.get(@syslog_field))
        @grok_filter.filter(event)
        set_service_fields(event)
      }
  ]

  @date_filter = LogStash::Filters::Date.new(
      "match" => [ "timestamp", "MMM dd HH:mm:ss", "MMM  d HH:mm:ss", "MMM d HH:mm:ss", "ISO8601"],
      "locale" => @locale,
      "timezone" => @timezone,
  )

  @date_filter_exec = ecs_select[
      disabled: -> (event) {
        # in legacy (non-ecs) mode we used to match (SYSLOGBASE2) timestamp into two fields
        event.set("timestamp", event.get("timestamp8601")) if event.include?("timestamp8601")
        @date_filter.filter(event)
      },
      v1: -> (event) {
        @date_filter.filter(event)
        event.remove('timestamp')
      }
  ]
end
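As a quick reference, the ecs_select calls above yield the following field names depending on ECS compatibility mode; the FIELD_NAMES constant below is purely illustrative and not part of the plugin:

~~~ ruby
# Field names produced in ECS-disabled vs. ECS v1 mode, collected from the
# ecs_select calls above (for reference only, not an actual constant).
FIELD_NAMES = {
  "priority"       => { disabled: "priority",       v1: "[log][syslog][priority]" },
  "facility"       => { disabled: "facility",       v1: "[log][syslog][facility][code]" },
  "severity"       => { disabled: "severity",       v1: "[log][syslog][severity][code]" },
  "facility_label" => { disabled: "facility_label", v1: "[log][syslog][facility][name]" },
  "severity_label" => { disabled: "severity_label", v1: "[log][syslog][severity][name]" },
  "host"           => { disabled: "host",           v1: "[host][ip]" },
}
~~~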
Public Instance Methods
# File lib/logstash/inputs/syslog.rb, line 138
def register
  @metric_errors = metric.namespace(:errors)

  @grok_filter.register
  @date_filter.register

  @tcp_sockets = Concurrent::Array.new
  @tcp = @udp = nil
end
# File lib/logstash/inputs/syslog.rb, line 150
def run(output_queue)
  udp_thr = Thread.new(output_queue) do |output_queue|
    server(:udp, output_queue)
  end

  tcp_thr = Thread.new(output_queue) do |output_queue|
    server(:tcp, output_queue)
  end

  # If we exit and we're the only input, the agent will think no inputs
  # are running and initiate a shutdown.
  udp_thr.join
  tcp_thr.join
end
@see LogStash::Plugin#close
# File lib/logstash/inputs/syslog.rb, line 298
def stop
  close_udp
  close_tcp
end
Following RFC3164 where sane, we'll try to parse a received message as if you were relaying a syslog message to it. If the message cannot be recognized (see @grok_filter), we'll treat the whole message as the event and try to fill in the missing pieces (host, priority, etc.).
# File lib/logstash/inputs/syslog.rb, line 341
def syslog_relay(event)
  @grok_filter_exec.(event)

  if event.get("tags").nil? || !event.get("tags").include?(@grok_filter.tag_on_failure)
    # Per RFC3164, priority = (facility * 8) + severity
    #                       = (facility << 3) | severity
    priority = event.get(@priority_key).to_i rescue 13
    set_priority event, priority

    @date_filter_exec.(event)
  else
    @logger.debug? && @logger.debug("un-matched syslog message", :message => event.get("message"))

    # RFC3164 says unknown messages get pri=13
    set_priority event, 13
    metric.increment(:unknown_messages)
  end

  # Apply severity and facility metadata if use_labels => true
  set_labels(event) if @use_labels
end
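As a worked example of the PRI arithmetic in the comment above (facility 4 and severity 2 are arbitrary sample values):

~~~ ruby
# RFC3164 PRI calculation with invented sample values.
facility = 4
severity = 2
priority = (facility * 8) + severity   # => 34
(facility << 3) | severity             # => 34, sent on the wire as "<34>"
~~~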
Private Instance Methods
# File lib/logstash/inputs/syslog.rb, line 327
def close_tcp
  # If we somehow have this left open, close it.
  @tcp_sockets.each do |socket|
    socket.close rescue log_and_squash(:close_tcp_socket)
  end
  @tcp.close if @tcp rescue log_and_squash(:close_tcp)
  @tcp = nil
end
# File lib/logstash/inputs/syslog.rb, line 304
def close_udp
  if @udp
    @udp.close_read rescue log_and_squash(:close_udp_read)
    @udp.close_write rescue log_and_squash(:close_udp_write)
  end
  @udp = nil
end
# File lib/logstash/inputs/syslog.rb, line 283
def decode(ip, output_queue, data)
  @codec.decode(data) do |event|
    decorate(event)
    event.set(@host_key, ip)
    syslog_relay(event)
    output_queue << event
    metric.increment(:events)
  end
rescue => e
  # swallow and log all decoding exceptions, these will never be socket related
  @logger.error("Error decoding data", :data => data.inspect, :exception => e.class, :message => e.message, :backtrace => e.backtrace)
  @metric_errors.increment(:decoding)
end
Helper for inline rescues, which logs the exception at "DEBUG" level and returns nil.

Instead of:

~~~ ruby
foo rescue nil
~~~

Do:

~~~ ruby
foo rescue log_and_squash(:foo)
~~~
# File lib/logstash/inputs/syslog.rb, line 322
def log_and_squash(label)
  $! && logger.debug("#{label} failed:", :exception => $!.class, :message => $!.message)
  nil
end
server calls the specified protocol listener and restarts it on any uncaught listener exception.

@param protocol [Symbol] either :udp or :tcp
@param output_queue [Queue] the pipeline input-to-filters queue
# File lib/logstash/inputs/syslog.rb, line 171
def server(protocol, output_queue)
  self.send("#{protocol}_listener", output_queue)
rescue => e
  if !stop?
    @logger.warn("syslog listener died", :protocol => protocol, :address => "#{@host}:#{@port}", :exception => e, :backtrace => e.backtrace)
    @metric_errors.increment(:listener)
    Stud.stoppable_sleep(5) { stop? }
    retry
  end
end
# File lib/logstash/inputs/syslog.rb, line 373
def set_labels(event)
  facility_number = event.get(@facility_key)
  severity_number = event.get(@severity_key)

  facility_label = @facility_labels[facility_number]
  event.set(@facility_label_key, facility_label) if facility_label

  severity_label = @severity_labels[severity_number]
  event.set(@severity_label_key, severity_label) if severity_label
end
# File lib/logstash/inputs/syslog.rb, line 365
def set_priority(event, priority)
  severity = priority & 7 # 7 is 111 (3 bits)
  facility = priority >> 3
  event.set(@priority_key, priority)
  event.set(@severity_key, severity)
  event.set(@facility_key, facility)
end
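Conversely, a small worked example of decoding a PRI back into its parts, using the default priority of 13 mentioned in syslog_relay; the remark about labels assumes the plugin's default facility/severity label arrays:

~~~ ruby
# Decoding priority 13 with the same bit operations as set_priority above.
priority = 13
severity = priority & 7    # => 5
facility = priority >> 3   # => 1
# With use_labels => true and the default label arrays, these would
# typically resolve to a user-level facility and a notice-level severity.
~~~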
# File lib/logstash/inputs/syslog.rb, line 384
def set_service_fields(event)
  service_type = @service_type
  if service_type && !service_type.empty?
    event.set('[service][type]', service_type) unless event.include?('[service][type]')
  end
end
tcp_listener accepts TCP connections and creates a new tcp_receiver thread for each accepted socket. Upon an exception, all TCP sockets are closed and the exception bubbles up to the server, which restarts the listener.
# File lib/logstash/inputs/syslog.rb, line 206
def tcp_listener(output_queue)
  @logger.info("Starting syslog tcp listener", :address => "#{@host}:#{@port}")
  @tcp = TCPServer.new(@host, @port)
  @tcp.do_not_reverse_lookup = true

  while !stop?
    socket = @tcp.accept
    @tcp_sockets << socket
    metric.increment(:connections)

    Thread.new(output_queue, socket) do |output_queue, socket|
      tcp_receiver(output_queue, socket)
    end
  end
ensure
  close_tcp
end
# File lib/logstash/inputs/syslog.rb, line 224
def tcp_read_lines(socket)
  buffer = String.new
  loop do
    begin
      buffer << socket.read_nonblock(1024)
      while (newline = buffer.index("\n"))
        yield buffer.slice!(0..newline)
      end
    rescue IO::WaitReadable
      IO.select([socket], nil)
      retry
    end
  end
end
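To illustrate the newline framing above in isolation, here is a minimal, self-contained sketch using an in-process socket pair; the payload strings and the reuse of the 1024-byte read size are assumptions for demonstration, not plugin behavior:

~~~ ruby
require 'socket'

# Same buffering/newline-framing idea as tcp_read_lines, outside the plugin.
reader, writer = Socket.pair(:UNIX, :STREAM)
writer.write("<13>Oct 11 22:14:15 host1 app: first\n<13>Oct 11 22:14:16 host1 app: sec")
writer.write("ond\n")
writer.close

buffer = String.new
begin
  loop do
    buffer << reader.read_nonblock(1024)
    while (newline = buffer.index("\n"))
      puts buffer.slice!(0..newline)   # one complete line per iteration
    end
  end
rescue IO::WaitReadable
  IO.select([reader], nil)
  retry
rescue EOFError
  # writer closed and all buffered data consumed
end
~~~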
tcp_receiver is executed in a thread; any uncaught exception bubbles up to the TCP server thread, all TCP connections are closed, and the listener is restarted.
# File lib/logstash/inputs/syslog.rb, line 241
def tcp_receiver(output_queue, socket)
  peer_addr = socket.peeraddr
  ip, port = peer_addr[3], peer_addr[1]
  @logger.info("new connection", :client => "#{ip}:#{port}")
  LogStash::Util::set_thread_name("input|syslog|tcp|#{ip}:#{port}")

  first_read = true
  tcp_read_lines(socket) do |line|
    metric.increment(:messages_received)
    if @proxy_protocol && first_read
      first_read = false
      pp_info = line.split(/\s/)
      # PROXY proto clientip proxyip clientport proxyport
      if pp_info[0] != "PROXY"
        @logger.error("invalid proxy protocol header label", header: line)
        raise IOError
      else
        @logger.debug("proxy protocol detected", header: line)
        ip = pp_info[2]
        port = pp_info[3]
        next
      end
    end
    decode(ip, output_queue, line)
  end
rescue Errno::ECONNRESET
  # swallow connection reset exceptions to avoid bubbling up to the tcp_listener & server
  logger.info("connection reset", :client => "#{ip}:#{port}")
rescue Errno::EBADF, EOFError
  # swallow connection closed exceptions to avoid bubbling up to the tcp_listener & server
  logger.info("connection closed", :client => "#{ip}:#{port}")
rescue IOError => e
  # swallow connection closed exceptions to avoid bubbling up to the tcp_listener & server
  raise(e) unless socket.closed? && e.message.to_s.include?("closed")
  logger.info("connection error:", :exception => e.class, :message => e.message)
ensure
  @tcp_sockets.delete(socket)
  socket.close rescue log_and_squash(:close_tcp_receiver_socket)
end
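For reference, a hedged example of the human-readable PROXY protocol v1 header handled above; the addresses and ports are invented:

~~~ ruby
# Example first line sent by a proxy when proxy_protocol is enabled, laid out
# as in the inline comment "PROXY proto clientip proxyip clientport proxyport".
header = "PROXY TCP4 203.0.113.7 10.0.0.5 51234 514"
fields = header.split(/\s/)
# => ["PROXY", "TCP4", "203.0.113.7", "10.0.0.5", "51234", "514"]
# fields[2] (the client ip) is what tcp_receiver uses for subsequent events.
~~~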
udp_listener creates the UDP socket and continuously reads from it. Upon an exception the socket is closed and the exception bubbles up to the server, which restarts the listener.
# File lib/logstash/inputs/syslog.rb, line 185
def udp_listener(output_queue)
  @logger.info("Starting syslog udp listener", :address => "#{@host}:#{@port}")

  @udp.close if @udp

  @udp = UDPSocket.new((IPAddr.new(@host).ipv6? rescue nil) ? Socket::AF_INET6 : Socket::AF_INET)
  @udp.do_not_reverse_lookup = true
  @udp.bind(@host, @port)

  while !stop?
    payload, client = @udp.recvfrom(65507)
    metric.increment(:messages_received)
    decode(client[3], output_queue, payload)
  end
ensure
  close_udp
end
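As a quick way to exercise the UDP listener, here is a minimal sketch that sends one test message; 127.0.0.1 and port 514 are assumptions, substitute whatever host and port the input is configured with (the message text is the sample from RFC3164):

~~~ ruby
require 'socket'

# Send a single RFC3164-style test line to the listener.
sock = UDPSocket.new
sock.send("<34>Oct 11 22:14:15 mymachine su: 'su root' failed for lonvick on /dev/pts/8",
          0, "127.0.0.1", 514)
sock.close
~~~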