class SqsPoller

Constants

BACKOFF_FACTOR
BACKOFF_SLEEP_TIME

only needed in “run_with_backoff”:

DEFAULT_OPTIONS

queue poller options we want to set explicitly

EVENT_SOURCE

only needed in “preprocess”:

EVENT_TYPE
MAX_TIME_BEFORE_GIVING_UP

Public Class Methods

new(logger, stop_semaphore, poller_options = {}, client_options = {}, aws_options_hash) click to toggle source

Initialization and setup happen once, outside the worker threads:

# File lib/logstash/inputs/sqs/poller.rb, line 38
# Builds the poller: stores the settings taken from +client_options+,
# merges +poller_options+ over DEFAULT_OPTIONS and connects to the queue.
#
# @param logger           logger used for info/error output
# @param stop_semaphore   object kept in @stopped and queried by #stop?
# @param poller_options   [Hash] overrides merged on top of DEFAULT_OPTIONS
# @param client_options   [Hash] read for :sqs_queue, :from_sns,
#   :max_processing_time, :sqs_delete_on_failure and
#   :queue_owner_aws_account_id
# @param aws_options_hash options handed unchanged to Aws::SQS::Client.new
# @raise [LogStash::ConfigurationError] when the SQS client reports a
#   ServiceError while the connection is being set up
def initialize(logger, stop_semaphore, poller_options = {}, client_options = {}, aws_options_hash)
  @logger = logger
  @stopped = stop_semaphore
  @queue = client_options[:sqs_queue]
  @from_sns = client_options[:from_sns]
  @max_processing_time = client_options[:max_processing_time]
  @sqs_delete_on_failure = client_options[:sqs_delete_on_failure]
  @options = DEFAULT_OPTIONS.merge(poller_options)

  @logger.info("Registering SQS input", :queue => @queue)
  sqs_client = Aws::SQS::Client.new(aws_options_hash)

  # @queue may already be a full queue URL; otherwise resolve it by name.
  queue_url =
    if uri?(@queue)
      @queue
    else
      lookup = {
        queue_name: @queue,
        queue_owner_aws_account_id: client_options[:queue_owner_aws_account_id]
      }
      sqs_client.get_queue_url(lookup).queue_url
    end

  @poller = Aws::SQS::QueuePoller.new(queue_url, :client => sqs_client)
  @logger.info("[#{Thread.current[:name]}] connected to queue.", :queue_url => queue_url)
rescue Aws::SQS::Errors::ServiceError => e
  @logger.error("Cannot establish connection to Amazon SQS", :error => e)
  raise LogStash::ConfigurationError, "Verify the SQS queue name and your credentials"
end

Public Instance Methods

run() { |record| ... } click to toggle source

this is called by every worker thread:

# File lib/logstash/inputs/sqs/poller.rb, line 69
# Polls the queue and yields every record that #preprocess extracts from a
# received message. Called by every worker thread.
#
# While a message is being processed, a helper ("extender") thread
# periodically raises the message's visibility timeout. Once the requested
# visibility reaches @max_processing_time the extender logs an error,
# deletes the message if @sqs_delete_on_failure is set, and kills the
# polling thread.
#
# The visibility counter is kept per message. It used to be shared by all
# messages handled by a worker, so it kept growing from message to message
# and eventually made the extender give up on a new message immediately.
#
# @yield [record] the Hash produced by #preprocess for each file
def run() # not (&block) - pass explicitly (use yield below)
  # per-thread timer to extend visibility if necessary
  extender = nil
  # wake up shortly (95%) before the current visibility timeout elapses
  message_backoff = (@options[:visibility_timeout] * 95).to_f / 100.0
  # first extension requested for every message
  initial_visibility = 2 * @options[:visibility_timeout]

  # "shutdown handler":
  @poller.before_request do |_|
    if stop?
      # kill visibility extender thread if active?
      extender.kill if extender
      extender = nil
      @logger.warn('issuing :stop_polling on "stop?" signal', :queue => @queue)
      # this can take up to "Receive Message Wait Time" (of the sqs queue) seconds to be recognized
      throw :stop_polling
    end
  end

  run_with_backoff do
    message_count = 0 #PROFILING
    @poller.poll(@options) do |message|
      message_count += 1 #PROFILING
      message_t0 = Process.clock_gettime(Process::CLOCK_MONOTONIC) #PROFILING
      # auto-increase the timeout if processing takes too long:
      poller_thread = Thread.current
      # every message starts from the same initial extension
      new_visibility = initial_visibility
      extender = Thread.new do
        while new_visibility < @max_processing_time do

          sleep message_backoff
          begin
            @poller.change_message_visibility_timeout(message, new_visibility)
            @logger.warn("[#{Thread.current[:name]}] Extended visibility for a long running message", :visibility => new_visibility) if new_visibility > 600.0
            new_visibility += message_backoff
          rescue Aws::SQS::Errors::InvalidParameterValue => e
            @logger.debug("Extending visibility failed for message", :error => e)
          else
            @logger.debug("[#{Thread.current[:name]}] Extended visibility for message", :visibility => new_visibility) #PROFILING
          end
        end
        @logger.error("[#{Thread.current[:name]}] Maximum visibility reached! We will delete this message from queue!")
        @poller.delete_message(message) if @sqs_delete_on_failure
        poller_thread.kill
      end
      extender[:name] = "#{Thread.current[:name]}/extender" #PROFILING
      failed = false
      record_count = 0
      begin
        # nil when the consumer throws :skip_delete or #preprocess skips the message
        message_completed = catch(:skip_delete) do
          preprocess(message) do |record|
            record_count += 1
            extender[:name] = "#{Thread.current[:name]}/extender/#{record[:key]}" #PROFILING
            yield(record)
          end
        end
      rescue Exception => e
        @logger.warn("Error in poller loop", :error => e)
        @logger.warn("Backtrace:\n\t#{e.backtrace.join("\n\t")}")
        failed = true
      end
      message_t1 = Process.clock_gettime(Process::CLOCK_MONOTONIC) #PROFILING
      unless message_completed
        @logger.debug("[#{Thread.current[:name]}] uncompleted message at the end of poller loop. We´ll throw skip_delete.", :message_count => message_count)
        # wake the extender out of its sleep
        extender.run if extender
      end
      # at this time the extender has either fired or is obsolete
      extender.kill if extender
      extender = nil
      # re-thrown for the enclosing @poller.poll call
      throw :skip_delete if failed or ! message_completed
      #@logger.info("[#{Thread.current[:name]}] completed message.", :message => message_count)
    end
  end
