class LogStash::Outputs::Kinetica
Constants
- RETRYABLE_MANTICORE_EXCEPTIONS
- VALID_METHODS
Attributes
is_batch[RW]
Public Instance Methods
close()
click to toggle source
# File lib/logstash/outputs/kinetica.rb, line 273
# Shuts the output down.
#
# The retry timer is cancelled first, then the HTTP client is closed.
# Returns whatever the client's +close+ returns.
def close
  @timer.cancel
  client.close
end
log_error_response(response, url, event)
click to toggle source
# File lib/logstash/outputs/kinetica.rb, line 147
# Reports a non-retryable error response through log_failure.
#
# When the response body is a JSON object, its "message" field is logged
# (nil if the field is absent). A body that is not a JSON object -- an HTML
# error page, an empty or nil body -- is logged verbatim instead of raising
# while trying to parse it.
#
# response - object responding to +code+ and +body+
# url      - the URL the request was sent to
# event    - the event (or batch) that was being sent
def log_error_response(response, url, event)
  raw_body = response.body
  parsed = begin
    JSON.parse(raw_body.to_s)
  rescue JSON::ParserError
    nil
  end
  response_message = parsed.is_a?(Hash) ? parsed["message"] : raw_body

  log_failure(
    "Encountered an error:",
    :response_code => response.code,
    :response_message => response_message,
    :url => url,
    :event => event
  )
end
log_retryable_response(response)
click to toggle source
# File lib/logstash/outputs/kinetica.rb, line 139
# Logs a response that is about to be retried.
#
# A 429 is only flow control, so it is logged at debug level (and only when
# debug logging is enabled). Every other retryable status is logged as a
# warning together with the response code and body.
def log_retryable_response(response)
  if response.code != 429
    @logger.warn(
      "Encountered a retryable HTTP request in HTTP output, will retry",
      :code => response.code,
      :body => response.body
    )
  else
    @logger.debug? &&
      @logger.debug("Encountered a 429 response, will retry. This is not serious, just flow control via HTTP")
  end
end
multi_receive(events)
click to toggle source
# File lib/logstash/outputs/kinetica.rb, line 121
# Receives a batch of events and hands it to send_events.
# An empty batch is ignored and yields nil.
def multi_receive(events)
  send_events(events) unless events.empty?
end
register()
click to toggle source
# File lib/logstash/outputs/kinetica.rb, line 88
# Prepares the instance state used by the sending code: the request-token
# queue, the Content-Type header, the batch flag, the retry timer and the
# text delimiter / escape character.
def register
  #@http_method = @http_method.to_sym

  # Outstanding requests are counted with a token queue, which creates
  # backpressure: when the queue is empty no new request may be sent, and
  # the client must put a token back on success.
  @request_tokens = SizedQueue.new(@pool_max)
  @pool_max.times { @request_tokens << true }

  @requests = []

  # Only the "csv" format supplies a default Content-Type. The "form",
  # "json" and "json_batch" defaults are disabled in the original source:
  #when "form" ; @content_type = "application/x-www-form-urlencoded"
  #when "json" ; @content_type = "application/json"
  #when "json_batch" ; @content_type = "application/json"
  @content_type = "application/json" if @content_type.nil? && @format == "csv"

  @is_batch = (@format == "json_batch")

  @headers["Content-Type"] = @content_type

  # Run named Timer as daemon thread
  timer_name = "HTTP Output #{params['id']}"
  @timer = java.util.Timer.new(timer_name, true)

  # Fall back to "," and "\" only when the option is absent (nil).
  configured_delimiter = options["text_delimiter"]
  @text_delimiter = configured_delimiter.nil? ? "," : configured_delimiter

  configured_escape = options["text_escape_character"]
  @text_escape_character = configured_escape.nil? ? "\\" : configured_escape
end
send_event(event, attempt)
click to toggle source
# File lib/logstash/outputs/kinetica.rb, line 226
# Sends one event (or, in batch mode, the whole batch) with an HTTP POST.
#
# event   - the event, or the array of events when @is_batch is set
# attempt - the attempt counter; passed through unchanged
#
# Returns a three-element array [action, event, attempt] where action is
# :success, :retry or :failure. Exceptions raised while building or sending
# the request are logged through log_failure and reported as :retry or
# :failure depending on retryable_exception?.
def send_event(event, attempt)
  body = event_body(event)

  # In batch mode the URL and headers are used as configured; otherwise they
  # are interpolated per event.
  url = @is_batch ? @url : event.sprintf(@url)
  headers = @is_batch ? @headers : event_headers(event)

  # Compress the body and add the matching header. merge returns a copy, so
  # the shared @headers hash used in batch mode is never modified.
  if @http_compression == true
    headers = headers.merge("Content-Encoding" => "gzip")
    body = gzip(body)
  end

  response = client.send(:post, url, :body => body, :headers => headers).call

  if response_success?(response)
    [:success, event, attempt]
  elsif retryable_response?(response)
    log_retryable_response(response)
    [:retry, event, attempt]
  else
    log_error_response(response, url, event)
    [:failure, event, attempt]
  end
