class Qless::Job

A Qless job

Constants

CantCompleteError
CantFailError
MiddlewareMisconfiguredError

Attributes

data[RW]
dependencies[R]
dependents[R]
expires_at[R]
failure[R]
jid[R]
klass_name[R]
original_retries[R]
priority[RW]
queue_name[R]
raw_queue_history[R]
retries_left[R]
spawned_from_jid[R]
state[R]
state_changed[R]
state_changed?[R]
tags[RW]
tracked[R]
worker_name[R]

Public Class Methods

build(client, klass, attributes = {}) click to toggle source
# File lib/qless/job.rb, line 87
# Constructs a job pre-populated with placeholder attributes (for example
# worker 'mock_worker' and queue 'mock_queue'), with any entries supplied in
# +attributes+ taking precedence over the placeholders.
#
# client     - passed straight through to +new+.
# klass      - stored as its string form under 'klass'.
# attributes - overrides; keys are normalised by Qless.stringify_hash_keys.
#
# Returns whatever +new+ returns for the merged attribute hash.
def self.build(client, klass, attributes = {})
  base = {
    'jid'              => Qless.generate_jid,
    'spawned_from_jid' => nil,
    'data'             => {},
    'klass'            => klass.to_s,
    'priority'         => 0,
    'tags'             => [],
    'worker'           => 'mock_worker',
    'expires'          => Time.now + (60 * 60), # one hour ahead
    'state'            => 'running',
    'tracked'          => false,
    'queue'            => 'mock_queue',
    'retries'          => 5,
    'remaining'        => 5,
    'failure'          => {},
    'history'          => [],
    'dependencies'     => [],
    'dependents'       => []
  }
  overrides = Qless.stringify_hash_keys(attributes)
  merged = base.merge(overrides)

  # The constructor expects 'data' as a JSON string, not a Ruby object.
  merged.store('data', JSON.dump(merged.fetch('data')))
  new(client, merged)
end
middlewares_on(job_klass) click to toggle source
# File lib/qless/job.rb, line 112
# Lists the ancestors of +job_klass+'s singleton class (excluding the
# singleton class itself) that define an +around_perform+ method.
def self.middlewares_on(job_klass)
  eigenclass = job_klass.singleton_class
  eigenclass.ancestors.reject do |mod|
    mod == eigenclass || !mod.method_defined?(:around_perform)
  end
end
new(client, atts) click to toggle source
Calls superclass method Qless::BaseJob::new
# File lib/qless/job.rb, line 119
# Sets up a job from the raw attribute hash +atts+ (string keys).
# Every key read here is required: a missing one raises KeyError via fetch.
def initialize(client, atts)
  super(client, atts.fetch('jid'))

  # Attributes kept under the same name they arrive with.
  same_named = %w[jid data priority tags state tracked
                  failure dependencies dependents spawned_from_jid]
  same_named.each do |name|
    instance_variable_set("@#{name}", atts.fetch(name))
  end

  # 'data' arrives as a JSON string; keep the decoded form.
  @data = JSON.parse(@data)

  # Attributes whose instance variable name differs from the incoming key.
  {
    'expires'   => :@expires_at,
    'klass'     => :@klass_name,
    'queue'     => :@queue_name,
    'worker'    => :@worker_name,
    'retries'   => :@original_retries,
    'remaining' => :@retries_left,
    'history'   => :@raw_queue_history
  }.each do |key, ivar|
    instance_variable_set(ivar, atts.fetch(key))
  end

  # A side-effect of Lua handling the JSON: these list-valued attributes can
  # arrive as an empty hash, so turn {} back into [].
  %i[@tags @dependents @dependencies].each do |ivar|
    instance_variable_set(ivar, []) if instance_variable_get(ivar) == {}
  end

  @state_changed = false

  # Callback registries keyed by event; each event starts with an empty list.
  @before_callbacks = Hash.new { |registry, event| registry[event] = [] }
  @after_callbacks  = Hash.new { |registry, event| registry[event] = [] }
end

Public Instance Methods

[](key) click to toggle source
# File lib/qless/job.rb, line 150
# Reads the entry stored under +key+ in the job's data.
def [](key)
  @data[key]
end
[]=(key, val) click to toggle source
# File lib/qless/job.rb, line 154
# Stores +val+ under +key+ in the job's in-memory data. Nothing is sent to
# the client here.
def []=(key, val)
  @data[key] = val
