class Delayed::Backend::Sqs::Job

Public Class Methods

new(data = {}) click to toggle source
# File lib/delayed/backend/sqs.rb, line 20
# Builds a job from a plain attributes hash or from a received SQS message.
#
# data - a Hash of job attributes (string or symbol keys), or an
#        AWS::SQS::ReceivedMessage whose body is parsed as JSON to obtain
#        that hash.
#
# The :payload_object entry (or, when that is absent, :handler) is removed
# from the attributes and routed through #payload_object=. The :queue, :delay,
# :timeout and :expires_in entries fall back to the corresponding
# Delayed::Worker settings when they are missing or nil.
def initialize(data = {})
  @msg = nil

  if data.is_a?(AWS::SQS::ReceivedMessage)
    # Remember the originating message so it can be deleted later.
    @msg = data
    data = JSON.load(data.body)
  end

  # Work on a symbol-keyed copy. The in-place variant rewrote the caller's
  # hash (keys symbolized, payload entry deleted) and then kept that same
  # object as @attributes, so later changes on either side leaked across.
  data = data.symbolize_keys
  payload_obj = data.delete(:payload_object) || data.delete(:handler)

  @queue_name = data[:queue]      || Delayed::Worker.default_queue_name
  @delay      = data[:delay]      || Delayed::Worker.delay
  @timeout    = data[:timeout]    || Delayed::Worker.timeout
  @expires_in = data[:expires_in] || Delayed::Worker.expires_in
  @attributes = data
  self.payload_object = payload_obj
end

Public Instance Methods

destroy() click to toggle source
# File lib/delayed/backend/sqs.rb, line 75
# Deletes the underlying received message, if this job was built from one.
#
# Returns the result of the message's #delete, or nil when there is no
# message attached to this job.
def destroy
  return unless @msg

  @msg.delete
end
fail!() click to toggle source
# File lib/delayed/backend/sqs.rb, line 82
# Handles a failed job by destroying it; the return value is whatever
# #destroy returns.
#
# TODO(v2): move to separate queue
def fail!
  destroy
end
lock_exclusively!(*args) click to toggle source

No need to check locks

# File lib/delayed/backend/sqs.rb, line 94
# Locking hook that always reports success: this backend has no need to
# check locks. Any arguments are accepted and ignored.
#
# Returns true.
def lock_exclusively!(*args)
  true
end
payload_object() click to toggle source
# File lib/delayed/backend/sqs.rb, line 40
# Returns the deserialized payload, loading it from the YAML handler on first
# use and caching it in @payload_object afterwards.
#
# Raises DeserializationError when loading fails with a TypeError, LoadError,
# NameError or ArgumentError.
#
# NOTE(review): YAML.load may instantiate arbitrary objects on Psych versions
# before 4 - confirm handlers only ever come from trusted producers.
def payload_object
  begin
    return @payload_object if @payload_object

    @payload_object = YAML.load(self.handler)
  rescue TypeError, LoadError, NameError, ArgumentError => load_error
    detail = "Job failed to load: #{load_error.message}. Handler: #{handler.inspect}"
    raise DeserializationError, detail
  end
end
payload_object=(object) click to toggle source
# File lib/delayed/backend/sqs.rb, line 47
# Assigns the payload and keeps the serialized handler in sync with it.
#
# object - either a String, which is treated as the YAML handler and parsed
#          to obtain the payload, or any other object, which is stored as
#          the payload and serialized with #to_yaml to produce the handler.
def payload_object=(object)
  yaml_given = object.is_a?(String)

  # The payload is set first and the handler second, in both cases.
  @payload_object = yaml_given ? YAML.load(object) : object
  self.handler = yaml_given ? object : object.to_yaml
end
reload(*args) click to toggle source
Calls superclass method
# File lib/delayed/backend/sqs.rb, line 103
# Delegates to the superclass implementation of reload. The bare `super`
# forwards the arguments this method received unchanged.
def reload(*args)
  # reset
  super
end
save() click to toggle source
# File lib/delayed/backend/sqs.rb, line 57
# Sends this job's attributes to its queue as a JSON message.
#
# If the job was built from a received message, that message is deleted
# before the new one is sent. The new message is sent with :delay_seconds set
# to the job's delay.
#
# NOTE(review): because the delete happens before the send, a failing send
# would leave neither message in place - confirm this ordering is intended.
#
# Returns true. Raises a RuntimeError ("Handler missing!") when the :handler
# attribute is blank.
def save
  raise "Handler missing!" if @attributes[:handler].blank?

  body = JSON.dump(@attributes)

  @msg.delete if @msg

  target_queue = sqs.queues.named(queue_name)
  target_queue.send_message(body, :delay_seconds  => @delay)
  true
end
save!() click to toggle source
# File lib/delayed/backend/sqs.rb, line 71
# Bang variant of #save; it simply calls #save and returns its result.
def save!
  save
end
unlock(*args) click to toggle source

No need to check locks

# File lib/delayed/backend/sqs.rb, line 99
# Unlocking hook that always reports success: this backend has no need to
# check locks. Any arguments are accepted and ignored.
#
# Returns true.
def unlock(*args)
  true
end
update_attributes(attributes) click to toggle source
# File lib/delayed/backend/sqs.rb, line 87
# Merges the given attributes into this job's attributes and saves the job.
#
# attributes - Hash of attribute names (strings or symbols) to new values.
#              It is read, not modified.
#
# Returns whatever #save returns.
def update_attributes(attributes)
  # Hash#merge returns a new hash and leaves the receiver untouched, so the
  # previous `@attributes.merge attributes` threw the update away before
  # saving. merge! applies it in place. The argument is copied with symbol
  # keys instead of being rewritten in place.
  @attributes.merge!(attributes.symbolize_keys)
  save
end

Private Instance Methods

queue_name() click to toggle source
# File lib/delayed/backend/sqs.rb, line 110
# Reader for the queue name stored in @queue_name.
def queue_name
  @queue_name
end
sqs() click to toggle source
# File lib/delayed/backend/sqs.rb, line 114
# Returns the SQS handle exposed by Delayed::Worker.sqs.
def sqs
  ::Delayed::Worker.sqs
end