class LogStash::Outputs::Thinkingdata
A thinkingdata output that buffers parsed events and posts them in batches to the configured URL over HTTP.
Constants
- PLUGIN_VERSION
Attributes
buffer_state[RW]
Public Instance Methods
close()
click to toggle source
def event
# File lib/logstash/outputs/thinkingdata.rb, line 132
# Shuts the output down.
#
# The steps run in a fixed order: the buffer's timer thread is killed, the
# remaining buffered events are flushed with the :final flag, the periodic
# report thread is killed, the HTTP client is closed, and one last report is
# written.
#
# @return the value returned by #report
def close
  # Stop the timer first, then push out whatever is still buffered.
  flush_timer = buffer_state[:timer]
  flush_timer.kill
  buffer_flush(final: true)

  # Stop the background reporter before emitting the last report ourselves.
  @report_thread.kill
  @client.close

  report
end
flush(events, final)
click to toggle source
# File lib/logstash/outputs/thinkingdata.rb, line 142
# Sends one batch of events to the receiver.
#
# Events that cannot be serialized are dropped by #data_valid, the rest are
# encoded as a single JSON array, gzip-compressed unless @compress is 0, and
# posted through #do_send. A failed send is retried every 5 seconds until it
# succeeds, so this method blocks for as long as the receiver rejects the
# batch.
#
# @param events [Array] the batch to send
# @param final [Boolean] not used by this method; kept because callers pass it
#   (presumably the buffering mixin - confirm against the mixin's contract)
# @return [Integer] the updated @total_send_count
def flush(events, final)
  events = data_valid(events)
  data = events.to_json

  if @compress == 0
    compress_type = 'none'
  else
    # Start from an empty, mutable binary buffer. Passing a literal such as
    # "w" would seed the buffer with that text (it is not a mode string) and
    # breaks outright when string literals are frozen.
    sink = StringIO.new(String.new)
    sink.set_encoding("BINARY")
    gzip = Zlib::GzipWriter.new(sink)
    gzip.write(data)
    gzip.close
    data = sink.string
    compress_type = 'gzip'
  end

  # Without a configured appid the request is flagged as carrying
  # per-record appids instead of naming one in the header.
  identity = if @appid.nil? || @appid.empty?
               { 'custom_appid' => 'true' }
             else
               { 'appid' => @appid }
             end
  headers = identity.merge(
    'version' => PLUGIN_VERSION,
    'user-agent' => 'logstash_' + PLUGIN_VERSION,
    'compress' => compress_type,
    'TA-Integration-Type' => 'logstash',
    'TA-Integration-Version' => PLUGIN_VERSION,
    'TA-Integration-Count' => events.length.to_s
  )

  # Deliberately unbounded: the batch is never dropped, only delayed.
  sleep 5 until do_send(data, headers)

  @total_send_count += events.length
end
multi_receive(events)
click to toggle source
# File lib/logstash/outputs/thinkingdata.rb, line 104
# Accepts a batch of Logstash events and queues their payloads for sending.
#
# Each event's "message" field is parsed as JSON; the parsed value gets a
# '#uuid' entry when @uuid is set and is then handed to buffer_receive. For
# events recognised by #is_filebeat_input?, the host, file and offset are read
# (falling back to the "[source]" / "[offset]" fields when the "[log]..." ones
# are nil) and recorded when @is_filebeat_status_record is set.
#
# Any error raised while handling an event is logged and counted in
# @parse_error_count; the remaining events are still processed.
#
# @param events [Array] events exposing #get
# @return [Array, nil] the events, or nil when the batch is empty
def multi_receive(events)
  return if events.empty?

  @receive_count += events.length

  # Reads `primary`, and only when that is nil reads `fallback` instead.
  field_or = lambda do |evt, primary, fallback|
    value = evt.get(primary)
    value.nil? ? evt.get(fallback) : value
  end

  events.each do |event|
    begin
      payload = JSON.parse(event.get("message"))
      payload['#uuid'] = SecureRandom.uuid if @uuid

      if is_filebeat_input?(event)
        # Bookkeeping for filebeat-originated events.
        host = event.get("[host][name]")
        file = field_or.call(event, "[log][file][path]", "[source]")
        offset = field_or.call(event, "[log][offset]", "[offset]")
        record_filebeat_status("host: #{host}, file: #{file}", offset) if @is_filebeat_status_record
      end

      buffer_receive(payload)
    rescue => e
      @logger.error("Could not process content", :content => event.to_s, :Exception => e)
      @parse_error_count += 1
    end
  end
