class LogStash::Outputs::Thinkingdata

A thinkingdata output that parses each event's message as JSON, buffers the records, and POSTs them in batches to the configured URL.

Constants

PLUGIN_VERSION

Attributes

buffer_state[RW]

Public Instance Methods

close() click to toggle source

def event

# File lib/logstash/outputs/thinkingdata.rb, line 132
# Shuts the output down.
#
# The steps run in a fixed order: kill the buffer's timer, request a final
# buffer flush, kill the periodic report thread, close the HTTP client and
# finally emit one last report.
#
# @return whatever the final call to report returns
def close
  flush_timer = buffer_state[:timer]
  flush_timer.kill
  # The final flush must happen while @client is still open.
  buffer_flush(final: true)
  reporter = @report_thread
  reporter.kill
  @client.close
  report
end
flush(events, final) click to toggle source
# File lib/logstash/outputs/thinkingdata.rb, line 142
# Serializes a batch of buffered records and delivers it to the configured URL.
#
# The batch is filtered through data_valid, encoded as JSON, gzip-compressed
# unless @compress is 0, and handed to do_send together with the request
# headers. Delivery is retried every 5 seconds until do_send returns a truthy
# value; only then is @total_send_count advanced.
#
# @param events [Array] buffered records to send
# @param final [Boolean] not used by this implementation
# @return [Integer] the updated @total_send_count
def flush(events, final)
  events = data_valid(events)
  data = events.to_json
  if @compress == 0
    compress_type = 'none'
  else
    # StringIO.new("w") would use "w" as the buffer's initial *contents*
    # (it is not a mode string); start from an empty buffer instead.
    gz = StringIO.new
    gz.set_encoding("BINARY")
    z = Zlib::GzipWriter.new(gz)
    z.write(data)
    z.close
    data = gz.string
    compress_type = 'gzip'
  end

  # Only the first header depends on whether an appid is configured; the
  # remaining headers are shared by both cases.
  appid_missing = @appid.nil? || @appid.empty?
  identity = appid_missing ? {'custom_appid' => 'true'} : {'appid' => @appid}
  headers = identity.merge(
    'version' => PLUGIN_VERSION,
    'user-agent' => 'logstash_' + PLUGIN_VERSION,
    'compress' => compress_type,
    'TA-Integration-Type' => 'logstash',
    'TA-Integration-Version' => PLUGIN_VERSION,
    'TA-Integration-Count' => events.length.to_s
  )

  until do_send(data, headers)
    sleep 5
  end
  @total_send_count += events.length
end
multi_receive(events) click to toggle source
# File lib/logstash/outputs/thinkingdata.rb, line 104
# Receives a batch of events, parses each event's "message" field as JSON and
# passes the parsed record to buffer_receive.
#
# When @uuid is set, a random UUID is stored under the '#uuid' key of every
# record. When @is_filebeat_status_record is set and the event is recognised
# by is_filebeat_input?, the event's host, file and offset are recorded through
# record_filebeat_status. An event that raises while being processed is logged
# and counted in @parse_error_count; the rest of the batch continues.
#
# @param events [Array] events exposing #get(field) and #to_s
# @return [Array, nil] nil for an empty batch, otherwise the events array
def multi_receive(events)
  return if events.empty?
  @receive_count += events.length
  events.each do |event|
    begin
      content = JSON.parse(event.get("message"))
      content['#uuid'] = SecureRandom.uuid if @uuid
      # Test the flag first so the filebeat probe and its field lookups are
      # skipped entirely when status recording is turned off.
      if @is_filebeat_status_record && is_filebeat_input?(event)
        host = event.get("[host][name]")
        file = event.get("[log][file][path]")
        file = event.get("[source]") if file.nil?
        offset = event.get("[log][offset]")
        offset = event.get("[offset]") if offset.nil?
        record_filebeat_status("host: #{host}, file: #{file}", offset)
      end
      buffer_receive(content)
    rescue => e
      @logger.error("Could not process content", :content => event.to_s, :Exception => e)
      @parse_error_count += 1
    end
  end
end
register() click to toggle source
# File lib/logstash/outputs/thinkingdata.rb, line 49
# Prepares the output for use.
#
# Logs the effective settings, builds the Manticore HTTP client from
# client_config with a plugin-specific user agent, optionally runs
# ta_appid_check, zeroes the counters, initialises the buffer and starts a
# background thread that calls report every 60 seconds.
#
# @return [Thread] the report thread that was started
def register
  @logger.info(
    "Registering thinkingdata Output",
    url: @url,
    appid: @appid,
    flush_interval_sec: @flush_interval_sec,
    flush_batch_size: @flush_batch_size,
    compress: @compress,
    uuid: @uuid,
    is_filebeat_status_record: @is_filebeat_status_record,
    appid_check: @appid_check
  )

  http_options = client_config
  http_options[:user_agent] = "thinkingdata_logstash_output_plugin_#{PLUGIN_VERSION}"
  @client = Manticore::Client.new(http_options)

  # The appid is only verified when the check is enabled and an appid is set.
  ta_appid_check if @appid_check && !(@appid.nil? || @appid.empty?)

  @receive_count = @parse_error_count = @last_report_count = @total_send_count = 0

  buffer_initialize({
    max_items: @flush_batch_size.to_i,
    max_interval: @flush_interval_sec.to_i,
    logger: @logger
  })

  @filebeat_status = {} if @is_filebeat_status_record

  @report_thread = Thread.new do
    loop do
      sleep(60)
      report
    end
  end
