class LogStash::Codecs::Sflow

The “sflow” codec is for decoding sflow v5 flows.

Public Class Methods

new(params = {}) click to toggle source
Calls superclass method
# File lib/logstash/codecs/sflow.rb, line 31
# Builds the codec by forwarding +params+ to the superclass constructor and
# then flagging the instance as not thread-safe.
#
# @param params [Hash] plugin settings, passed unchanged to the superclass
def initialize(params = {})
  # Bare `super` forwards the current arguments (here: params) to the parent.
  super
  @threadsafe = false
end

Public Instance Methods

assign_key_value(event, bindata_kv) click to toggle source

def initialize

# File lib/logstash/codecs/sflow.rb, line 38
# Copies the key/value pairs of a decoded structure onto +event+ as strings.
#
# Values that are a BinData::Choice are not stored directly; they are queued
# and their own pairs are copied in a later pass. Pairs whose key is listed
# in @removed_field, or whose value is a BinData::Array, are skipped.
#
# @param event [Object] receiver of `set(name, value)` calls
# @param bindata_kv [Object] structure responding to `each_pair`
def assign_key_value(event, bindata_kv)
  pending = [bindata_kv]

  until pending.empty?
    current = pending.shift
    # Nothing to copy from a missing structure or one that renders as ''.
    next if current.nil? || current.to_s.eql?('')

    current.each_pair do |name, value|
      if value.is_a?(BinData::Choice)
        # Defer nested choices so their fields are flattened onto the event.
        pending << value
        next
      end

      field = name.to_s
      next if @removed_field.include?(field) || value.is_a?(BinData::Array)

      event.set(field, value.to_s)
    end
  end
end
common_sflow(event, decoded, sample) click to toggle source

@param [Object] event @param [Object] decoded @param [Object] sample

# File lib/logstash/codecs/sflow.rb, line 61
# Fills +event+ with the fields shared by every sample of a datagram.
#
# Sets 'agent_ip' from the decoded datagram, then hands the datagram and the
# sample, in that order, to assign_key_value.
#
# @param event [Object] event being populated
# @param decoded [Object] decoded datagram, indexable by 'agent_ip'
# @param sample [Object] the sample the event is built from
def common_sflow(event, decoded, sample)
  agent_address = decoded['agent_ip'].to_s
  event.set('agent_ip', agent_address)

  assign_key_value(event, decoded)
  assign_key_value(event, sample)
end
decode(payload) { |event| ... } click to toggle source

def register

