class Fluent::InsightOutput

Constants

INSIGHT_DATA_TEMPLATE
INSIGHT_LOGSETS_TEMPLATE
INSIGHT_REST_TEMPLATE
THREAD_COUNT

Public Instance Methods

client() click to toggle source
# File lib/fluent/plugin/out_insight.rb, line 97
# Returns the socket used to ship log lines, creating it on first use and
# caching it in @_socket for subsequent calls.
#
# The connection flavour is chosen from the plugin settings:
# * @use_ssl truthy     -> TCP socket wrapped in an OpenSSL::SSL::SSLSocket
# * @protocol == 'tcp'  -> plain TCPSocket
# * anything else       -> UDPSocket connected to the data host
#
# The host name is built from INSIGHT_DATA_TEMPLATE and @region.
def client
  data_host = INSIGHT_DATA_TEMPLATE % { :region => @region }
  return @_socket if @_socket

  @_socket =
    if @use_ssl
      # NOTE(review): the SSL context is used with its defaults; no
      # verification settings are configured here -- confirm this is intended.
      ssl_context = OpenSSL::SSL::SSLContext.new
      tcp_socket  = TCPSocket.new(data_host, @port)
      # SSLSocket#connect performs the handshake; its return value is cached.
      OpenSSL::SSL::SSLSocket.new(tcp_socket, ssl_context).connect
    elsif @protocol == 'tcp'
      TCPSocket.new(data_host, @port)
    else
      UDPSocket.new.tap { |udp_socket| udp_socket.connect(data_host, @port) }
    end
end
configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_insight.rb, line 31
# Configuration hook. This override adds no behaviour of its own: the bare
# `super` forwards `conf` unchanged to the superclass implementation.
def configure(conf)
  super
end
format(tag, time, record) click to toggle source
# File lib/fluent/plugin/out_insight.rb, line 115
# Serializes one event as a msgpack-encoded [tag, time, record] triple.
# `write` later reads the same triple back via chunk.msgpack_each.
def format(tag, time, record)
  event = [tag, time, record]
  event.to_msgpack
end
insight_log_token(url) click to toggle source
# File lib/fluent/plugin/out_insight.rb, line 82
# Looks up a single log via the REST API and extracts its name and first token.
#
# The response is expected to be a Hash holding the log description under
# @key, with 'name' and a 'tokens' list inside it.
#
# @param url [String] URL of the log resource, passed to insight_rest_request
# @return [Array, nil] [log name, first token] on success; nil when the
#   request failed, the response has no log info, or the log has no tokens
#   (a warning is logged in each of those cases)
def insight_log_token(url)
  log_body = insight_rest_request(url)
  # insight_rest_request does not return a Hash when the request fails, so
  # check the type before probing for keys.
  unless log_body.is_a?(Hash) && log_body.key?(@key)
    log.warn "Response doesn't contain log info"
    return nil
  end

  log_info = log_body[@key]
  tokens = log_info['tokens'] if log_info.is_a?(Hash)
  token = tokens.first if tokens.is_a?(Array)
  if token.nil?
    log.warn "Log is empty"
    return nil
  end

  log.info "Found log #{log_info['name']}"
  [log_info['name'], token]
end
insight_rest_request(url) click to toggle source
# File lib/fluent/plugin/out_insight.rb, line 68
# Performs an authenticated GET against the REST API and parses the JSON body.
#
# The request carries @api_key in the 'x-api-key' header; TLS is used when
# the URL scheme is https.
#
# @param url [String] absolute URL to fetch
# @return [Object, nil] the parsed JSON body when the server answers 200;
#   nil otherwise (the status and body are logged as an error)
def insight_rest_request(url)
  uri = URI(url)
  request = Net::HTTP::Get.new(uri)
  request['content-type'] = 'application/json'
  request['x-api-key'] = @api_key
  response = Net::HTTP.start(uri.hostname, uri.port, :use_ssl => uri.scheme == 'https') do |http|
    http.request(request)
  end
  return JSON.parse(response.body) if response.code == "200"

  log.error "Request was failed HTTP #{response.code}: \n#{response.body}"
  # Return nil explicitly so callers never receive the logger's return value.
  nil
