class LogStash::Outputs::SensorsAnalytics
A sensors_analytics output that parses each event's JSON message and hands the resulting record to one of its per-URL buffers for delivery.
Constants
- PLUGIN_VERSION
Public Instance Methods
close()
click to toggle source
# File lib/logstash/outputs/sensors_analytics.rb, line 128
# Shuts the output down.
#
# For every buffer item, the object stored under :timer in its buffer
# state is killed and the buffer is flushed with :final => true. After
# that the report thread is killed, the HTTP client is closed, and one
# last report is emitted.
def close
  @buffer_items.each do |item|
    timer = item.buffer_state[:timer]
    timer.kill
    item.buffer_flush(:final => true)
  end

  @report_thread.kill
  @client.close
  report
end
multi_receive(events)
click to toggle source
# File lib/logstash/outputs/sensors_analytics.rb, line 82
# Processes a batch of events.
#
# Each event's "message" field is parsed as JSON, decorated with a "lib"
# section (and "project" when @project is set), and handed to the buffer
# item selected by buffer_index(tag).
#
# The routing tag comes from concat_tag_from_hash_filed; when that returns
# nil it falls back to host + file path if both are known. If no tag can be
# derived it stays nil and is still passed to buffer_index.
#
# Any StandardError raised while handling one event is logged together with
# its message and counted in @parse_error_count; the remaining events in the
# batch are still processed.
#
# @param events [Array] events responding to #get(field) and #to_s
# @return [nil, Array] nil for an empty batch, otherwise the events array
def multi_receive(events)
  return if events.empty?

  @receive_count += events.length
  events.each do |e|
    begin
      record = JSON.parse(e.get("message"))
      tag = concat_tag_from_hash_filed(e)

      if filebeat_input?(e)
        host = e.get("[host][name]")
        file = e.get("[log][file][path]")
        offset = e.get("[log][offset]")
        lib_detail = "#{host}###{file}"
        tag = host.to_s + file.to_s if tag.nil?
        collect_filebeat_status(lib_detail, offset) if @enable_filebeat_status_report
      else
        # Record a lib_detail for the file input here; it stays empty for
        # every other input.
        host = e.get("host")
        path = e.get("path")
        if !host.nil? && !path.nil?
          lib_detail = "#{host}###{path}"
          tag = host.to_s + path.to_s if tag.nil?
        else
          lib_detail = ""
        end
      end

      record["lib"] = {
        "$lib" => "Logstash",
        "$lib_version" => PLUGIN_VERSION,
        "$lib_method" => "tools",
        "$lib_detail" => lib_detail
      }
      record["project"] = @project unless @project.nil?

      buffer_item = @buffer_items[buffer_index(tag)]
      buffer_item.buffer_receive(record)
    rescue => ex
      # Keep the failure cause in the log: without it a JSON syntax error
      # cannot be told apart from a failure inside the buffer.
      @logger.error("Could not process record",
                    :record => e.to_s,
                    :exception => ex.message)
      @parse_error_count += 1
    end
  end
end
register()
click to toggle source
# File lib/logstash/outputs/sensors_analytics.rb, line 39
# Prepares the output for use.
#
# Logs the effective settings, builds the shared Manticore HTTP client with
# a plugin-specific user agent, resets the counters used by #report, creates
# one BufferItem per entry in @url, and starts a background thread that
# calls #report every 60 seconds.
def register
  @logger.info("Registering sensors_analytics Output",
               :version => PLUGIN_VERSION,
               :url => @url,
               :flush_interval_sec => @flush_interval_sec,
               :flush_batch_size => @flush_batch_size,
               :hash_filed => @hash_filed,
               :enable_filebeat_status_report => @enable_filebeat_status_report
              )

  settings = client_config
  settings[:user_agent] = "SensorsAnalytics Logstash Output Plugin " + PLUGIN_VERSION
  @client = Manticore::Client.new(settings)

  # Counters and timestamps consumed by #report.
  @receive_count = 0
  @parse_error_count = 0
  @last_report_time = Time.now
  @last_report_count = 0

  # One buffer per URL index; every buffer shares the client and URL list.
  @buffer_items = @url.each_index.map do |position|
    BufferItem.new(
      :flush_batch_size => @flush_batch_size,
      :flush_interval_sec => @flush_interval_sec,
      :client => @client,
      :url_list => @url,
      :index => position,
      :logger => @logger
    )
  end

  @recent_filebeat_status = {} if @enable_filebeat_status_report

  @report_thread = Thread.new { loop { sleep 60; report } }
