class Nsque::Worker
Public Class Methods
new(options)
click to toggle source
# File lib/nsque/worker.rb, line 4 def initialize(options) raise ChannelRequiredError.new unless options.has_key?(:channel) @options = options end
Public Instance Methods
run()
click to toggle source
# File lib/nsque/worker.rb, line 9 def run initialize_traps consumer = Nsqrb::Consumer.new(@options) consumer.connect! loop do consumer.receive while message = consumer.messages.pop @processing_message = true begin hash = JSON.parse(message.content) puts hash.inspect enqueue_after = (hash['at'].to_f - Time.now.to_f) * 1000 if enqueue_after <= 0 klass = hash['class'].constantize klass.new.perform(hash['args']) else puts "Requeued: #{enqueue_after} ms" consumer.requeue(message, enqueue_after.to_i) next end rescue => e p e.message end consumer.confirm(message) @processing_message = false end break if @shutdown end ensure consumer.close! if consumer end
Private Instance Methods
initialize_traps()
click to toggle source
# File lib/nsque/worker.rb, line 45 def initialize_traps %w(INT TERM).each do |signal| trap(signal) do puts "I've recieved #{signal}!" @shutdown = true exit unless @processing_message end end end