class Piscina

Constants

AWS_ACCOUNT_NUM
AWS_REGION
DEFAULT_MESSAGE_TIMEOUT
DEFAULT_THREADS_IN_POOL
MESSAGE_RETRY_VISIBILITY
MESSAGE_VISIBILITY_TIMEOUT

Public Class Methods

new(sqs_url, klass, options={}) click to toggle source
# File lib/piscina/piscina.rb, line 14
# Binds this instance to an SQS queue and a handler class, builds the
# fixed-size worker pool and the logger, registers the at_exit cleanup hook,
# and starts polling.
#
# sqs_url - queue URL; its last "/"-separated segment is used as the log name.
# klass   - handler object; process_message calls klass.perform(msg).
# options - Hash; :pool_threads overrides DEFAULT_THREADS_IN_POOL.
def initialize(sqs_url, klass, options={})
  @queue = create_or_initialize_queue(sqs_url)
  @klass = klass

  worker_count = options[:pool_threads] || DEFAULT_THREADS_IN_POOL
  @thread_pool = Executors.newFixedThreadPool(worker_count)

  # The log is named after the SQS queue (last segment of the URL).
  @logger = PiscinaLogger.new(sqs_url.split("/").last)

  # Run the cleanup when the process exits (SIGTERM and other shutdown paths).
  at_exit { shutdown_instance }

  poll
end

Public Instance Methods

poll() click to toggle source

Unfortunately, we can’t use the AWS SDK’s Queue#poll with a block, because it goes through Queue#call_message_block, which ALWAYS deletes a received message.

# File lib/piscina/piscina.rb, line 35
# Runs poll_sqs on a new background thread and returns that Thread.
def poll
  # Under JRuby this Thread is backed by a real OS thread.
  Thread.new { poll_sqs }
end
poll_sqs() click to toggle source
# File lib/piscina/piscina.rb, line 42
# Receives messages from @queue in a loop and hands each one to
# process_message. The loop ends once @thread_pool is nil or reports
# isShutdown.
#
# Blocks the calling thread while the loop runs; poll wraps it in a Thread.
def poll_sqs
  loop do
    # Check before every receive. A timed-out receive takes the `next` below,
    # so without this check an idle queue would keep the loop polling forever
    # after the pool has been shut down.
    break if @thread_pool.nil? || @thread_pool.isShutdown

    msg = @queue.receive_messages(visibility_timeout: MESSAGE_VISIBILITY_TIMEOUT,
                                  wait_time_seconds: DEFAULT_MESSAGE_TIMEOUT)

    # receive_message can time out and return nil
    next if msg.nil?

    # The pool may have been shut down while the receive was waiting; do not
    # submit work to it in that case.
    break if @thread_pool.nil? || @thread_pool.isShutdown

    process_message(msg)
  end
end
process_message(msg) click to toggle source
# File lib/piscina/piscina.rb, line 55
# Submits a job to @thread_pool that passes +msg+ to @klass.perform and logs
# the outcome.
#
# On success an info line is logged. If a StandardError is raised, an error
# line is logged and the message's visibility timeout is set to
# MESSAGE_RETRY_VISIBILITY.
#
# msg - received message; must respond to id, body and visibility_timeout=.
def process_message(msg)
  @thread_pool.execute do
    body = nil

    begin
      # Build the log-friendly body (newlines and spaces stripped) BEFORE
      # calling the handler, so the error log below still contains it when
      # perform raises.
      body = msg.body.delete("\n").delete(" ")

      @klass.perform(msg)

      @logger.info("Successfully processed message:#{msg.id};body:#{body}")
    rescue => e
      @logger.error("Could not process message:#{msg.id};body:#{body};error:#{e.message}")

      # DLQ policy -> Messages are attempted N times and then benched. Policy is defined by the
      # queue itself.
      msg.visibility_timeout = MESSAGE_RETRY_VISIBILITY
    end
  end
end
send_message(text) click to toggle source
# File lib/piscina/piscina.rb, line 81
# Sends +text+ to the queue held in @queue by delegating to its send_message
# method, and returns whatever that call returns.
def send_message(text)
  @queue.send_message(text)
end
shutdown_instance() click to toggle source
# File lib/piscina/piscina.rb, line 73
# Shuts down the worker pool first and the logger second; returns the result
# of the logger's shutdown call.
def shutdown_instance
  pool = @thread_pool
  log  = @logger

  # A pool shutdown is not immediate: jobs already submitted are allowed to
  # finish before the threads are closed.
  # TODO: make sure to wait for shutdown
  pool.shutdown
  log.shutdown
end

Private Instance Methods

create_or_initialize_queue(sqs_url) click to toggle source
# File lib/piscina/piscina.rb, line 86
# Returns a queue handle for +sqs_url+, creating the queue when it does not
# exist yet.
#
# If the queue exists, the handle built from +sqs_url+ is returned. If not, a
# queue named after the last "/"-separated segment of the URL is created and
# the handle returned by the create call is used, so the caller never gets a
# handle that points at a queue which did not exist.
#
# sqs_url - String URL of the queue.
def create_or_initialize_queue(sqs_url)
  queue = AWS::SQS::Queue.new(sqs_url)
  return queue if queue.exists?

  # TODO: should probably not create queues dynamically in prod;
  # make this configurable (e.g. raise "Queue does not exist" instead).
  queue_name = sqs_url.split("/").last
  AWS::SQS.new.queues.create(queue_name)
end