end
report()
click to toggle source
# File lib/logstash/outputs/sensors_analytics.rb, line 193
# Logs a throughput summary.
#
# Adds up the per-URL send counters of every buffer item, derives the send
# rate since the previous report, stores the new baseline
# (@last_report_count / @last_report_time) and writes a "Report" log entry.
# When @enable_filebeat_status_report is set, a second entry carries the
# output of format_filebeat_report_and_clean.
def report
  # Seed every configured URL with zero so URLs that have sent nothing yet
  # still show up in the report.
  url_send_count_sum = {}
  @url.each { |url| url_send_count_sum[url] = 0 }

  @buffer_items.each do |buffer_item|
    buffer_item.url_send_count.each do |url, count|
      url_send_count_sum[url] += count
    end
  end
  total_send_count = url_send_count_sum.values.inject(0, :+)

  # Read the clock once: the same instant closes this window and opens the
  # next one, so no time falls between two reports.
  now = Time.now
  elapsed = now - @last_report_time
  # A zero-length (or negative, after a clock step) window has no meaningful
  # rate; report 0.0 instead of Infinity/NaN.
  speed = elapsed > 0 ? (total_send_count - @last_report_count) / elapsed : 0.0

  @last_report_count = total_send_count
  @last_report_time = now

  @logger.info("Report",
               :speed => speed.round(2),
               :receive_count => @receive_count,
               :send_count => total_send_count,
               :parse_error_count => @parse_error_count,
               :url_send_count => url_send_count_sum)
  @logger.info("Filebeat status Report: #{format_filebeat_report_and_clean}") if @enable_filebeat_status_report
end
Private Instance Methods
buffer_index(tag)
click to toggle source
# File lib/logstash/outputs/sensors_analytics.rb, line 140
# Picks the buffer slot for a tag: the tag's #hash value modulo the number
# of entries in @url.
#
# @param tag [Object] routing key (may be nil)
# @return [Integer] index in 0...@url.length
def buffer_index(tag)
  bucket_count = @url.length
  tag.hash % bucket_count
end
collect_filebeat_status(lib_detail, offset)
click to toggle source
# File lib/logstash/outputs/sensors_analytics.rb, line 169
# Remembers the latest offset and receive time seen for a Filebeat source.
#
# An existing entry for lib_detail is updated in place; otherwise a new
# entry is stored in @recent_filebeat_status.
#
# @param lib_detail [String] key identifying the source
# @param offset [Object] value read from the event's "[log][offset]" field
def collect_filebeat_status(lib_detail, offset)
  entry = @recent_filebeat_status[lib_detail]

  unless entry.nil?
    entry[:offset] = offset
    return entry[:receive_time] = Time.now
  end

  @recent_filebeat_status[lib_detail] = {:receive_time => Time.now, :offset => offset}
end
concat_tag_from_hash_filed(event)
click to toggle source
# File lib/logstash/outputs/sensors_analytics.rb, line 146
# Builds a routing tag by concatenating the string form of every event
# field listed in @hash_filed, in order.
#
# @param event [#get] event to read the fields from
# @return [String, nil] the tag, or nil when @hash_filed is nil or empty
def concat_tag_from_hash_filed(event)
  return nil if @hash_filed.nil? || @hash_filed.empty?

  @hash_filed.map { |field_name| event.get(field_name).to_s }.join
end
filebeat_input?(event)
click to toggle source
# File lib/logstash/outputs/sensors_analytics.rb, line 159
# Tells whether an event came from Filebeat: true when either the
# "[agent][type]" or the "[@metadata][beat]" field equals "filebeat".
#
# @param event [#get] event to inspect
# @return [Boolean]
def filebeat_input?(event)
  ["[agent][type]", "[@metadata][beat]"].any? do |field|
    event.get(field) == "filebeat"
  end
end
format_filebeat_report_and_clean()
click to toggle source
# File lib/logstash/outputs/sensors_analytics.rb, line 182
# Renders the collected Filebeat status as text and then resets
# @recent_filebeat_status to an empty hash.
#
# The text starts with a newline, followed by one "key=>status" line per
# entry.
#
# @return [String]
def format_filebeat_report_and_clean
  rows = @recent_filebeat_status.map { |source, status| "#{source}=>#{status}\n" }
  @recent_filebeat_status = {}
  "\n" + rows.join
end