end
cancel() click to toggle source
# File lib/qless/job.rb, line 295
# Sends the 'cancel' command for this job's jid through the client, wrapped
# in note_state_change so that :cancel callbacks run around it.
def cancel
  note_state_change(:cancel) { @client.call('cancel', @jid) }
end
complete(nxt = nil, options = {}) click to toggle source

Complete a job. Options include:

> next (String) the next queue

> delay (int) how long to delay it in the next queue

# File lib/qless/job.rb, line 279
# Completes the job, sending its current data (as JSON) to the client.
#
# nxt     - when non-nil, passed as the 'next' queue together with the
#           'delay' and 'depends' options.
# options - :delay (default 0) and :depends (default []); only consulted
#           when +nxt+ is given.
#
# Runs inside note_state_change(:complete).
# Raises CantCompleteError (carrying the original message) when the client
# raises Qless::LuaScriptError.
def complete(nxt = nil, options = {})
  note_state_change :complete do
    command = ['complete', @jid, @worker_name, @queue_name, JSON.dump(@data)]
    unless nxt.nil?
      command.push('next', nxt,
                   'delay', options.fetch(:delay, 0),
                   'depends', JSON.dump(options.fetch(:depends, [])))
    end
    @client.call(*command)
  end
rescue Qless::LuaScriptError => script_error
  raise CantCompleteError, script_error.message
end
depend(*jids) click to toggle source
# File lib/qless/job.rb, line 331
# Sends 'depends' ... 'on' for this job with the given jids.
# Returns true when the client's reply is truthy, false otherwise.
def depend(*jids)
  reply = @client.call('depends', @jid, 'on', *jids)
  reply ? true : false
end
description() click to toggle source
# File lib/qless/job.rb, line 162
# One-line summary: class name followed by jid, queue, state and data.
def description
  format('%s (%s / %s / %s / %s)',
         @klass_name, @jid, @queue_name, @state, @data)
end
fail(group, message) click to toggle source

Fail a job

# File lib/qless/job.rb, line 251
# Marks the job as failed under +group+ with +message+, sending the current
# data (as JSON) along. Runs inside note_state_change(:fail).
#
# Returns the client's reply, or false when that reply is nil/false.
# Raises CantFailError (carrying the original message) when the client
# raises Qless::LuaScriptError.
def fail(group, message)
  note_state_change :fail do
    reply = @client.call('fail', @jid, @worker_name,
                         group, message, JSON.dump(@data))
    reply || false
  end
rescue Qless::LuaScriptError => script_error
  raise CantFailError, script_error.message
end
heartbeat() click to toggle source

Heartbeat a job

# File lib/qless/job.rb, line 265
# Sends a 'heartbeat' for this job (jid, worker name and JSON-encoded data)
# and records the client's reply as the new expiry.
def heartbeat
  payload = JSON.dump(@data)
  @expires_at = @client.call('heartbeat', @jid, @worker_name, payload)
end
history() click to toggle source
# File lib/qless/job.rb, line 178
# Deprecated reader for the raw queue history.
#
# Emits a deprecation warning (including the first caller frame) and then
# returns the same value as +raw_queue_history+.
def history
  # The trailing space after "use" matters: without it the two fragments run
  # together as "useQless::Job#raw_queue_history".
  warn 'WARNING: Qless::Job#history is deprecated; use ' \
       "Qless::Job#raw_queue_history instead; from:\n#{caller.first}"
  raw_queue_history
end
initially_put_at() click to toggle source
# File lib/qless/job.rb, line 198
# Earliest 'put' timestamp found in the history, computed via
# history_timestamp('put', :min) and cached once a truthy value is obtained.
def initially_put_at
  return @initially_put_at if @initially_put_at

  @initially_put_at = history_timestamp('put', :min)
end
inspect() click to toggle source
# File lib/qless/job.rb, line 166
# Wraps +description+ in "<Qless::Job ...>".
def inspect
  format('<Qless::Job %s>', description)
end
log(message, data = nil) click to toggle source
# File lib/qless/job.rb, line 343
# Sends a 'log' command for this job with +message+. When +data+ is truthy
# it is appended as a JSON string; otherwise only the message is sent.
def log(message, data = nil)
  command = ['log', @jid, message]
  command << JSON.dump(data) if data
  @client.call(*command)