end
register()
click to toggle source
# File lib/logstash/outputs/thinkingdata.rb, line 49
# Prepares the output for use.
#
# Logs the effective settings, builds the Manticore HTTP client from
# client_config with a plugin-specific user agent, optionally verifies the
# appid, resets the counters, initializes the buffer from @flush_batch_size and
# @flush_interval_sec, and starts a background thread that calls #report every
# 60 seconds.
#
# @return [Thread] the reporting thread (also stored in @report_thread)
def register
  @logger.info(
    "Registering thinkingdata Output",
    :url => @url,
    :appid => @appid,
    :flush_interval_sec => @flush_interval_sec,
    :flush_batch_size => @flush_batch_size,
    :compress => @compress,
    :uuid => @uuid,
    :is_filebeat_status_record => @is_filebeat_status_record,
    :appid_check => @appid_check
  )

  http_client_config = client_config
  http_client_config[:user_agent] = "thinkingdata_logstash_output_plugin_" + PLUGIN_VERSION
  @client = Manticore::Client.new(http_client_config)

  # The appid is only verified when checking is enabled and one is configured.
  ta_appid_check if @appid_check && !(@appid.nil? || @appid.empty?)

  @receive_count = 0
  @parse_error_count = 0
  @last_report_count = 0
  @total_send_count = 0

  buffer_initialize(
    :max_items => @flush_batch_size.to_i,
    :max_interval => @flush_interval_sec.to_i,
    :logger => @logger
  )

  @filebeat_status = {} if @is_filebeat_status_record

  @report_thread = Thread.new do
    loop do
      sleep 60
      report
    end
  end
end
Private Instance Methods
data_valid(events)
click to toggle source
# File lib/logstash/outputs/thinkingdata.rb, line 239
# Filters out events that cannot be serialized to JSON.
#
# Every event is trial-serialized with #to_json. Events that raise are logged,
# counted in @parse_error_count and left out of the result.
#
# @param events [Enumerable] candidate events
# @return [Array] the events that serialized without error, in original order
def data_valid(events)
  events.select do |candidate|
    begin
      candidate.to_json
      true
    rescue => e
      @logger.error("Could not parse events", :content => candidate.to_s, :Exception => e)
      @parse_error_count += 1
      false
    end
  end
end
do_send(data, headers)
click to toggle source
# File lib/logstash/outputs/thinkingdata.rb, line 174
# Posts one payload to @url and reports whether the receiver accepted it.
#
# A response with any status code other than 200 is logged with its code and
# body. Any StandardError raised while sending is logged with its class name
# and backtrace. Neither case raises to the caller.
#
# @param data [String] request body
# @param headers [Hash] request headers
# @return [Boolean] true only for a 200 response, false otherwise
def do_send(data, headers)
  reply = @client.post(@url, body: data, headers: headers).call
  return true if reply.code == 200

  @logger.error("Send failed, code: #{reply.code}, body: #{reply.body}", url: @url)
  false
rescue => e
  @logger.error("Send failed", url: @url, exception: e.class.name, backtrace: e.backtrace)
  false
