class LogStash::Outputs::Splunk
Constants
- RETRYABLE_MANTICORE_EXCEPTIONS
Attributes
is_batch[RW]
Public Instance Methods
close()
click to toggle source
# File lib/logstash/outputs/splunk.rb, line 267 def close @timer.cancel client.close end
log_error_response(response, url, event)
click to toggle source
# File lib/logstash/outputs/splunk.rb, line 143
# Reports a response whose status was neither successful nor retryable.
#
# @param response [#code] the HTTP response that was received
# @param url the URL the request was sent to
# @param event the event (or batch of events) that was being delivered
# @return whatever log_failure returns
def log_error_response(response, url, event)
  status = response.code
  details = { :response_code => status, :url => url, :event => event }
  log_failure("Encountered non-2xx HTTP code #{status}", details)
end
log_retryable_response(response)
click to toggle source
# File lib/logstash/outputs/splunk.rb, line 135
# Logs a response that is going to be retried.
#
# A 429 is logged at debug level only (and only when debug logging is
# enabled); every other status is logged as a warning together with the
# response code and body.
#
# @param response [#code, #body] the HTTP response that was received
def log_retryable_response(response)
  unless response.code == 429
    return @logger.warn("Encountered a retryable HTTP request in HTTP output, will retry", :code => response.code, :body => response.body)
  end

  @logger.debug? && @logger.debug("Encountered a 429 response, will retry. This is not serious, just flow control via HTTP")
end
multi_receive(events)
click to toggle source
# File lib/logstash/outputs/splunk.rb, line 117
# Entry point for a batch of events: hands the batch to send_events,
# doing nothing (and returning nil) when the batch is empty.
#
# @param events [#empty?] the events to deliver
def multi_receive(events)
  send_events(events) unless events.empty?
end
register()
click to toggle source
# File lib/logstash/outputs/splunk.rb, line 93
# Prepares the output for use: builds the request-token queue, fills in
# the fixed HTTP headers and creates the retry timer.
#
# Reads @pool_max, @headers, @token and @channel_identifier, which must
# already be set when this is called.
def register
  # We count outstanding requests with this queue
  # This queue tracks the requests to create backpressure
  # When this queue is empty no new requests may be sent,
  # tokens must be added back by the client on success
  @request_tokens = SizedQueue.new(@pool_max)
  @pool_max.times { @request_tokens << true }

  @requests = []
  @content_type = "application/json"

  @headers["Content-Type"] = @content_type
  # Splunk HEC token
  @headers["Authorization"] = "Splunk " + @token
  # The channel header is only sent when a channel identifier is configured.
  @headers["X-Splunk-Request-Channel"] = @channel_identifier if @channel_identifier

  # Run named Timer as daemon thread
  @timer = java.util.Timer.new("Splunk Output #{self.params['id']}", true)
end
send_event(event, attempt)
click to toggle source
# File lib/logstash/outputs/splunk.rb, line 220
# Performs one delivery attempt and classifies the outcome.
#
# @param event a single event, or the whole batch when @is_batch is set
# @param attempt the attempt counter, passed back unchanged to the caller
# @return [Array] a triple [outcome, event, attempt] where outcome is
#   :success, :retry or :failure
def send_event(event, attempt)
  body = event_body(event)

  # In batch mode the configured URL and headers are used as they are;
  # otherwise both are rendered for the individual event.
  url = @is_batch ? @url : event.sprintf(@url)
  headers = @is_batch ? @headers : event_headers(event)

  # Compress the body and add appropriate header
  if @http_compression == true
    headers["Content-Encoding"] = "gzip"
    body = gzip(body)
  end

  # Build the POST request and run it right away via #call
  response = client.send(:post, url, :body => body, :headers => headers).call

  return [:success, event, attempt] if response_success?(response)

  if retryable_response?(response)
    log_retryable_response(response)
    return [:retry, event, attempt]
  end

  log_error_response(response, url, event)
  [:failure, event, attempt]
