class LogStash::Codecs::Sflow
The “sflow” codec is for decoding sflow v5 flows.
Public Class Methods
new(params = {})
click to toggle source
Calls superclass method
# File lib/logstash/codecs/sflow.rb, line 31
# Builds the codec by delegating to the superclass constructor, then flags
# the instance as not thread-safe by setting @threadsafe to false.
#
# @param params [Hash] configuration forwarded unchanged to the superclass
def initialize(params = {})
  # Bare `super` forwards the current arguments (here: params) as-is.
  super
  @threadsafe = false
end
Public Instance Methods
assign_key_value(event, bindata_kv)
click to toggle source
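Copies the fields of a decoded BinData structure onto the event as strings, descending into nested BinData::Choice values and skipping arrays and any field listed in @removed_field.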
# File lib/logstash/codecs/sflow.rb, line 38
# Copies the key/value pairs of a decoded structure onto the event.
#
# The structure is walked breadth-first: every value that is a
# BinData::Choice is queued and its own pairs are copied in turn, so nested
# fields end up flattened onto the event under their own key. Values that are
# BinData::Array instances and keys listed in @removed_field are skipped.
# Every copied value is stored as a String (via #to_s).
#
# @param event [#set] target event; receives event.set(key, value) calls
# @param bindata_kv [#each_pair, nil] decoded structure to copy from; nil or
#   anything whose #to_s is empty is ignored
# @return [nil]
def assign_key_value(event, bindata_kv)
  inspection_queue = [bindata_kv]

  until inspection_queue.empty?
    kv = inspection_queue.shift
    # Nothing to copy from an absent or empty structure.
    next if kv.nil? || kv.to_s.empty?

    kv.each_pair do |k, v|
      if v.is_a?(BinData::Choice)
        # Nested choice: inspect its pairs on a later pass.
        inspection_queue.push(v)
        next
      end

      field = k.to_s
      next if @removed_field.include?(field) || v.is_a?(BinData::Array)

      event.set(field, v.to_s)
    end
  end
end
common_sflow(event, decoded, sample)
click to toggle source
@param [Object] event @param [Object] decoded @param [Object] sample
# File lib/logstash/codecs/sflow.rb, line 61
# Populates the event with the data shared by every event built from a sample:
# the agent address, then the fields of the decoded datagram, then the fields
# of the sample itself (both copied through assign_key_value).
#
# @param event [Object] event to populate
# @param decoded [Object] decoded datagram; must answer decoded['agent_ip']
# @param sample [Object] the sample the event is being built for
def common_sflow(event, decoded, sample)
  agent_ip = decoded['agent_ip'].to_s
  event.set('agent_ip', agent_ip)

  # Datagram-level fields first, then sample-level fields.
  assign_key_value(event, decoded)
  assign_key_value(event, sample)
end
decode(payload) { |event| ... }
click to toggle source
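Decodes an sFlow datagram and yields one event per flow sample and one event per counter record. Datagrams with an unsupported version or invalid content are logged and ignored.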
# File lib/logstash/codecs/sflow.rb, line 103
# Decodes one sFlow datagram and yields the resulting events.
#
# The header is read first; datagrams whose version is not in @versions are
# logged and ignored. Otherwise every sample of the datagram is inspected:
#
# * samples without decoded data are logged and skipped;
# * enterprise 0, format 1 or 3 (flow / expanded flow sample): ONE event per
#   sample, merging all of its records;
# * enterprise 0, format 2 or 4 (counter / expanded counter sample): ONE
#   event per record;
# * anything else is skipped silently.
#
# Events are only yielded once the whole datagram has been processed, so a
# datagram that fails part-way through yields nothing.
#
# @param payload [Object] raw datagram, handed to SFlowHeader.read / SFlow.read
# @yield [event] each LogStash::Event built from the datagram
# @return [Array, nil, Object] not meaningful to callers; BinData::ValidityError,
#   EOFError, IOError and RangeError are rescued and logged as a warning
def decode(payload)
  header = SFlowHeader.read(payload)
  unless @versions.include?(header.sflow_version)
    @logger.warn("Ignoring Sflow version v#{header.sflow_version}")
    return
  end

  decoded = SFlow.read(payload)
  events = []

  decoded['samples'].each do |sample|
    @logger.debug("sample: #{sample}")

    # Treat case with no flow decoded (unknown flow).
    if sample['sample_data'].to_s.empty?
      @logger.warn("Unknown sample entreprise #{sample['sample_entreprise']} - format #{sample['sample_format']}")
      next
    end

    next unless sample['sample_entreprise'] == 0

    if sample['sample_format'] == 1 || sample['sample_format'] == 3
      # Flow sample and expanded flow sample.
      events.push(flow_sample_event(decoded, sample))
    elsif sample['sample_format'] == 2 || sample['sample_format'] == 4
      # Counter sample and expanded counter sample.
      events.concat(counter_sample_events(decoded, sample))
    end
  end

  events.each { |event| yield event }
rescue BinData::ValidityError, EOFError, IOError, RangeError => e
  @logger.warn("Invalid sflow packet received (#{e})")
end

# Builds the single event describing a (possibly expanded) flow sample.
def flow_sample_event(decoded, sample)
  event = LogStash::Event.new({})
  common_sflow(event, decoded, sample)

  sample['sample_data']['records'].each do |record|
    assign_key_value(event, record) if known_record?(record, 'sample_flow')
  end

  # Compute frame_length_times_sampling_rate when both operands are present.
  if event.include?('frame_length') && event.include?('sampling_rate')
    event.set('frame_length_times_sampling_rate',
              event.get('frame_length').to_i * event.get('sampling_rate').to_i)
  end

  event.set('sflow_type', sample['sample_format'] == 1 ? 'flow_sample' : 'expanded_flow_sample')

  # Get interface descr if snmp_interface is true.
  snmp_call(event)
  event
end

# Builds one event per known record of a (possibly expanded) counter sample.
def counter_sample_events(decoded, sample)
  sflow_type = sample['sample_format'] == 2 ? 'counter_sample' : 'expanded_counter_sample'
  counter_events = []

  sample['sample_data']['records'].each do |record|
    next unless known_record?(record, 'counter_flow')

    event = LogStash::Event.new({})
    common_sflow(event, decoded, sample)
    assign_key_value(event, record)
    event.set('sflow_type', sflow_type)

    # Get interface descr if snmp_interface is true.
    snmp_call(event)
    counter_events.push(event)
  end

  counter_events
end

# Tells whether a record carries decoded data; logs a warning when it does not.
def known_record?(record, sample_kind)
  return true unless record['record_data'].to_s.empty?

  @logger.warn("Unknown #{sample_kind} record: entreprise #{record['record_entreprise']}, format #{record['record_format']}")
  false
end

private :flow_sample_event, :counter_sample_events, :known_record?
register()
click to toggle source
# File lib/logstash/codecs/sflow.rb, line 87
# Prepares the codec for use: loads the datagram definitions and the SNMP
# interface resolver, builds the list of field names that are never copied
# onto events, and creates the SNMP resolver when @snmp_interface is set.
def register
  require 'logstash/codecs/sflow/datagram'
  require 'logstash/codecs/snmp/interface_resolver'

  # Fields that are always dropped; user-supplied ones are merged in below.
  always_removed = %w[
    record_length record_count record_entreprise record_format
    sample_entreprise sample_format sample_length sample_count
    sample_header data storage
  ]
  # noinspection RubyResolve
  @removed_field = always_removed | @optional_removed_field

  return unless @snmp_interface

  @snmp = SNMPInterfaceResolver.new(@snmp_community, @interface_cache_size, @interface_cache_ttl, @logger)
end
snmp_call(event)
click to toggle source
# File lib/logstash/codecs/sflow.rb, line 67
# Adds interface descriptions, resolved through @snmp, to the event.
#
# Nothing happens unless @snmp_interface is truthy and the event has a
# 'source_id_type' field whose string form is '0'.
# NOTE(review): presumably type 0 means the source id is an interface index —
# confirm against the sFlow specification.
#
# For each of 'source_id_index', 'input_interface', 'output_interface' and
# 'interface_index' present on the event, a '<field>_descr' field is set to
# @snmp.get_interface(agent_ip, <field value>).
#
# @param event [#include?, #get, #set] event to enrich in place
# @return [Object] not meaningful; callers should ignore it
def snmp_call(event)
  return unless @snmp_interface
  return unless event.include?('source_id_type') && event.get('source_id_type').to_s == '0'

  %w[source_id_index input_interface output_interface interface_index].each do |field|
    next unless event.include?(field)

    event.set("#{field}_descr", @snmp.get_interface(event.get('agent_ip'), event.get(field)))
  end
end