end
format_filebeat_report()
click to toggle source
# File lib/logstash/outputs/thinkingdata.rb, line 228
# Renders the collected filebeat status as text and clears it.
#
# The text starts with a newline and holds one "key=>value" line per entry of
# @filebeat_status. After rendering, @filebeat_status is replaced with an empty
# hash.
#
# @return [String] the rendered report
def format_filebeat_report
  # String.new yields a mutable buffer seeded with the leading newline.
  rendered = @filebeat_status.each_with_object(String.new("\n")) do |(source, status), text|
    text << source << "=>" << status.to_s << "\n"
  end
  @filebeat_status = {}
  rendered
end
is_filebeat_input?(event)
click to toggle source
# File lib/logstash/outputs/thinkingdata.rb, line 190
# Tells whether an event looks like it was shipped by filebeat.
#
# Checked in order: "[agent][type]" equals "filebeat", "[@metadata][beat]"
# equals "filebeat", or the "[beat]" field is present with any value.
#
# @param event [#get] the event to inspect
# @return [Boolean]
def is_filebeat_input?(event)
  event.get("[agent][type]") == "filebeat" ||
    event.get("[@metadata][beat]") == "filebeat" ||
    !event.get("[beat]").nil?
end
record_filebeat_status(log_detail, offset)
click to toggle source
# File lib/logstash/outputs/thinkingdata.rb, line 202
# Remembers the latest offset and receive time for one filebeat source.
#
# A new entry is created in @filebeat_status the first time a source is seen;
# afterwards the existing entry is updated in place.
#
# @param log_detail [String] key identifying the source
# @param offset the offset reported with the event
# @return [Hash, Time] the new entry when one is created, otherwise the
#   receive time that was stored
def record_filebeat_status(log_detail, offset)
  seen_at = Time.now
  entry = @filebeat_status[log_detail]
  return @filebeat_status[log_detail] = { :receive_time => seen_at, :offset => offset } if entry.nil?

  entry[:offset] = offset
  entry[:receive_time] = seen_at
end
report()
click to toggle source
# File lib/logstash/outputs/thinkingdata.rb, line 215
# Logs throughput counters and, when enabled, the filebeat status report.
#
# The interval figure is the growth of @total_send_count since the previous
# call; @last_report_count is advanced so the next call starts from here. The
# filebeat report is only logged when @is_filebeat_status_record is set and
# @filebeat_status is not an empty hash.
#
# @return the logger's return value for the filebeat line, or nil when that
#   line is skipped
def report
  sent_since_last = @total_send_count - @last_report_count
  @last_report_count = @total_send_count

  @logger.info(
    "Report:",
    "IntervalReceive(records):" => sent_since_last,
    "Receive(records):" => @receive_count,
    "TotalSend(records):" => @total_send_count,
    "ParseError(records):" => @parse_error_count
  )

  return unless @is_filebeat_status_record && @filebeat_status != {}

  @logger.info("Filebeat Status Report: #{format_filebeat_report}")
end
ta_appid_check()
click to toggle source
Validates the appid against the server.
# File lib/logstash/outputs/thinkingdata.rb, line 90
# Asks the receiver whether the configured appid is valid.
#
# Issues a GET to "/check_appid" on the host of @url with the appid as a
# query parameter and parses the JSON reply. A reply whose 'code' is -2 is
# logged as an appid error and aborts with an exception; any other reply is
# accepted.
#
# The appid is form-encoded so that characters such as spaces or '&' cannot
# break or alter the query string.
#
# @return [nil] when the appid is accepted
# @raise [RuntimeError] when the server answers with code -2
def ta_appid_check
  @server_uri = URI(@url)
  @server_uri.path = "/check_appid"
  @server_uri.query = URI.encode_www_form("appid" => @appid)

  response = client.get(@server_uri.to_s).call
  result = JSON.parse(response.body)
  return unless result['code'] == -2

  @logger.error("APPID 错误,请检查APPID。")
  # A bare `raise` here would surface only as "unhandled exception"; say what failed.
  raise "thinkingdata appid check failed: server returned code -2 for the configured appid"
end