class LogStash::Outputs::SensorsAnalytics

A sensors_analytics output that does nothing.

Constants

PLUGIN_VERSION

Public Instance Methods

close() click to toggle source
# File lib/logstash/outputs/sensors_analytics.rb, line 128
# Shuts the output down.
#
# For every buffer item, kills the timer stored in its buffer state and then
# performs a final flush. Afterwards the reporter thread is killed, the client
# is closed and one last report is logged.
def close
  @buffer_items.each do |item|
    flush_timer = item.buffer_state[:timer]
    flush_timer.kill
    item.buffer_flush(final: true)
  end

  reporter = @report_thread
  reporter.kill

  @client.close
  report
end
multi_receive(events) click to toggle source
# File lib/logstash/outputs/sensors_analytics.rb, line 82
# Processes a batch of events.
#
# For each event the "message" field is parsed as JSON, a "lib" hash (and a
# "project" entry when @project is set) is written onto the parsed record, and
# the record is handed to the buffer item selected by buffer_index(tag).
#
# The tag comes from concat_tag_from_hash_filed; when that returns nil it falls
# back to host + file path (if available), otherwise it stays nil.
#
# Any StandardError raised while handling one event is logged together with the
# exception class and message, counted in @parse_error_count, and processing
# continues with the next event.
#
# @param events [Array] events exposing #get(field_reference) and #to_s
# @return [void]
def multi_receive(events)
  return if events.empty?

  @receive_count += events.length
  events.each do |event|
    begin
      record = JSON.parse(event.get("message"))
      tag = concat_tag_from_hash_filed(event)
      if filebeat_input?(event)
        host = event.get("[host][name]")
        file = event.get("[log][file][path]")
        offset = event.get("[log][offset]")
        lib_detail = "#{host}###{file}"
        tag = host.to_s + file.to_s if tag.nil?
        collect_filebeat_status(lib_detail, offset) if @enable_filebeat_status_report
      else
        # Record a lib_detail for the file input here; for other inputs it stays empty.
        host = event.get("host")
        path = event.get("path")
        if !host.nil? && !path.nil?
          lib_detail = "#{host}###{path}"
          tag = host.to_s + path.to_s if tag.nil?
        else
          lib_detail = ""
        end
      end

      record["lib"] = {
          "$lib" => "Logstash",
          "$lib_version" => PLUGIN_VERSION,
          "$lib_method" => "tools",
          "$lib_detail" => lib_detail
      }

      record["project"] = @project unless @project.nil?

      buffer_item = @buffer_items[buffer_index(tag)]
      buffer_item.buffer_receive(record)
    rescue => err
      # Keep the failure reason in the log; without it a dropped record cannot
      # be diagnosed (bad JSON vs. a non-object payload vs. a buffer error).
      @logger.error("Could not process record",
                    :record => event.to_s,
                    :exception => err.class.name,
                    :error => err.message)
      @parse_error_count += 1
    end
  end
end
register() click to toggle source
# File lib/logstash/outputs/sensors_analytics.rb, line 39
# Prepares the output for use.
#
# Logs the effective settings, builds the Manticore client (with a plugin
# specific user agent), resets the counters used by report, creates one
# BufferItem per entry of @url and starts a thread that calls report every
# 60 seconds.
def register
  @logger.info("Registering sensors_analytics Output",
               version: PLUGIN_VERSION,
               url: @url,
               flush_interval_sec: @flush_interval_sec,
               flush_batch_size: @flush_batch_size,
               hash_filed: @hash_filed,
               enable_filebeat_status_report: @enable_filebeat_status_report)

  client_settings = client_config
  client_settings[:user_agent] = "SensorsAnalytics Logstash Output Plugin #{PLUGIN_VERSION}"
  @client = Manticore::Client.new(client_settings)

  # Counters and timestamps consumed by report.
  @buffer_items = []
  @receive_count = 0
  @parse_error_count = 0
  @last_report_time = Time.now
  @last_report_count = 0

  # One buffer per configured URL; each buffer knows its own index into @url.
  @url.each_index do |position|
    buffer_settings = {
      :flush_batch_size => @flush_batch_size,
      :flush_interval_sec => @flush_interval_sec,
      :client => @client,
      :url_list => @url,
      :index => position,
      :logger => @logger
    }
    @buffer_items << BufferItem.new(buffer_settings)
  end

  @recent_filebeat_status = {} if @enable_filebeat_status_report

  @report_thread = Thread.new do
    loop do
      sleep 60
      report
    end
  end