rescue => exception
  will_retry = retryable_exception?(exception)
  log_failure("Could not fetch URL",
              :url => url,
              :body => body,
              :headers => headers,
              :message => exception.message,
              :class => exception.class.name,
              :backtrace => exception.backtrace,
              :will_retry => will_retry)

  outcome = will_retry ? :retry : :failure
  [outcome, event, attempt]
end
send_events(events)
click to toggle source
# File lib/logstash/outputs/splunk.rb, line 152 def send_events(events) successes = java.util.concurrent.atomic.AtomicInteger.new(0) failures = java.util.concurrent.atomic.AtomicInteger.new(0) retries = java.util.concurrent.atomic.AtomicInteger.new(0) event_count = @is_batch ? 1 : events.size pending = Queue.new if @is_batch pending << [events, 0] else events.each {|e| pending << [e, 0]} end while popped = pending.pop break if popped == :done event, attempt = popped action, event, attempt = send_event(event, attempt) begin action = :failure if action == :retry && !@retry_failed case action when :success successes.incrementAndGet when :retry retries.incrementAndGet next_attempt = attempt+1 sleep_for = sleep_for_attempt(next_attempt) @logger.info("Retrying http request, will sleep for #{sleep_for} seconds") timer_task = RetryTimerTask.new(pending, event, next_attempt) @timer.schedule(timer_task, sleep_for*1000) when :failure failures.incrementAndGet else raise "Unknown action #{action}" end if action == :success || action == :failure if successes.get+failures.get == event_count pending << :done end end rescue => e # This should never happen unless there's a flat out bug in the code @logger.error("Error sending HTTP Request", :class => e.class.name, :message => e.message, :backtrace => e.backtrace) failures.incrementAndGet raise e end end rescue => e @logger.error("Error in http output loop", :class => e.class.name, :message => e.message, :backtrace => e.backtrace) raise e end
sleep_for_attempt(attempt)
click to toggle source
# File lib/logstash/outputs/splunk.rb, line 214
# Computes the delay before a given retry attempt.
#
# The base delay is attempt squared, capped at 60. The result is half of
# that base plus half of a random value drawn from 0..base, so it lies
# between base/2 and base (integer division for Integer attempts).
#
# @param attempt [Integer] the number of the upcoming attempt
# @return the delay to wait before that attempt
def sleep_for_attempt(attempt)
  backoff = attempt**2
  backoff = 60 if backoff > 60

  fixed_part = backoff / 2
  jitter_part = rand(0..backoff) / 2
  fixed_part + jitter_part
end
Private Instance Methods
convert_mapping(mapping, event)
click to toggle source
# File lib/logstash/outputs/splunk.rb, line 313
# Recursively renders a mapping template against an event.
#
# Hashes keep their keys and have each value converted; arrays have each
# element converted; anything else is passed to event.sprintf.
#
# @param mapping [Hash, Array, Object] the template (or a part of it)
# @param event [#sprintf] the event used to render leaf values
# @return a structure of the same shape with rendered leaf values
def convert_mapping(mapping, event)
  case mapping
  when Hash
    mapping.each_with_object({}) do |(key, template), converted|
      converted[key] = convert_mapping(template, event)
    end
  when Array
    mapping.map { |item| convert_mapping(item, event) }
  else
    event.sprintf(mapping)
  end
end
custom_headers(event)
click to toggle source
# File lib/logstash/outputs/splunk.rb, line 339
# Renders every configured header value against an event.
#
# @param event [#sprintf] the event used to render the header values
# @return [Hash, nil] a new hash with the same keys as @headers and
#   rendered values, or nil when @headers is not set
def custom_headers(event)
  templates = @headers
  return unless templates

  templates.each_with_object({}) do |(name, template), rendered|
    rendered[name] = event.sprintf(template)
  end
end
encode(hash)
click to toggle source
TODO Extract this to a codec
# File lib/logstash/outputs/splunk.rb, line 350
# Serializes a hash as CGI-escaped "key=value" pairs joined with "&".
#
# Keys and values are both converted with to_s before escaping, so
# non-String keys such as Symbols are accepted.
#
# @param hash [Hash] the pairs to encode
# @return [String] the encoded string ("" for an empty hash)
def encode(hash)
  hash.map do |key, value|
    "#{CGI.escape(key.to_s)}=#{CGI.escape(value.to_s)}"
  end.join("&")
