class LogStash::Outputs::Kinetica

Constants

RETRYABLE_MANTICORE_EXCEPTIONS
VALID_METHODS

Attributes

is_batch[RW]

Public Instance Methods

close() click to toggle source
# File lib/logstash/outputs/kinetica.rb, line 273
# Shuts this output down.
#
# The retry timer is cancelled first, then the HTTP client is closed.
def close
        # @timer is the object retries are scheduled on; cancel it before
        # tearing down the client those retries would use.
        @timer.cancel

        client.close
end
log_error_response(response, url, event) click to toggle source
# File lib/logstash/outputs/kinetica.rb, line 147
# Reports a failed (non-retryable) response through +log_failure+.
#
# The "message" field of the JSON response body is logged when the body is a
# JSON object. When the body is not a JSON object (an HTML error page, an empty
# body, a bare JSON array, ...) the raw body is logged instead, so that
# reporting a failure can never itself raise and hide the original error.
#
# @param response [#code, #body] the response that was classified as a failure
# @param url [String] the URL the request was sent to
# @param event [Object] the event (or batch of events) the request carried
def log_error_response(response, url, event)
        raw_body = response.body
        response_message =
                begin
                        parsed = JSON.parse(raw_body)
                        parsed.is_a?(Hash) ? parsed["message"] : raw_body
                rescue JSON::ParserError, TypeError
                        # ParserError: body is not JSON; TypeError: body is nil.
                        raw_body
                end

        log_failure(
                "Encountered an error:",
                :response_code => response.code,
                :response_message => response_message,
                :url => url,
                :event => event
        )
end
log_retryable_response(response) click to toggle source
# File lib/logstash/outputs/kinetica.rb, line 139
# Logs a response that is about to be retried.
#
# A 429 is logged at debug level, and only when debug logging is enabled.
# Every other code is logged at warn level with the code and body.
#
# @param response [#code, #body] the retryable response
def log_retryable_response(response)
        status = response.code
        return @logger.debug? && @logger.debug("Encountered a 429 response, will retry. This is not serious, just flow control via HTTP") if status == 429

        @logger.warn("Encountered a retryable HTTP request in HTTP output, will retry", :code => status, :body => response.body)
end
multi_receive(events) click to toggle source
# File lib/logstash/outputs/kinetica.rb, line 121
# Entry point for a batch of events: forwards a non-empty batch to
# +send_events+ and does nothing (returns nil) for an empty one.
#
# @param events [Array] the events to send
def multi_receive(events)
        send_events(events) unless events.empty?
end
register() click to toggle source
# File lib/logstash/outputs/kinetica.rb, line 88
# Prepares the instance state used by the send path.
#
# Sets up: the request-token queue (one token per @pool_max), the default
# content type, the batch flag, the Content-Type header, the retry timer and
# the text delimiter / escape character read from +options+.
def register
        # Back-pressure tokens: the queue starts full with @pool_max tokens.
        # Per the original author's note, an empty queue means no new request
        # may be sent and tokens are handed back on success.
        @request_tokens = SizedQueue.new(@pool_max)
        @pool_max.times { @request_tokens << true }

        @requests = []

        # "csv" is the only format that gets a default content type here; an
        # explicitly configured @content_type is never overridden.
        @content_type = "application/json" if @content_type.nil? && @format == "csv"

        @is_batch = (@format == "json_batch")

        @headers["Content-Type"] = @content_type

        # Named timer; the second argument asks for a daemon thread.
        @timer = java.util.Timer.new("HTTP Output #{self.params['id']}", true)

        # Fall back to "," and "\" when the options do not provide a value.
        configured_delimiter = options["text_delimiter"]
        @text_delimiter = configured_delimiter.nil? ? "," : configured_delimiter

        configured_escape = options["text_escape_character"]
        @text_escape_character = configured_escape.nil? ? "\\" : configured_escape