# File lib/logstash/codecs/sflow.rb, line 103
  # Decodes one sflow datagram and yields an event for every usable sample.
  #
  # The header is read first; a datagram whose version is not listed in
  # @versions is logged and ignored. For enterprise-0 samples:
  # * formats 1 and 3 (flow / expanded flow) give ONE event per sample, with
  #   every decodable record merged into it;
  # * formats 2 and 4 (counter / expanded counter) give ONE event per record.
  # Samples or records whose data renders as an empty string are logged and
  # skipped. Events are yielded only after the whole datagram was processed.
  #
  # BinData::ValidityError, EOFError, IOError and RangeError raised anywhere
  # in the method body (the yield loop included) are logged as a warning.
  #
  # @param payload [Object] raw datagram handed to SFlowHeader.read / SFlow.read
  # @yield [event] each LogStash::Event built from the datagram
  def decode(payload)
    header = SFlowHeader.read(payload)
    unless @versions.include?(header.sflow_version)
      @logger.warn("Ignoring Sflow version v#{header.sflow_version}")
      return
    end

    datagram = SFlow.read(payload)
    pending_events = []

    datagram['samples'].each do |sample|
      @logger.debug("sample: #{sample}")

      # No decoded payload means the sample type is unknown to the parser.
      if sample['sample_data'].to_s.eql?('')
        @logger.warn("Unknown sample entreprise #{sample['sample_entreprise'].to_s} - format #{sample['sample_format'].to_s}")
        next
      end

      # Both handled families require the standard enterprise (0).
      next unless sample['sample_entreprise'] == 0

      if sample['sample_format'] == 1 || sample['sample_format'] == 3
        # Flow sample / expanded flow sample: one event for the whole sample.
        flow_event = LogStash::Event.new({})
        common_sflow(flow_event, datagram, sample)

        sample['sample_data']['records'].each do |record|
          if record['record_data'].to_s.eql?('')
            @logger.warn("Unknown sample_flow record: entreprise #{record['record_entreprise'].to_s}, format #{record['record_format'].to_s}")
          else
            assign_key_value(flow_event, record)
          end
        end

        # Derived field: frame length multiplied by the sampling rate.
        if flow_event.include?('frame_length') && flow_event.include?('sampling_rate')
          frame_length = flow_event.get('frame_length').to_i
          sampling_rate = flow_event.get('sampling_rate').to_i
          flow_event.set('frame_length_times_sampling_rate', frame_length * sampling_rate)
        end

        flow_type = sample['sample_format'] == 1 ? 'flow_sample' : 'expanded_flow_sample'
        flow_event.set('sflow_type', flow_type)

        # Adds interface descriptions when SNMP resolution is enabled.
        snmp_call(flow_event)

        pending_events << flow_event
      elsif sample['sample_format'] == 2 || sample['sample_format'] == 4
        # Counter sample / expanded counter sample: one event per record.
        sample['sample_data']['records'].each do |record|
          if record['record_data'].to_s.eql?('')
            @logger.warn("Unknown counter_flow record: entreprise #{record['record_entreprise'].to_s}, format #{record['record_format'].to_s}")
            next
          end

          counter_event = LogStash::Event.new({})
          common_sflow(counter_event, datagram, sample)
          assign_key_value(counter_event, record)

          counter_type = sample['sample_format'] == 2 ? 'counter_sample' : 'expanded_counter_sample'
          counter_event.set('sflow_type', counter_type)

          # Adds interface descriptions when SNMP resolution is enabled.
          snmp_call(counter_event)

          pending_events << counter_event
        end
      end
    end

    pending_events.each { |ready_event| yield ready_event }
  rescue BinData::ValidityError, EOFError, IOError, RangeError => e
    @logger.warn("Invalid sflow packet received (#{e})")
  end
register() click to toggle source
# File lib/logstash/codecs/sflow.rb, line 87
 # Prepares the codec for use.
 #
 # Requires the sflow datagram definitions and the SNMP interface resolver,
 # builds @removed_field as the union of the always-dropped field names and
 # @optional_removed_field, and creates the SNMP resolver only when
 # @snmp_interface is set.
 def register
   require 'logstash/codecs/sflow/datagram'
   require 'logstash/codecs/snmp/interface_resolver'

   always_removed = %w[
     record_length record_count record_entreprise record_format
     sample_entreprise sample_format sample_length sample_count
     sample_header data storage
   ]
   # noinspection RubyResolve
   @removed_field = always_removed | @optional_removed_field

   return unless @snmp_interface

   @snmp = SNMPInterfaceResolver.new(@snmp_community, @interface_cache_size, @interface_cache_ttl, @logger)
 end
snmp_call(event) click to toggle source
# File lib/logstash/codecs/sflow.rb, line 67
# Adds interface descriptions to +event+ via the SNMP resolver.
#
# Does nothing unless @snmp_interface is set and the event has a
# 'source_id_type' whose string form is '0'. Otherwise, for each of
# 'source_id_index', 'input_interface', 'output_interface' and
# 'interface_index' present on the event, stores
# @snmp.get_interface(agent_ip, index) under '<field>_descr'.
#
# @param event [Object] event responding to include?/get/set
def snmp_call(event)
  return unless @snmp_interface
  return unless event.include?('source_id_type') && event.get('source_id_type').to_s == '0'

  # Resolves one interface index field into its '<field>_descr' companion.
  describe = lambda do |field|
    next unless event.include?(field)

    event.set("#{field}_descr", @snmp.get_interface(event.get('agent_ip'), event.get(field)))
  end

  describe.call('source_id_index')
  describe.call('input_interface')
  describe.call('output_interface')
  describe.call('interface_index')
end