end
event_body(event)
click to toggle source
Format the HTTP body
# File lib/logstash/outputs/splunk.rb, line 294
# Format the HTTP body
#
# In batch mode `event` is a collection: every element is mapped and
# dumped as JSON, and the documents are joined with newlines. Otherwise
# the single event is mapped and dumped as one JSON document.
#
# @param event a single event, or a collection of events when @is_batch is set
# @return [String] the request body
def event_body(event)
  # TODO: Create an HTTP post data codec, use that here
  return LogStash::Json.dump(map_event(event)) unless @is_batch

  documents = event.map { |single| LogStash::Json.dump(map_event(single)) }
  documents.join("\n")
end
event_headers(event)
click to toggle source
# File lib/logstash/outputs/splunk.rb, line 335
# Returns the headers to send for an event: the result of custom_headers,
# or an empty hash when that result is nil or false.
#
# @param event the event the headers are rendered for
# @return [Hash] the headers for this event
def event_headers(event)
  rendered = custom_headers(event)
  rendered || {}
end
gzip(data)
click to toggle source
gzip data
# File lib/logstash/outputs/splunk.rb, line 304
# gzip data
#
# Compresses `data` into an in-memory binary buffer. The writer is opened
# through GzipWriter.wrap with a block so that it is closed on every exit
# path, including when the write raises.
#
# @param data [String] the payload to compress
# @return [String] the gzip-compressed bytes
def gzip(data)
  buffer = StringIO.new
  buffer.set_encoding("BINARY")
  Zlib::GzipWriter.wrap(buffer) { |writer| writer.write(data) }
  buffer.string
end
log_failure(message, opts)
click to toggle source
This is split into a separate method mostly to help testing
# File lib/logstash/outputs/splunk.rb, line 289
# Logs a delivery failure at error level, prefixing the message with
# "[HTTP Output Failure]". Kept as its own method mostly to help testing.
#
# @param message [String] a description of what went wrong
# @param opts [Hash] structured details passed through to the logger
def log_failure(message, opts)
  tagged_message = "[HTTP Output Failure] #{message}"
  @logger.error(tagged_message, opts)
end
map_event(event)
click to toggle source
# File lib/logstash/outputs/splunk.rb, line 327
# Turns an event into the structure that gets serialized: the configured
# @mapping rendered against the event when a mapping is set, otherwise
# the event's own hash representation.
#
# @param event [#to_hash] the event to convert
def map_event(event)
  return event.to_hash unless @mapping

  convert_mapping(@mapping, event)
end
response_success?(response)
click to toggle source
# File lib/logstash/outputs/splunk.rb, line 274
# Tells whether a response counts as successful: its status is listed in
# @ignorable_codes (when that is set) or lies in the 200..299 range.
#
# @param response [#code] the HTTP response to inspect
# @return [Boolean]
def response_success?(response)
  status = response.code
  ignorable = @ignorable_codes
  return true if ignorable && ignorable.include?(status)

  status.between?(200, 299)
end
retryable_exception?(exception)
click to toggle source
# File lib/logstash/outputs/splunk.rb, line 284
# Tells whether an exception is an instance of any class listed in
# RETRYABLE_MANTICORE_EXCEPTIONS.
#
# @param exception [Exception] the error raised while sending
# @return [Boolean]
def retryable_exception?(exception)
  RETRYABLE_MANTICORE_EXCEPTIONS.any? { |retryable_class| exception.kind_of?(retryable_class) }
end
retryable_response?(response)
click to toggle source
# File lib/logstash/outputs/splunk.rb, line 280
# Tells whether a response's status is listed in @retryable_codes.
#
# @param response [#code] the HTTP response to inspect
# @return true or false when @retryable_codes is set; otherwise the
#   falsy value of @retryable_codes itself (nil when unset)
def retryable_response?(response)
  codes = @retryable_codes
  codes && codes.include?(response.code)
end