end
send_event(event, attempt) click to toggle source
# File lib/logstash/outputs/kinetica.rb, line 226
# Sends one event (or, in batch mode, the whole batch) as a single POST and
# classifies the outcome.
#
# Changes from the previous version: a leftover debugging `puts` that wrote
# the logger class to stdout on every request has been removed, and the
# Content-Encoding header is added to a copy of the headers so the shared
# @headers hash used in batch mode is never mutated.
#
# @param event [Object] the event, or the batch of events when @is_batch is set
# @param attempt [Integer] the attempt number, passed through unchanged
# @return [Array] +[action, event, attempt]+ where action is :success,
#   :retry or :failure
def send_event(event, attempt)
        body = event_body(event)

        # In batch mode the URL and headers are used verbatim; otherwise they
        # are resolved per event.
        url = @is_batch ? @url : event.sprintf(@url)
        headers = @is_batch ? @headers : event_headers(event)

        # Compress the body and add the matching header.
        if @http_compression == true
                headers = headers.merge("Content-Encoding" => "gzip")
                body = gzip(body)
        end

        # Build the request and run it; the response is needed right here.
        response = client.send(:post, url, :body => body, :headers => headers).call

        return :success, event, attempt if response_success?(response)

        if retryable_response?(response)
                log_retryable_response(response)
                return :retry, event, attempt
        end

        log_error_response(response, url, event)
        return :failure, event, attempt
rescue => exception
        # url/body/headers are nil here if the failure happened before they
        # were assigned.
        will_retry = retryable_exception?(exception)
        log_failure("Could not fetch URL",
                :url => url,
                :body => body,
                :headers => headers,
                :message => exception.message,
                :class => exception.class.name,
                :backtrace => exception.backtrace,
                :will_retry => will_retry
        )

        return (will_retry ? :retry : :failure), event, attempt
end
send_events(events) click to toggle source
# File lib/logstash/outputs/kinetica.rb, line 159
# Sends a set of events and blocks until every unit of work has ended in
# either :success or :failure.
#
# A "unit" is the whole +events+ array when @is_batch is set, otherwise each
# individual event. Units are taken from a local queue, sent with
# +send_event+, and re-scheduled on @timer when the result is :retry.
#
# Any StandardError is logged and re-raised.
#
# @param events [Array] the events to send
def send_events(events)
        successes = java.util.concurrent.atomic.AtomicInteger.new(0)
        failures      = java.util.concurrent.atomic.AtomicInteger.new(0)
        retries = java.util.concurrent.atomic.AtomicInteger.new(0)
        # Number of units that must finish before the loop may stop.
        event_count = @is_batch ? 1 : events.size

        # Work queue of [unit, attempt] pairs, every unit starting at attempt 0.
        pending = Queue.new
        if @is_batch
                pending << [events, 0]
        else
                events.each {|e| pending << [e, 0]}
        end

        # pending.pop blocks while the queue is empty, so the loop waits here
        # for scheduled retries; it ends when the :done marker is popped.
        while popped = pending.pop
                break if popped == :done

                event, attempt = popped

                action, event, attempt = send_event(event, attempt)
                begin
                        # With @retry_failed unset, a retryable result is treated as final.
                        action = :failure if action == :retry && !@retry_failed

                        case action
                        when :success
                                successes.incrementAndGet
                        when :retry
                                retries.incrementAndGet
                                next_attempt = attempt+1
                                sleep_for = sleep_for_attempt(next_attempt)
                                @logger.info("Retrying http request, will sleep for #{sleep_for} seconds")
                                # RetryTimerTask is defined outside this view; presumably it pushes
                                # [event, next_attempt] back onto `pending` when it fires - confirm.
                                timer_task = RetryTimerTask.new(pending, event, next_attempt)
                                # sleep_for is in seconds (see the log line above); it is scaled by 1000 here.
                                @timer.schedule(timer_task, sleep_for*1000)
                        when :failure
                                failures.incrementAndGet
                        else
                                raise "Unknown action #{action}"
                        end

                        # Only a final outcome can complete the run; retries leave the counts unchanged.
                        if action == :success || action == :failure
                                if successes.get+failures.get == event_count
                                        pending << :done
                                end
                        end
                rescue => e
                        # This should never happen unless there's a flat out bug in the code
                        @logger.error("Error sending HTTP Request",
                                :class => e.class.name,
                                :message => e.message,
                                :backtrace => e.backtrace)
                        failures.incrementAndGet
                        raise e
                end
        end