end

Private Instance Methods

get_object_path(key) click to toggle source
# File lib/logstash/inputs/sqs/poller.rb, line 212
# Returns the directory part of an object key, or an empty string when the
# key has no directory component.
#
# @param key [String] object key / path
# @return [String] result of File.dirname, with '.' mapped to ''
def get_object_path(key)
  parent = ::File.dirname(key)
  parent == '.' ? '' : parent
end
preprocess(message) { |{ bucket: bucket, key: key, size: size, folder: get_object_path(key) }| ... } click to toggle source

Customization to parse CrowdStrike Falcon Data Replicator queue messages

# File lib/logstash/inputs/sqs/poller.rb, line 149
  # Parses a queue message (customised for CrowdStrike Falcon Data Replicator
  # notifications) and yields one record per listed file.
  #
  # The body is parsed as JSON; when @from_sns is set, the JSON document in
  # its 'Message' field is parsed and used instead. Messages without a
  # 'files' entry, or whose 'pathPrefix' is missing or does not start with
  # "data/" or "aidmaster/", are skipped.
  #
  # @param message object responding to #body with a JSON string
  # @yield [record] Hash with :bucket, :key, :size and :folder
  # @return [Array, nil] the 'files' list after iteration, or nil when the
  #   message was skipped
  def preprocess(message)
    @logger.debug("Inside FDR Queue Preprocess: Start", :event => message)
    payload = JSON.parse(message.body)
    payload = JSON.parse(payload['Message']) if @from_sns
    @logger.debug("Payload in Preprocess: ", :payload => payload)

    # Only "data/" and "aidmaster/" prefixes are handled. to_s keeps a missing
    # pathPrefix from raising NoMethodError, so the message is just skipped.
    path_prefix = payload['pathPrefix'].to_s
    return nil unless payload['files'] && path_prefix.start_with?("data/", "aidmaster/")

    bucket = payload['bucket']
    payload['files'].each do |file|
      @logger.debug("We found a file", :file => file)
      key = file['path']
      @logger.debug("We found a file", :key => key)
      size = file['size']
      @logger.debug("We found a file", :size => size)

      yield({
        bucket: bucket,
        key: key,
        size: size,
        folder: get_object_path(key)
      })
    end
  end
run_with_backoff(max_time = MAX_TIME_BEFORE_GIVING_UP, sleep_time = BACKOFF_SLEEP_TIME) { || ... } click to toggle source

Runs an AWS request inside a Ruby block with an exponential backoff in case we experience a ServiceError. @param [Integer] max_time maximum amount of time to sleep before giving up. @param [Integer] sleep_time the initial amount of time to sleep before retrying. Instead of requiring a @param [Block] block (the Ruby code block to execute) and then calling “block.call”, we yield to the passed block.

# File lib/logstash/inputs/sqs/poller.rb, line 189
# Runs the given block and retries it with an exponential backoff whenever
# it raises Aws::SQS::Errors::ServiceError.
#
# The delay starts at +sleep_time+ and is multiplied by BACKOFF_FACTOR after
# every failed attempt. Once the delay just slept exceeds +max_time+ it is
# reset to +sleep_time+. The block is retried without an upper bound on the
# number of attempts.
#
# @param max_time   [Numeric] delay threshold above which the backoff resets
# @param sleep_time [Numeric] initial delay before the first retry
# @yield the code to execute (and re-execute after a ServiceError)
def run_with_backoff(max_time = MAX_TIME_BEFORE_GIVING_UP, sleep_time = BACKOFF_SLEEP_TIME)
  next_sleep = sleep_time
  begin
    yield
    # retained so the value returned on success stays +sleep_time+
    next_sleep = sleep_time
  rescue Aws::SQS::Errors::ServiceError => e
    # report the delay that is actually about to be slept
    @logger.warn("Aws::SQS::Errors::ServiceError ... retrying SQS request with exponential backoff", :queue => @queue, :sleep_time => next_sleep, :error => e)
    sleep(next_sleep)
    # grow from the previous delay (not the initial one) so the backoff compounds
    next_sleep = next_sleep > max_time ? sleep_time : next_sleep * BACKOFF_FACTOR
    retry
  end
end
stop?() click to toggle source
# File lib/logstash/inputs/sqs/poller.rb, line 144
# Reports whether a stop has been requested, by reading the current value
# of the stop semaphore stored in @stopped.
def stop?
  stop_flag = @stopped
  stop_flag.value
end
uri?(string) click to toggle source
# File lib/logstash/inputs/sqs/poller.rb, line 202
# Tells whether +string+ parses as a URI whose scheme is http or https.
# Input that URI.parse rejects with BadURIError or InvalidURIError counts
# as "not a URI".
#
# @param string [String] candidate queue name or URL
# @return [Boolean]
def uri?(string)
  scheme = URI.parse(string).scheme
  ['http', 'https'].include?(scheme)
rescue URI::BadURIError, URI::InvalidURIError
  false
end