class Fluent::InsightOutput
Constants
- INSIGHT_DATA_TEMPLATE
- INSIGHT_LOGSETS_TEMPLATE
- INSIGHT_REST_TEMPLATE
- THREAD_COUNT
Public Instance Methods
client()
click to toggle source
# File lib/fluent/plugin/out_insight.rb, line 97 def client insight_data_host = INSIGHT_DATA_TEMPLATE % { :region => @region } @_socket ||= if @use_ssl context = OpenSSL::SSL::SSLContext.new socket = TCPSocket.new insight_data_host, @port ssl_client = OpenSSL::SSL::SSLSocket.new socket, context ssl_client.connect else if @protocol == 'tcp' TCPSocket.new insight_data_host, @port else udp_client = UDPSocket.new udp_client.connect insight_data_host, @port udp_client end end end
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_insight.rb, line 31 def configure(conf) super end
format(tag, time, record)
click to toggle source
# File lib/fluent/plugin/out_insight.rb, line 115 def format(tag, time, record) return [tag, time, record].to_msgpack end
insight_log_token(url)
click to toggle source
# File lib/fluent/plugin/out_insight.rb, line 82 def insight_log_token(url) log_body = insight_rest_request(url) if log_body.key?(@key) log_info = log_body[@key] if log_info.key?('tokens') log.info "Found log #{log_info['name']}" return log_info['name'], log_info['tokens'][0] else log.warn "Log is empty" end else log.warn "Response doesn't contain log info" end end
insight_rest_request(url)
click to toggle source
# File lib/fluent/plugin/out_insight.rb, line 68 def insight_rest_request(url) uri = URI(url) request = Net::HTTP::Get.new(uri) request['content-type'] = 'application/json' request['x-api-key'] = @api_key response = Net::HTTP.start(uri.hostname, uri.port, :use_ssl => uri.scheme == 'https') {|http| http.request(request) } if response.code == "200" return JSON.parse(response.body) end log.error "Request was failed HTTP #{response.code}: \n#{response.body}" end
send_insight(token, data)
click to toggle source
# File lib/fluent/plugin/out_insight.rb, line 143 def send_insight(token, data) retries = 0 begin client.write("#{token} #{data} \n") rescue Errno::EMSGSIZE str_length = data.length send_insight(token, data[0..str_length/2]) send_insight(token, data[(str_length/2) + 1..str_length]) log.warn "Message Too Long, re-sending it in two part..." rescue => e if retries < @max_retries retries += 1 @_socket = nil log.warn "Could not push logs to Insight, resetting connection and trying again. #{e.message}" sleep 5**retries retry end raise ConnectionFailure, "Could not push logs to Insight after #{retries} retries. #{e.message}" end end
shutdown()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_insight.rb, line 64 def shutdown super end
start()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_insight.rb, line 35 def start logsets_url = (INSIGHT_REST_TEMPLATE + INSIGHT_LOGSETS_TEMPLATE) % { :region => @region, :logset_id => @logset_id } @insight_tags = Hash[@tags.split(",").each_with_object(nil).to_a] logset_body = insight_rest_request(logsets_url) threads = [] @tokens = Hash.new mutex = Mutex.new if logset_body.instance_of?(Hash) and logset_body.key?('logset') logset_info = logset_body['logset'] if logset_info.key?('logs_info') logs_info = logset_info['logs_info'] log_urls = logs_info.map { |log| log['links'][0]['href'] } THREAD_COUNT.times.map { Thread.new(log_urls, @tokens) do |urls, tokens| while url = mutex.synchronize { urls.pop } log_name, token = insight_log_token(url) mutex.synchronize { tokens[log_name] = token } end end }.each(&:join) else log.warn "No logs info found in logset response" end else log.warn "Logset emtity is empty" end super end
write(chunk)
click to toggle source
# File lib/fluent/plugin/out_insight.rb, line 119 def write(chunk) return if @tokens.empty? chunk.msgpack_each do |(tag, time, record)| next unless record.is_a? Hash message = (record.delete('message')&.to_s&.rstrip || '') next if message.empty? @insight_tags.each { |k,v| @insight_tags[k] = record[k] } symbolized_tags = @insight_tags.inject({}){|memo,(k,v)| memo[k.to_sym] = v; memo} if @tokens.key?(record[@key]) token = @tokens[record[@key]] prefix = @prefix % symbolized_tags send_insight(token, "#{prefix} #{message}") elsif @tokens.key?(@default) token = @tokens[@default] prefix = @prefix % symbolized_tags send_insight(token, "#{prefix} #{message}") else log.debug "No token found for #{record[@key]} and default log doesn't exist" end end end