rescue => e
        # Also reached by errors re-raised from the inner rescue, so such an
        # error is logged twice before it propagates to the caller.
        @logger.error("Error in http output loop",
                                        :class => e.class.name,
                                        :message => e.message,
                                        :backtrace => e.backtrace)
        raise e
end
sleep_for_attempt(attempt) click to toggle source
# File lib/logstash/outputs/kinetica.rb, line 220
# Computes the back-off delay for a retry attempt.
#
# The base delay is attempt squared, capped at 60. The result is half of the
# base plus half of a random value drawn from 0..base (integer arithmetic for
# an integer attempt), so it lies between base/2 and base.
#
# @param attempt [Integer] the attempt number
# @return [Integer] the delay (the caller logs it as seconds)
def sleep_for_attempt(attempt)
        ceiling = [attempt**2, 60].min
        jitter = rand(0..ceiling)
        (ceiling / 2) + (jitter / 2)
end

Private Instance Methods

custom_headers(event) click to toggle source
# File lib/logstash/outputs/kinetica.rb, line 372
# Builds the per-event headers by passing every value of @headers through
# event.sprintf. The @headers hash itself is left untouched.
#
# @param event [#sprintf] the event used to resolve header values
# @return [Hash, nil] a new hash of resolved headers, or nil when @headers is unset
def custom_headers(event)
        return nil unless @headers

        @headers.each_with_object({}) do |(name, template), resolved|
                resolved[name] = event.sprintf(template)
        end
end
event_body(event) click to toggle source

Format the HTTP body

# File lib/logstash/outputs/kinetica.rb, line 310
        # Formats the HTTP body: a JSON object with "table_name", "data_text",
        # "create_table_options" and "options".
        #
        # "data_text" is one text record: the string form of every event field,
        # ordered by field name and joined with @text_delimiter.
        #
        # The string fields are encoded with +to_json+. The previous hand-rolled
        # escaping escaped quotes before backslashes, so a value containing a
        # double quote came out as \\" and produced invalid JSON; newlines and
        # other control characters were not escaped at all. For values without
        # special characters the generated body is unchanged.
        #
        # NOTE(review): field values that contain @text_delimiter are not quoted,
        # and @text_escape_character is not applied here - confirm whether the
        # receiving endpoint needs that.
        #
        # @param event [#to_hash, #to_s] the event to serialize
        # @return [String] the JSON request body
        def event_body(event)
                @logger.info("event: #{event.to_s}")
                fields = event.to_hash
                @logger.info("event: #{fields}")

                # Sorting the field names keeps the column order stable across events.
                data = fields.keys.sort.map { |key| fields[key].to_s }.join(@text_delimiter.to_s)

                # Same line layout as before; only the escaping of the two string
                # fields is delegated to the JSON encoder.
                body = [
                        "{",
                        "\"table_name\": #{@table_name.to_json},",
                        "\"data_text\": #{data.to_json},",
                        "\"create_table_options\": #{create_table_options.to_json},",
                        "\"options\": #{options.to_json}",
                        "}"
                ].join("\n")

                @logger.info("body: #{body}")
                body
        end
event_headers(event) click to toggle source

# Recursively converts a mapping template for one event.
#
# Hashes are rebuilt key by key and arrays element by element; every other
# value is handed to event.sprintf and replaced by its result.
#
# @param mapping [Hash, Array, Object] the template (or a nested part of it)
# @param event [#sprintf] the event used to resolve leaf values
# @return [Hash, Array, Object] a structure of the same shape with resolved leaves
def convert_mapping(mapping, event)
        case mapping
        when Hash
                mapping.each_with_object({}) do |(key, template), converted|
                        converted[key] = convert_mapping(template, event)
                end
        when Array
                mapping.map { |item| convert_mapping(item, event) }
        else
                event.sprintf(mapping)
        end