end
report() click to toggle source
# File lib/logstash/outputs/sensors_analytics.rb, line 193
# Logs a throughput report.
#
# Sums the per-URL send counts of every buffer item, derives the send speed
# (records per second) since the previous report and logs it together with the
# receive, send and parse-error counters. When filebeat status reporting is
# enabled, a second log line with the collected filebeat status is emitted.
#
# Updates @last_report_count and @last_report_time as a side effect.
def report
  url_send_count_sum = {}
  @url.each { |url| url_send_count_sum[url] = 0 }

  @buffer_items.each do |buffer_item|
    buffer_item.url_send_count.each do |url, count|
      url_send_count_sum[url] += count
    end
  end

  total_send_count = url_send_count_sum.values.reduce(0, :+)

  # Read the clock once so the instant used for the speed calculation is the
  # same instant the next interval starts from.
  now = Time.now
  elapsed = now - @last_report_time
  # A non-positive interval (two reports in the same clock tick, or the wall
  # clock stepping backwards) would yield NaN/Infinity or a negative speed.
  speed = elapsed > 0 ? (total_send_count - @last_report_count) / elapsed : 0.0
  @last_report_count = total_send_count
  @last_report_time = now

  @logger.info("Report",
               :speed => speed.round(2),
               :receive_count => @receive_count,
               :send_count => total_send_count,
               :parse_error_count => @parse_error_count,
               :url_send_count => url_send_count_sum)
  @logger.info("Filebeat status Report: #{format_filebeat_report_and_clean}") if @enable_filebeat_status_report
end

Private Instance Methods

buffer_index(tag) click to toggle source
# File lib/logstash/outputs/sensors_analytics.rb, line 140
# Picks the buffer slot for a tag: the tag's #hash reduced modulo the number
# of configured URLs.
#
# @param tag [Object] any object responding to #hash (nil is accepted)
# @return [Integer] the result of tag.hash modulo @url.length
def buffer_index(tag)
  slot_count = @url.length
  tag.hash.modulo(slot_count)
end
collect_filebeat_status(lib_detail, offset) click to toggle source
# File lib/logstash/outputs/sensors_analytics.rb, line 169
# Remembers the latest offset and receive time seen for a lib_detail key in
# @recent_filebeat_status. An existing entry is updated in place; otherwise a
# new entry is created.
#
# @param lib_detail [String] key under which the status is stored
# @param offset [Object] offset value taken from the event
def collect_filebeat_status(lib_detail, offset)
  entry = @recent_filebeat_status[lib_detail]
  if entry
    entry[:offset] = offset
    entry[:receive_time] = Time.now
  else
    @recent_filebeat_status[lib_detail] = { :receive_time => Time.now, :offset => offset }
  end
end
concat_tag_from_hash_filed(event) click to toggle source
# File lib/logstash/outputs/sensors_analytics.rb, line 146
# Builds a tag by concatenating, in order, the string form of every event
# field listed in @hash_filed.
#
# @param event [Object] event exposing #get(field_reference)
# @return [String, nil] the concatenated tag, or nil when @hash_filed is nil
#   or empty
def concat_tag_from_hash_filed(event)
  return nil if @hash_filed.nil? || @hash_filed.empty?

  @hash_filed.map { |field_name| event.get(field_name).to_s }.join
end
filebeat_input?(event) click to toggle source
# File lib/logstash/outputs/sensors_analytics.rb, line 159
# Tells whether an event is marked as coming from filebeat, i.e. whether
# "[agent][type]" or "[@metadata][beat]" equals "filebeat". The second field
# is only consulted when the first does not match.
#
# @param event [Object] event exposing #get(field_reference)
# @return [Boolean]
def filebeat_input?(event)
  ["[agent][type]", "[@metadata][beat]"].any? do |field_reference|
    event.get(field_reference) == "filebeat"
  end
end
format_filebeat_report_and_clean() click to toggle source
# File lib/logstash/outputs/sensors_analytics.rb, line 182
# Renders the collected filebeat status as text and resets the collection.
#
# The output starts with a newline and contains one "key=>status" line per
# entry of @recent_filebeat_status.
#
# The instance variable is pointed at a fresh hash BEFORE the old one is
# formatted. Formatting the live hash would raise "can't add a new key into
# hash during iteration" if an entry were inserted while it is being walked,
# and entries added before the reset would be discarded unreported. With the
# swap done first, such entries land in the fresh hash and appear in the next
# report. NOTE(review): this narrows the window but is not a full
# synchronisation between the reporting and receiving code paths.
#
# @return [String] the formatted report
def format_filebeat_report_and_clean
  snapshot = @recent_filebeat_status
  @recent_filebeat_status = {}

  result = "\n"
  snapshot.each do |lib_detail, status|
    result << lib_detail << "=>" << status.to_s << "\n"
  end
  result
end