rescue => exception
  will_retry = retryable_exception?(exception)
  log_failure("Could not fetch URL",
              :url => url,
              :body => body,
              :headers => headers,
              :message => exception.message,
              :class => exception.class.name,
              :backtrace => exception.backtrace,
              :will_retry => will_retry)

  [will_retry ? :retry : :failure, event, attempt]
end
send_events(events)
click to toggle source
# File lib/logstash/outputs/kinetica.rb, line 159
# Sends the given events and blocks until each unit of work has ended in
# success or failure.
#
# A unit of work is a single event, or the whole array when @is_batch is
# set. Units are pulled from a queue; a unit that must be retried is put
# back on that queue by a RetryTimerTask scheduled on @timer after a
# backoff delay. Once successes + failures equals the number of units, a
# :done marker is queued and the loop stops.
def send_events(events)
  successes = java.util.concurrent.atomic.AtomicInteger.new(0)
  failures = java.util.concurrent.atomic.AtomicInteger.new(0)
  retries = java.util.concurrent.atomic.AtomicInteger.new(0)
  event_count = @is_batch ? 1 : events.size

  pending = Queue.new
  if @is_batch
    pending << [events, 0]
  else
    events.each { |single| pending << [single, 0] }
  end

  while popped = pending.pop
    break if popped == :done

    event, attempt = popped
    action, event, attempt = send_event(event, attempt)

    begin
      # Retries are downgraded to failures unless retrying is enabled.
      action = :failure if action == :retry && !@retry_failed

      case action
      when :success
        successes.incrementAndGet
      when :failure
        failures.incrementAndGet
      when :retry
        retries.incrementAndGet

        next_attempt = attempt + 1
        sleep_for = sleep_for_attempt(next_attempt)
        @logger.info("Retrying http request, will sleep for #{sleep_for} seconds")
        # Timer#schedule takes milliseconds.
        @timer.schedule(RetryTimerTask.new(pending, event, next_attempt), sleep_for * 1000)
      else
        raise "Unknown action #{action}"
      end

      finished = (action == :success || action == :failure)
      pending << :done if finished && successes.get + failures.get == event_count
    rescue => e
      # This should never happen unless there's a flat out bug in the code
      @logger.error("Error sending HTTP Request",
                    :class => e.class.name,
                    :message => e.message,
                    :backtrace => e.backtrace)
      failures.incrementAndGet
      raise e
    end
  end
rescue => e
  @logger.error("Error in http output loop",
                :class => e.class.name,
                :message => e.message,
                :backtrace => e.backtrace)
  raise e
end
sleep_for_attempt(attempt)
click to toggle source
# File lib/logstash/outputs/kinetica.rb, line 220
# Computes the backoff delay for a retry attempt.
#
# The base delay is the square of the attempt number, capped at 60. The
# result is half of that base plus half of a random value drawn from
# 0..base, so it lies between base/2 and base.
def sleep_for_attempt(attempt)
  base = [attempt**2, 60].min
  jitter = rand(0..base) / 2
  (base / 2) + jitter
end
Private Instance Methods
custom_headers(event)
click to toggle source
# File lib/logstash/outputs/kinetica.rb, line 372
# Builds the per-event header hash by running every configured header
# value through event.sprintf. Returns nil when no headers are configured.
# The configured @headers hash itself is left untouched.
def custom_headers(event)
  return nil unless @headers

  @headers.each_with_object({}) do |(name, template), resolved|
    resolved[name] = event.sprintf(template)
  end
