class Kyu::Manager
Public Class Methods
new( worker_klass, queue_name, options={} )
click to toggle source
# File lib/kyu/manager.rb, line 3 def initialize( worker_klass, queue_name, options={} ) @max_retries = options.fetch( :max_retries, 3 ) @threadpool_size = options.fetch( :threadpool_size, 20 ) @logger = options.fetch( :logger, Logger.new( '/dev/null' ) ) @error_callback = options.fetch( :error_callback, ->( err ){} ) queue_options = options.fetch( :queue_options, {} ) @worker_klass = worker_klass sqs = AWS::SQS.new @dl_queue = sqs.queues.create( deadletter_queue_name_for( queue_name ), queue_options ) @queue = sqs.queues.create( queue_name, queue_options ) @queue.client.set_queue_attributes( queue_url: @queue.url, attributes: { "RedrivePolicy" => { "maxReceiveCount" => @max_retries, "deadLetterTargetArn" => @dl_queue.arn }.to_json } ) end
Public Instance Methods
start()
click to toggle source
# File lib/kyu/manager.rb, line 25 def start @logger.info( "Started listening for messages on: '#{@queue.arn}'" ) @logger.info( "Messages that could not be processes would be imgrated to: '#{@dl_queue.arn}'" ) EM.run do EM.threadpool_size = @threadpool_size stop = false Signal.trap( 'INT' ) { EM.stop; stop = true } Signal.trap( 'TERM' ) { EM.stop; stop = true } poll_message( @queue.visibility_timeout ) until stop end @logger.info( "Stopped listening for messages on: '#{@queue.arn}'" ) end
Private Instance Methods
deadletter_queue_name_for( queue_name )
click to toggle source
# File lib/kyu/manager.rb, line 70 def deadletter_queue_name_for( queue_name ) queue_name + '_DeadLetter' end
poll_message( visibility_timeout )
click to toggle source
# File lib/kyu/manager.rb, line 46 def poll_message( visibility_timeout ) msg = @queue.receive_message( attributes: [:receive_count] ) return unless msg EM.defer do begin @logger.info( "Started processing: '#{msg.body}'" ) Timeout::timeout( visibility_timeout ) do @worker_klass.new.process_message( JSON.parse( msg.body ) ) end msg.delete @logger.info( "Finished processing: '#{msg.body}'" ) rescue => err @logger.error( Kyu.stringify_exception( err ) ) @error_callback.call( err ) if msg.receive_count > @max_retries @logger.info( "Max number of reties exceeded for: '#{msg.body}'. Migrating the message to the dead-letter queue." ) @dl_queue.send_message( msg.body ) msg.delete end end end end