end
send_insight(token, data) click to toggle source
# File lib/fluent/plugin/out_insight.rb, line 143
# Writes one log line ("<token> <data> \n") to the socket returned by `client`.
#
# * Errno::EMSGSIZE: the payload is cut in two halves which are sent
#   separately (recursively). A payload too short to be cut is dropped with
#   an error log instead of recursing forever.
# * Any other StandardError: the cached socket is closed and discarded, and
#   the write is retried up to @max_retries times, sleeping 5**attempt
#   seconds before each retry.
#
# @param token [String] log token that prefixes the line
# @param data [String] message payload
# @raise [ConnectionFailure] when the write still fails after @max_retries retries
def send_insight(token, data)
  retries = 0
  begin
    client.write("#{token} #{data} \n")
  rescue Errno::EMSGSIZE
    half = data.length / 2
    if half.zero?
      # Nothing left to split: bail out rather than recurse on the same data.
      log.error "Message Too Long and cannot be split any further, dropping it"
      return
    end
    log.warn "Message Too Long, re-sending it in two part..."
    send_insight(token, data[0...half])
    send_insight(token, data[half..-1])
  rescue => e
    if retries < @max_retries
      retries += 1
      # Close the broken socket before dropping the reference so its file
      # descriptor is released; failures while closing are irrelevant here.
      begin
        @_socket.close if @_socket
      rescue StandardError
        nil
      end
      @_socket = nil
      log.warn "Could not push logs to Insight, resetting connection and trying again. #{e.message}"
      sleep 5**retries
      retry
    end
    raise ConnectionFailure, "Could not push logs to Insight after #{retries} retries. #{e.message}"
  end
end
shutdown() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_insight.rb, line 64
# Shutdown hook. This override adds no behaviour of its own: it only invokes
# the superclass implementation via `super`.
def shutdown
  super
end
start() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_insight.rb, line 35
# Start hook: resolves the log tokens of the configured logset, then calls
# the superclass implementation.
#
# Steps:
# 1. @insight_tags is initialised from the comma-separated @tags setting,
#    every tag mapped to nil (values are filled per record in `write`).
# 2. The logset is fetched over REST; each entry of its 'logs_info' list
#    contributes the 'href' of its first link.
# 3. THREAD_COUNT worker threads drain that URL list, calling
#    insight_log_token for each URL and storing name => token in @tokens.
#
# Lookups that yield no name or no token are skipped so that @tokens only
# ever holds usable entries. A missing or malformed logset response is
# logged as a warning and leaves @tokens empty.
def start
  logsets_url = (INSIGHT_REST_TEMPLATE + INSIGHT_LOGSETS_TEMPLATE) % { :region => @region, :logset_id => @logset_id }
  @insight_tags = Hash[@tags.split(",").map { |tag_name| [tag_name, nil] }]
  logset_body = insight_rest_request(logsets_url)
  @tokens = {}

  if logset_body.is_a?(Hash) && logset_body.key?('logset')
    logset_info = logset_body['logset']
    if logset_info.key?('logs_info')
      log_urls = logset_info['logs_info'].map { |entry| entry['links'][0]['href'] }
      # One mutex guards both the shared URL list and the token table.
      mutex = Mutex.new
      workers = THREAD_COUNT.times.map do
        Thread.new(log_urls, @tokens) do |urls, tokens|
          while (url = mutex.synchronize { urls.pop })
            log_name, token = insight_log_token(url)
            # A failed lookup must not leave a nil key/value behind.
            next if log_name.nil? || token.nil?

            mutex.synchronize { tokens[log_name] = token }
          end
        end
      end
      workers.each(&:join)
    else
      log.warn "No logs info found in logset response"
    end
  else
    log.warn "Logset emtity is empty"
  end
  super
end
write(chunk) click to toggle source
# File lib/fluent/plugin/out_insight.rb, line 119
# Ships every record of a buffered chunk to the matching log.
#
# Nothing is sent when no tokens were resolved. Records that are not Hashes,
# or whose 'message' is missing or blank after right-stripping, are skipped.
# For the rest:
# * 'message' is removed from the record and used as the line text,
# * @insight_tags is refreshed with the record's values for each known tag
#   and used (with symbol keys) to fill the @prefix format string,
# * the token is looked up by record[@key], falling back to @default; when
#   neither exists the record is dropped with a debug log.
#
# @param chunk [#msgpack_each] buffer chunk yielding [tag, time, record]
def write(chunk)
  return if @tokens.empty?

  chunk.msgpack_each do |(_tag, _time, event)|
    next unless event.is_a?(Hash)

    text = event.delete('message')&.to_s&.rstrip || ''
    next if text.empty?

    # Refresh the shared tag table with this record's values.
    @insight_tags.each_key { |tag_name| @insight_tags[tag_name] = event[tag_name] }
    format_args = @insight_tags.each_with_object({}) do |(tag_name, tag_value), acc|
      acc[tag_name.to_sym] = tag_value
    end

    # Prefer the record's own log, then the default one.
    log_name = event[@key]
    candidates = [log_name, @default]
    match = candidates.index { |name| @tokens.key?(name) }
    unless match
      log.debug "No token found for #{log_name} and default log doesn't exist"
      next
    end

    send_insight(@tokens[candidates[match]], "#{@prefix % format_args} #{text}")
  end
end