end
event_body(event)
click to toggle source
Format the HTTP body
# File lib/logstash/outputs/kinetica.rb, line 310
# Builds the JSON request body for one event.
#
# The event's field values (ordered by sorted field name, each converted
# with to_s) are joined with @text_delimiter and sent as "data_text", next
# to "table_name", "create_table_options" and "options".
#
# The body is produced by the JSON encoder rather than by string
# concatenation, so quotes, backslashes and control characters in field
# values or in the table name can no longer produce an invalid document.
#
# NOTE(review): field values are joined verbatim -- no delimiter-level
# quoting or escaping is applied, so a value containing the delimiter is
# indistinguishable from two fields. Confirm whether the receiving endpoint
# needs such escaping (see @text_escape_character).
#
# Returns the body as a JSON String.
def event_body(event)
  @logger.info("event: #{event.to_s}")
  fields = event.to_hash
  @logger.info("event: #{fields}")

  data = fields.keys.sort.map { |key| fields[key].to_s }.join(@text_delimiter.to_s)

  payload = {
    "table_name" => @table_name,
    "data_text" => data,
    "create_table_options" => create_table_options,
    "options" => options
  }
  body = payload.to_json

  @logger.info("body: #{body}")
  body
end
event_headers(event)
click to toggle source
# Recursively applies event.sprintf to every leaf of a mapping.
#
# Hashes keep their keys and have their values converted, arrays are
# converted element by element, and anything else is passed to
# event.sprintf directly.
def convert_mapping(mapping, event)
  case mapping
  when Hash
    mapping.each_with_object({}) do |(key, value), converted|
      converted[key] = convert_mapping(value, event)
    end
  when Array
    mapping.map { |element| convert_mapping(element, event) }
  else
    event.sprintf(mapping)
  end
end
# Returns the data to send for an event: the configured @mapping with its
# values interpolated against the event, or the event's own hash when no
# mapping is configured.
def map_event(event)
  return event.to_hash unless @mapping

  convert_mapping(@mapping, event)
end
# File lib/logstash/outputs/kinetica.rb, line 368
# Returns the headers to send for an event: the result of custom_headers,
# or an empty hash when that yields nothing.
def event_headers(event)
  resolved = custom_headers(event)
  resolved ? resolved : {}
end
gzip(data)
click to toggle source
Gzip-compresses data and returns the compressed bytes.
# File lib/logstash/outputs/kinetica.rb, line 337
# Compresses +data+ with gzip into an in-memory, binary-encoded buffer and
# returns the buffer's contents as a String.
def gzip(data)
  buffer = StringIO.new
  buffer.set_encoding("BINARY")
  # wrap closes the writer when the block ends, which flushes the gzip trailer.
  Zlib::GzipWriter.wrap(buffer) { |writer| writer.write(data) }
  buffer.string
end
log_failure(message, opts)
click to toggle source
This is split into a separate method mostly to help testing
# File lib/logstash/outputs/kinetica.rb, line 305
# Logs a failure at error level, prefixing the message with
# "[HTTP Output Failure] ". Kept as its own method mostly to help testing.
def log_failure(message, opts)
  prefixed = "[HTTP Output Failure] #{message}"
  @logger.error(prefixed, opts)
end
response_success?(response)
click to toggle source
# File lib/logstash/outputs/kinetica.rb, line 280
# Decides whether a response counts as a success.
#
# A JSON-object body whose "status" is "ERROR" is always a failure, whatever
# the HTTP code. Otherwise the response succeeds when its code is listed in
# @ignorable_codes or lies in the 2xx range.
#
# Bodies that are empty, nil or not a JSON object are skipped for the
# status check instead of raising, so such responses are still classified
# by their HTTP code (and can then be retried by the caller).
def response_success?(response)
  code = response.code

  parsed = begin
    JSON.parse(response.body.to_s)
  rescue JSON::ParserError
    nil
  end

  if parsed.is_a?(Hash) && !parsed["status"].nil?
    @logger.info("status: #{parsed["status"]}")
    if parsed["status"] == "ERROR"
      @logger.error("message: #{parsed["message"]}")
      return false
    end
  end

  return true if @ignorable_codes && @ignorable_codes.include?(code)

  code >= 200 && code <= 299
end
retryable_exception?(exception)
click to toggle source
# File lib/logstash/outputs/kinetica.rb, line 300
# True when the exception is an instance of (a subclass of) any class
# listed in RETRYABLE_MANTICORE_EXCEPTIONS.
def retryable_exception?(exception)
  RETRYABLE_MANTICORE_EXCEPTIONS.any? do |retryable_class|
    exception.kind_of?(retryable_class)
  end
end
retryable_response?(response)
click to toggle source
# File lib/logstash/outputs/kinetica.rb, line 296
# Tells whether the response's HTTP code is one of @retryable_codes.
# Yields a falsy value when no retryable codes are configured.
def retryable_response?(response)
  codes = @retryable_codes
  return codes unless codes

  codes.include?(response.code)
end