end
move(queue, opts = {})
Alias for: requeue
note_state_change(event) { || ... } click to toggle source
# File lib/qless/job.rb, line 363
# Runs the given block surrounded by the callbacks registered for +event+:
# "before" callbacks first, then the block, then - after flagging the state
# as changed - the "after" callbacks. Each callback receives the job.
#
# Returns the block's value. If the block raises, the flag is not set and
# the "after" callbacks do not run.
def note_state_change(event)
  fire = ->(hooks) { hooks.each { |hook| hook.call(self) } }

  fire.call(@before_callbacks[event])
  outcome = yield
  @state_changed = true
  fire.call(@after_callbacks[event])
  outcome
end
perform() click to toggle source
# File lib/qless/job.rb, line 58
# Runs the job.
#
# * If resolving +klass+ raises NameError, the job is failed under the
#   "<queue>-NameError" group and nothing else happens.
# * Otherwise a "started by pid:" line is logged, and then:
#   - if the last middleware found on the class is SupportsMiddleware, the
#     class's +around_perform+ is invoked with this job;
#   - if other middleware is present without it, MiddlewareMisconfiguredError
#     is raised;
#   - if the class has no +perform+ method, the job is failed under the
#     "<queue>-method-missing" group;
#   - otherwise the class's +perform+ is invoked with this job.
def perform
  # If we can't find the class, we should fail the job, not try to process
  begin
    klass
  rescue NameError
    return fail("#{queue_name}-NameError", "Cannot find #{klass_name}")
  end

  # log a real process executing job -- before we start processing
  log("started by pid:#{Process.pid}")

  middlewares = Job.middlewares_on(klass)

  if middlewares.last == SupportsMiddleware
    klass.around_perform(self)
  elsif middlewares.any?
    # Each fragment ends with a space so the sentences do not run together
    # (previously rendered as "misconfigured.Qless::..." and "jobclass").
    raise MiddlewareMisconfiguredError,
          "The middleware chain for #{klass} (#{middlewares.inspect}) is " \
          'misconfigured. Qless::Job::SupportsMiddleware must be extended ' \
          'onto your job class first if you want to use any middleware.'
  elsif !klass.respond_to?(:perform)
    # If the klass doesn't have a :perform method, we should fail the job
    fail("#{queue_name}-method-missing",
         "#{klass_name} has no perform method")
  else
    klass.perform(self)
  end
end
priority=(priority) click to toggle source
# File lib/qless/job.rb, line 146
# Asks the client to change this job's priority; the local value is only
# updated when the client's reply is truthy.
def priority=(priority)
  return unless @client.call('priority', @jid, priority)

  @priority = priority
end
queue_history() click to toggle source
# File lib/qless/job.rb, line 184
# The raw queue history with every Numeric value converted to a UTC Time
# (the only Numeric values in the history are timestamps). Other values are
# kept as they are. The converted list is cached after the first call.
def queue_history
  return @queue_history if @queue_history

  @queue_history = @raw_queue_history.map do |event|
    converted = event.map do |field, value|
      [field, value.is_a?(Numeric) ? Time.at(value).utc : value]
    end
    converted.to_h
  end
end
reconnect_to_redis() click to toggle source
# File lib/qless/job.rb, line 174
# Calls +reconnect+ on the connection object reached through the client's
# redis handle (client.redis.client).
def reconnect_to_redis
  connection = @client.redis.client
  connection.reconnect
end
requeue(queue, opts = {}) click to toggle source

Move this job from its current queue into another

# File lib/qless/job.rb, line 229
# Re-queues this job into +queue+, running inside
# note_state_change(:requeue).
#
# queue - a String or Symbol used as-is, or any object responding to +name+.
# opts  - optional overrides; each falls back to the job's current value:
#         :data, :delay (default 0), :priority, :tags,
#         :retries (defaults to the original retry count), :depends.
def requeue(queue, opts = {})
  target =
    if queue.is_a?(String) || queue.is_a?(Symbol)
      queue
    else
      queue.name
    end

  note_state_change :requeue do
    command = [
      'requeue', @client.worker_name, target, @jid, @klass_name,
      JSON.dump(opts.fetch(:data, @data)),
      opts.fetch(:delay, 0)
    ]
    command.push('priority', opts.fetch(:priority, @priority))
    command.push('tags', JSON.dump(opts.fetch(:tags, @tags)))
    command.push('retries', opts.fetch(:retries, @original_retries))
    command.push('depends', JSON.dump(opts.fetch(:depends, @dependencies)))
    @client.call(*command)
  end