end

# Returns the data to send for an event: the event's own hash when no
# @mapping is configured, otherwise @mapping converted for this event via
# convert_mapping.
#
# @param event [#to_hash] the event to map
def map_event(event)
        return event.to_hash unless @mapping

        convert_mapping(@mapping, event)
end

# File lib/logstash/outputs/kinetica.rb, line 368
# Returns the headers for one event: the result of custom_headers, or an
# empty hash when custom_headers yields nothing.
#
# @param event [Object] the event the headers are resolved for
# @return [Hash]
def event_headers(event)
        resolved = custom_headers(event)
        resolved || {}
end
gzip(data) click to toggle source

Gzip-compress the given data and return the compressed bytes as a string.

# File lib/logstash/outputs/kinetica.rb, line 337
# Gzip-compresses +data+ in memory.
#
# @param data [String] the payload to compress
# @return [String] the gzip stream, taken from a StringIO whose encoding was
#   set to BINARY
def gzip(data)
        sink = StringIO.new.tap { |io| io.set_encoding("BINARY") }

        # Closing the writer flushes the gzip trailer into the sink.
        Zlib::GzipWriter.new(sink).tap { |writer| writer.write(data) }.close

        sink.string
end
log_failure(message, opts) click to toggle source

This is split into a separate method mostly to help testing

# File lib/logstash/outputs/kinetica.rb, line 305
# Logs a failure at error level with the "[HTTP Output Failure]" prefix.
# Kept as its own method mostly to make the logging easy to test.
#
# @param message [String] the text appended after the prefix
# @param opts [Hash] structured details passed to the logger unchanged
def log_failure(message, opts)
        tagged = "[HTTP Output Failure] #{message}"
        @logger.error(tagged, opts)
end
response_success?(response) click to toggle source
# File lib/logstash/outputs/kinetica.rb, line 280
# Decides whether a response counts as a success.
#
# Order of checks:
# 1. A JSON object body whose "status" is "ERROR" is a failure, whatever the
#    HTTP code.
# 2. A code listed in @ignorable_codes is a success.
# 3. Otherwise only 2xx codes are a success.
#
# A body that is not a JSON object (HTML error page from a proxy, empty body,
# nil) simply has no status. Previously such a body made JSON.parse raise, so
# the response was reported as an exception by the caller and was never
# checked against the retryable codes.
#
# @param response [#code, #body] the HTTP response
# @return [Boolean]
def response_success?(response)
        code = response.code

        payload =
                begin
                        JSON.parse(response.body)
                rescue JSON::ParserError, TypeError
                        # ParserError: body is not JSON; TypeError: body is nil.
                        nil
                end

        status = payload.is_a?(Hash) ? payload["status"] : nil
        unless status.nil?
                @logger.info("status: #{status}")
                if status == "ERROR"
                        @logger.error("message: #{payload["message"]}")
                        return false
                end
        end

        return true if @ignorable_codes && @ignorable_codes.include?(code)
        code >= 200 && code <= 299
end
retryable_exception?(exception) click to toggle source
# File lib/logstash/outputs/kinetica.rb, line 300
# Tells whether an exception is an instance of any class listed in
# RETRYABLE_MANTICORE_EXCEPTIONS (subclasses included).
#
# @param exception [Exception] the exception raised while sending
# @return [Boolean]
def retryable_exception?(exception)
        RETRYABLE_MANTICORE_EXCEPTIONS.any?(&exception.method(:is_a?))
end
retryable_response?(response) click to toggle source
# File lib/logstash/outputs/kinetica.rb, line 296
# Tells whether the response code is listed in @retryable_codes.
# When @retryable_codes is unset, that falsy value itself is returned.
#
# @param response [#code] the HTTP response
def retryable_response?(response)
        return @retryable_codes unless @retryable_codes

        @retryable_codes.include?(response.code)
end