class Ruote::Resque::Receiver

The receiver will poll the reply_queue in Resque, waiting for reply jobs. It does so in a new thread.

By default it polls the reply_queue every 5 seconds, but this is configurable via the ‘interval` configuration option. See {Ruote::Resque}.

You should launch the Receiver as soon as your engine is set up.

@example Running a ruote-resque Receiver

Ruote::Resque::Receiver.new(dashboard)

@example Overriding the handle_error method for custom exception handling

class Ruote::Resque::Receiver
  def handle_error(e)
    MyErrorHandler.handle(e)
  end
end

Ruote::Resque::Receiver.new(dashboard)

Public Class Methods

new(cwes, options = {}) click to toggle source

Retunrs a new Receiver instance and spawns a worker thread. @param [Ruote::Dashboard] cwes Accepts context, worker, engine or storage @param [Hash] options Passed on to Ruote, currently unused. @return [Receiver]

Calls superclass method
# File lib/ruote/resque/receiver.rb, line 41
def initialize(cwes, options = {})
  super
  @listener = listen
end

Public Instance Methods

handle_error(e) click to toggle source

Called when an error is raised during the poll/reserve/process flow of the Receiver. You should override this method for custom error handling. By default it just logs the exception. @param [Exception] e @return [void]

# File lib/ruote/resque/receiver.rb, line 57
def handle_error(e)
  Ruote::Resque.logger.error(e)
end
shutdown() click to toggle source

Stops the worker thread. @return [void]

# File lib/ruote/resque/receiver.rb, line 48
def shutdown
  @listener.kill
end

Private Instance Methods

flunk(workitem, class_name, message, backtrace) click to toggle source
Calls superclass method
# File lib/ruote/resque/receiver.rb, line 128
def flunk(workitem, class_name, message, backtrace)

  error = Ruote::ReceivedError.new(class_name, message, backtrace)
  args = [error, message, backtrace]

  super(workitem, *args)

end
listen() click to toggle source
# File lib/ruote/resque/receiver.rb, line 63
def listen

  Thread.new do
    loop do
      work
    end
  end

end
process(job) click to toggle source
# File lib/ruote/resque/receiver.rb, line 97
def process(job)

  job_arguments = job.args
  item = job_arguments.pop

  if job_arguments.any?
    flunk(item, *job_arguments)
  else
    receive(item)
  end

rescue => e
  # Fail the job on Resque, then raise to let handle_error do it's work
  job.fail(e)
  raise
end
reserve() click to toggle source
# File lib/ruote/resque/receiver.rb, line 84
def reserve

  if job = ::Resque.reserve(Ruote::Resque.configuration.reply_queue)
    validate_job(job)
    process(job)
  else
    sleep Ruote::Resque.configuration.interval
  end

rescue => e
  handle_error(e)
end
validate_job(job) click to toggle source
# File lib/ruote/resque/receiver.rb, line 114
def validate_job(job)

  job_class = job.payload_class.to_s
  unless job_class == 'Ruote::Resque::ReplyJob'
    raise InvalidJob.new(job_class)
  end

  item = job.args.last
  unless item.is_a?(Hash) && item['fields'] && item['fei']
    raise InvalidWorkitem.new(item.inspect)
  end

end
work() click to toggle source
# File lib/ruote/resque/receiver.rb, line 73
def work

  reserve

# handle_error may raise an exception itself
# in this case protect the thread
rescue => e
  Ruote::Resque.logger.error('*** UNCAUGHT EXCEPTION IN RUOTE::RESQUE::RECEIVER ***')
  Ruote::Resque.logger.error(e)
end