class Piscina
Constants
- AWS_ACCOUNT_NUM
- AWS_REGION
- DEFAULT_MESSAGE_TIMEOUT
- DEFAULT_THREADS_IN_POOL
- MESSAGE_RETRY_VISIBILITY
- MESSAGE_VISIBILITY_TIMEOUT
Public Class Methods
new(sqs_url, klass, options={})
click to toggle source
# File lib/piscina/piscina.rb, line 14
# Builds a worker bound to one SQS queue and immediately starts polling it.
#
# sqs_url - URL of the queue; its last path segment is used as the log name.
# klass   - object whose +perform+ is invoked for each received message.
# options - Hash; :pool_threads overrides DEFAULT_THREADS_IN_POOL.
def initialize(sqs_url, klass, options={})
  @queue = create_or_initialize_queue(sqs_url)
  @klass = klass
  @thread_pool = Executors.newFixedThreadPool(options[:pool_threads] || DEFAULT_THREADS_IN_POOL)

  # The logger is named after the SQS queue.
  @logger = PiscinaLogger.new(sqs_url.split("/").last)

  # Release the pool and logger on SIGTERM and other process shutdowns.
  at_exit { shutdown_instance }

  poll
end
Public Instance Methods
poll()
click to toggle source
Unfortunately, we can’t use the AWS SDK’s Queue#poll with a block, because it relies on Queue#call_message_block, which ALWAYS deletes a received message.
# File lib/piscina/piscina.rb, line 35
# Runs the SQS polling loop on a background thread and returns that Thread.
def poll
  # Under JRuby, Thread.new is backed by a real OS thread.
  Thread.new { poll_sqs }
end
poll_sqs()
click to toggle source
# File lib/piscina/piscina.rb, line 42
# Receives messages from @queue in a loop and hands each one to
# process_message, until the thread pool is missing or has been shut down.
#
# Returns nil once the loop is left.
def poll_sqs
  loop do
    # Check for shutdown BEFORE asking for a message: otherwise an idle queue
    # (receive keeps returning nil) would never reach the check and the loop
    # could not terminate, and a message would be pulled off the queue only
    # to be dropped.
    break if @thread_pool.nil? || @thread_pool.isShutdown

    msg = @queue.receive_messages(visibility_timeout: MESSAGE_VISIBILITY_TIMEOUT,
                                  wait_time_seconds: DEFAULT_MESSAGE_TIMEOUT)

    # receive_message can time out and return nil
    next if msg.nil?

    # The pool may have been shut down while the receive call was waiting;
    # do not submit work to it in that case.
    # NOTE(review): the message is dropped here; presumably it becomes
    # visible again once its visibility timeout expires - confirm.
    break if @thread_pool.nil? || @thread_pool.isShutdown

    process_message(msg)
  end
end
process_message(msg)
click to toggle source
# File lib/piscina/piscina.rb, line 55
# Submits a job to the thread pool that runs @klass.perform on +msg+ and logs
# the outcome.
#
# msg - received queue message; must respond to #id, #body and
#       #visibility_timeout=.
#
# Errors (StandardError) raised inside the job are logged and not re-raised;
# the message's visibility timeout is set to MESSAGE_RETRY_VISIBILITY instead.
def process_message(msg)
  @thread_pool.execute do
    begin
      # Flatten the body (strip newlines and spaces) BEFORE running the job so
      # that the error log line below can include it when perform raises.
      # to_s keeps a nil body from aborting the job before it starts.
      body = msg.body.to_s.delete("\n ")

      @klass.perform(msg)
      @logger.info("Successfully processed message:#{msg.id};body:#{body}")
    rescue => e
      @logger.error("Could not process message:#{msg.id};body:#{body};error:#{e.message}")

      # DLQ policy -> Messages are attempted N times and then benched. Policy is defined by the
      # queue itself.
      msg.visibility_timeout = MESSAGE_RETRY_VISIBILITY
    end
  end
end
send_message(text)
click to toggle source
# File lib/piscina/piscina.rb, line 81
# Forwards +text+ to the underlying queue's send_message and returns
# whatever that call returns.
def send_message(text)
  @queue.send_message(text)
end
shutdown_instance()
click to toggle source
# File lib/piscina/piscina.rb, line 73
# Shuts down the thread pool first, then the logger.
def shutdown_instance
  # Thread pool shutdown is not immediate: jobs are allowed to finish
  # before the threads are closed.
  # TODO make sure to wait for shutdown
  @thread_pool.shutdown

  @logger.shutdown
end
Private Instance Methods
create_or_initialize_queue(sqs_url)
click to toggle source
# File lib/piscina/piscina.rb, line 86
# Returns a queue handle for +sqs_url+, creating the queue when it does not
# exist yet.
#
# sqs_url - URL of the SQS queue; its last path segment is used as the queue
#           name when the queue has to be created.
#
# Returns the existing queue, or the queue returned by the create call.
def create_or_initialize_queue(sqs_url)
  queue = AWS::SQS::Queue.new(sqs_url)
  return queue if queue.exists?

  # TODO should probably not create queues dynamically in prod
  # make this configurable
  # raise "Queue does not exist" unless queue.exists?
  queue_name = sqs_url.split("/").last

  # Hand back the queue that was actually created rather than the handle
  # built from sqs_url, which is known not to exist at this point.
  AWS::SQS.new.queues.create(queue_name)
end