end
Also aliased as: move
retry(delay = 0, group = nil, message = nil) click to toggle source
# File lib/qless/job.rb, line 317
# Asks the client to retry this job after +delay+, running inside
# note_state_change(:retry). +group+ and +message+ are only sent when
# +group+ is non-nil.
#
# Returns the client's reply, with a nil reply reported as false.
def retry(delay = 0, group = nil, message = nil)
  note_state_change :retry do
    command = ['retry', @jid, @queue_name, @worker_name, delay]
    command.push(group, message) unless group.nil?

    reply = @client.call(*command)
    reply.nil? ? false : reply
  end
end
spawned_from() click to toggle source
# File lib/qless/job.rb, line 202
# Looks up the entry for +spawned_from_jid+ in the client's jobs collection,
# caching the result once a truthy value has been found.
def spawned_from
  return @spawned_from if @spawned_from

  @spawned_from = @client.jobs[@spawned_from_jid]
end
tag(*tags) click to toggle source
# File lib/qless/job.rb, line 309
# Sends 'tag' 'add' for this job with the given tags and returns the
# client's reply.
def tag(*tags)
  command = ['tag', 'add', @jid].concat(tags)
  @client.call(*command)
end
timeout() click to toggle source
# File lib/qless/job.rb, line 339
# Sends the 'timeout' command for this job's jid and returns the client's
# reply.
def timeout
  command = ['timeout', @jid]
  @client.call(*command)
end
to_hash() click to toggle source
# File lib/qless/job.rb, line 206
# Snapshot of the job's readers as a symbol-keyed hash. Note that the raw
# queue history is exposed under the :history key.
def to_hash
  snapshot = {}
  snapshot[:jid]              = jid
  snapshot[:spawned_from_jid] = spawned_from_jid
  snapshot[:expires_at]       = expires_at
  snapshot[:state]            = state
  snapshot[:queue_name]       = queue_name
  snapshot[:history]          = raw_queue_history
  snapshot[:worker_name]      = worker_name
  snapshot[:failure]          = failure
  snapshot[:klass_name]       = klass_name
  snapshot[:tracked]          = tracked
  snapshot[:dependencies]     = dependencies
  snapshot[:dependents]       = dependents
  snapshot[:original_retries] = original_retries
  snapshot[:retries_left]     = retries_left
  snapshot[:data]             = data
  snapshot[:priority]         = priority
  snapshot[:tags]             = tags
  snapshot
end
to_s() click to toggle source
# File lib/qless/job.rb, line 158
# String form of the job; identical to +inspect+.
def to_s
  inspect
end
track() click to toggle source
# File lib/qless/job.rb, line 301
# Sends 'track' 'track' for this job's jid and returns the client's reply.
def track
  subcommand = 'track'
  @client.call('track', subcommand, @jid)
end
ttl() click to toggle source
# File lib/qless/job.rb, line 170
# Seconds remaining until the job expires, as a Float (negative once the
# expiry has passed).
#
# The expiry may be held either as a numeric epoch value or as a Time (the
# defaults in Job.build use a Time). Time.at accepts both, so the result is
# always a number of seconds; subtracting a Float from a Time would instead
# have produced another Time.
def ttl
  Time.at(@expires_at) - Time.now
end
undepend(*jids) click to toggle source
# File lib/qless/job.rb, line 335
# Sends 'depends' ... 'off' for this job with the given jids.
# Returns true when the client's reply is truthy, false otherwise.
def undepend(*jids)
  reply = @client.call('depends', @jid, 'off', *jids)
  reply ? true : false
end
untag(*tags) click to toggle source
# File lib/qless/job.rb, line 313
# Sends 'tag' 'remove' for this job with the given tags and returns the
# client's reply.
def untag(*tags)
  command = ['tag', 'remove', @jid].concat(tags)
  @client.call(*command)
end
untrack() click to toggle source
# File lib/qless/job.rb, line 305
# Sends 'track' 'untrack' for this job's jid and returns the client's reply.
def untrack
  subcommand = 'untrack'
  @client.call('track', subcommand, @jid)
end

Private Instance Methods

history_timestamp(name, selector) click to toggle source
# File lib/qless/job.rb, line 373
# Collects the 'when' value of every queue_history entry whose 'what' equals
# +name+, then reduces that list by sending it +selector+ (e.g. :min).
def history_timestamp(name, selector)
  timestamps = []
  queue_history.each do |entry|
    timestamps << entry['when'] if entry['what'] == name
  end
  timestamps.public_send(selector)
end