end

Private Instance Methods

data_valid(events) click to toggle source
# File lib/logstash/outputs/thinkingdata.rb, line 239
# Keeps only the records that can be serialized with #to_json.
#
# Each record is serialized once purely as a check (the output is thrown
# away). A record whose serialization raises is logged, counted in
# @parse_error_count and left out of the result.
#
# @param events [Array] candidate records
# @return [Array] the records that serialized without raising, in order
def data_valid(events)
  events.select do |record|
    begin
      record.to_json
      true
    rescue => e
      @logger.error("Could not parse events", :content => record.to_s, :Exception => e)
      @parse_error_count += 1
      false
    end
  end
end
do_send(data, headers) click to toggle source
# File lib/logstash/outputs/thinkingdata.rb, line 174
# Posts one payload to @url and reports whether it was accepted.
#
# A response code other than 200, or any StandardError raised while sending,
# is logged and treated as a failure.
#
# @param data [String] request body
# @param headers [Hash] request headers
# @return [Boolean] true only when the response code is 200
def do_send(data, headers)
  reply = @client.post(@url, :body => data, :headers => headers).call
  return true if reply.code == 200

  @logger.error("Send failed, code: #{reply.code}, body: #{reply.body}", :url => @url)
  false
rescue => e
  @logger.error("Send failed", :url => @url, :exception => e.class.name, :backtrace => e.backtrace)
  false
end
format_filebeat_report() click to toggle source
# File lib/logstash/outputs/thinkingdata.rb, line 228
# Renders the collected filebeat status as text and clears it.
#
# The text starts with a newline, followed by one "key=>value" line per
# entry of @filebeat_status. After rendering, @filebeat_status is replaced
# with an empty hash.
#
# @return [String] the rendered report
def format_filebeat_report
  rendered = @filebeat_status.inject("\n") do |text, (detail, status)|
    text << detail << "=>" << status.to_s << "\n"
  end
  @filebeat_status = {}
  rendered
end
is_filebeat_input?(event) click to toggle source
# File lib/logstash/outputs/thinkingdata.rb, line 190
# Tells whether an event looks like it came from filebeat.
#
# The event qualifies when "[agent][type]" or "[@metadata][beat]" equals
# "filebeat", or when the "[beat]" field is present at all. Fields are
# checked in that order and the lookup stops at the first match.
#
# @param event [#get] event to inspect
# @return [Boolean]
def is_filebeat_input?(event)
  type_fields = ["[agent][type]", "[@metadata][beat]"]
  return true if type_fields.any? { |field| event.get(field) == "filebeat" }

  !event.get("[beat]").nil?
end
record_filebeat_status(log_detail, offset) click to toggle source
# File lib/logstash/outputs/thinkingdata.rb, line 202
# Remembers the latest offset and receive time for one status key.
#
# A key seen for the first time gets a new entry; a known key has its
# existing entry updated in place.
#
# @param log_detail [String] key under which the status is stored
# @param offset [Object] offset value to store
# @return [Hash, Time] the new entry on first sight, otherwise the stored time
def record_filebeat_status(log_detail, offset)
  entry = @filebeat_status[log_detail]
  return @filebeat_status.store(log_detail, { receive_time: Time.now, offset: offset }) if entry.nil?

  entry.store(:offset, offset)
  entry.store(:receive_time, Time.now)
end
report() click to toggle source
# File lib/logstash/outputs/thinkingdata.rb, line 215
# Logs the running counters and, when enabled, the filebeat status.
#
# The interval figure is the growth of @total_send_count since the previous
# call; @last_report_count is then moved up to the current total. The
# filebeat line is only logged when @is_filebeat_status_record is set and
# @filebeat_status is not an empty hash.
def report
  sent_since_last_report = @total_send_count - @last_report_count
  @last_report_count = @total_send_count
  @logger.info(
    "Report:",
    "IntervalReceive(records):" => sent_since_last_report,
    "Receive(records):" => @receive_count,
    "TotalSend(records):" => @total_send_count,
    "ParseError(records):" => @parse_error_count
  )
  return unless @is_filebeat_status_record && @filebeat_status != {}

  @logger.info("Filebeat Status Report: #{format_filebeat_report}")
end
ta_appid_check() click to toggle source

验证appid

# File lib/logstash/outputs/thinkingdata.rb, line 90
# Verifies the configured appid against the server.
#
# Builds a "/check_appid" URL on the same host as @url (kept in @server_uri),
# issues a GET through @client and parses the response body as JSON. A
# 'code' of -2 in the response is logged and raised as an error; any other
# value is accepted.
#
# @return [nil] when the appid is not rejected
# @raise [RuntimeError] when the server answers with code -2
def ta_appid_check
  @server_uri = URI(@url)
  @server_uri.path = "/check_appid"
  # Form-encode the appid so reserved characters cannot corrupt the query.
  @server_uri.query = URI.encode_www_form("appid" => @appid)
  # Use the same @client instance variable as the other request paths.
  response = @client.get(@server_uri.to_s).call
  result = JSON.parse(response.body)
  return unless result['code'] == -2

  message = "APPID 错误,请检查APPID。"
  @logger.error(message)
  # Raise with the message; a bare raise here would only say "unhandled exception".
  raise message
end