class Qless::Job
A Qless
job
Constants
- CantCompleteError
- CantFailError
- MiddlewareMisconfiguredError
Attributes
data[RW]
dependencies[R]
dependents[R]
expires_at[R]
failure[R]
jid[R]
klass_name[R]
original_retries[R]
priority[RW]
queue_name[R]
raw_queue_history[R]
retries_left[R]
spawned_from_jid[R]
state[R]
state_changed[R]
state_changed?[R]
tracked[R]
worker_name[R]
Public Class Methods
build(client, klass, attributes = {})
click to toggle source
# File lib/qless/job.rb, line 87 def self.build(client, klass, attributes = {}) defaults = { 'jid' => Qless.generate_jid, 'spawned_from_jid' => nil, 'data' => {}, 'klass' => klass.to_s, 'priority' => 0, 'tags' => [], 'worker' => 'mock_worker', 'expires' => Time.now + (60 * 60), # an hour from now 'state' => 'running', 'tracked' => false, 'queue' => 'mock_queue', 'retries' => 5, 'remaining' => 5, 'failure' => {}, 'history' => [], 'dependencies' => [], 'dependents' => [] } attributes = defaults.merge(Qless.stringify_hash_keys(attributes)) attributes['data'] = JSON.dump(attributes['data']) new(client, attributes) end
middlewares_on(job_klass)
click to toggle source
# File lib/qless/job.rb, line 112 def self.middlewares_on(job_klass) singleton_klass = job_klass.singleton_class singleton_klass.ancestors.select do |ancestor| ancestor != singleton_klass && ancestor.method_defined?(:around_perform) end end
new(client, atts)
click to toggle source
Calls superclass method
Qless::BaseJob::new
# File lib/qless/job.rb, line 119 def initialize(client, atts) super(client, atts.fetch('jid')) %w{jid data priority tags state tracked failure dependencies dependents spawned_from_jid}.each do |att| instance_variable_set(:"@#{att}", atts.fetch(att)) end # Parse the data string @data = JSON.parse(@data) @expires_at = atts.fetch('expires') @klass_name = atts.fetch('klass') @queue_name = atts.fetch('queue') @worker_name = atts.fetch('worker') @original_retries = atts.fetch('retries') @retries_left = atts.fetch('remaining') @raw_queue_history = atts.fetch('history') # This is a silly side-effect of Lua doing JSON parsing @tags = [] if @tags == {} @dependents = [] if @dependents == {} @dependencies = [] if @dependencies == {} @state_changed = false @before_callbacks = Hash.new { |h, k| h[k] = [] } @after_callbacks = Hash.new { |h, k| h[k] = [] } end
Public Instance Methods
[](key)
click to toggle source
# File lib/qless/job.rb, line 150 def [](key) @data[key] end
[]=(key, val)
click to toggle source
# File lib/qless/job.rb, line 154 def []=(key, val) @data[key] = val end
cancel()
click to toggle source
# File lib/qless/job.rb, line 295 def cancel note_state_change :cancel do @client.call('cancel', @jid) end end
complete(nxt = nil, options = {})
click to toggle source
Complete a job Options include
> next (String) the next queue¶ ↑
> delay (int) how long to delay it in the next queue¶ ↑
# File lib/qless/job.rb, line 279 def complete(nxt = nil, options = {}) note_state_change :complete do if nxt.nil? @client.call( 'complete', @jid, @worker_name, @queue_name, JSON.dump(@data)) else @client.call('complete', @jid, @worker_name, @queue_name, JSON.dump(@data), 'next', nxt, 'delay', options.fetch(:delay, 0), 'depends', JSON.dump(options.fetch(:depends, []))) end end rescue Qless::LuaScriptError => err raise CantCompleteError.new(err.message) end
depend(*jids)
click to toggle source
# File lib/qless/job.rb, line 331 def depend(*jids) !!@client.call('depends', @jid, 'on', *jids) end
description()
click to toggle source
# File lib/qless/job.rb, line 162 def description "#{@klass_name} (#{@jid} / #{@queue_name} / #{@state} / #{@data})" end
fail(group, message)
click to toggle source
Fail a job
# File lib/qless/job.rb, line 251 def fail(group, message) note_state_change :fail do @client.call( 'fail', @jid, @worker_name, group, message, JSON.dump(@data)) || false end rescue Qless::LuaScriptError => err raise CantFailError.new(err.message) end
heartbeat()
click to toggle source
Heartbeat a job
# File lib/qless/job.rb, line 265 def heartbeat @expires_at = @client.call( 'heartbeat', @jid, @worker_name, JSON.dump(@data)) end
history()
click to toggle source
# File lib/qless/job.rb, line 178 def history warn 'WARNING: Qless::Job#history is deprecated; use' + "Qless::Job#raw_queue_history instead; from:\n#{caller.first}" raw_queue_history end
initially_put_at()
click to toggle source
# File lib/qless/job.rb, line 198 def initially_put_at @initially_put_at ||= history_timestamp('put', :min) end
inspect()
click to toggle source
# File lib/qless/job.rb, line 166 def inspect "<Qless::Job #{description}>" end
log(message, data = nil)
click to toggle source
# File lib/qless/job.rb, line 343 def log(message, data = nil) if data @client.call('log', @jid, message, JSON.dump(data)) else @client.call('log', @jid, message) end end
note_state_change(event) { || ... }
click to toggle source
# File lib/qless/job.rb, line 363 def note_state_change(event) @before_callbacks[event].each { |blk| blk.call(self) } result = yield @state_changed = true @after_callbacks[event].each { |blk| blk.call(self) } result end
perform()
click to toggle source
# File lib/qless/job.rb, line 58 def perform # If we can't find the class, we should fail the job, not try to process begin klass rescue NameError return fail("#{queue_name}-NameError", "Cannot find #{klass_name}") end # log a real process executing job -- before we start processing log("started by pid:#{Process.pid}") middlewares = Job.middlewares_on(klass) if middlewares.last == SupportsMiddleware klass.around_perform(self) elsif middlewares.any? raise MiddlewareMisconfiguredError, 'The middleware chain for ' + "#{klass} (#{middlewares.inspect}) is misconfigured." + 'Qless::Job::SupportsMiddleware must be extended onto your job' + 'class first if you want to use any middleware.' elsif !klass.respond_to?(:perform) # If the klass doesn't have a :perform method, we should raise an error fail("#{queue_name}-method-missing", "#{klass_name} has no perform method") else klass.perform(self) end end
priority=(priority)
click to toggle source
# File lib/qless/job.rb, line 146 def priority=(priority) @priority = priority if @client.call('priority', @jid, priority) end
queue_history()
click to toggle source
# File lib/qless/job.rb, line 184 def queue_history @queue_history ||= @raw_queue_history.map do |history_event| history_event.each_with_object({}) do |(key, value), hash| # The only Numeric (Integer or Float) values we get in the history # are timestamps if value.is_a?(Numeric) hash[key] = Time.at(value).utc else hash[key] = value end end end end
reconnect_to_redis()
click to toggle source
# File lib/qless/job.rb, line 174 def reconnect_to_redis @client.redis.client.reconnect end
requeue(queue, opts = {})
click to toggle source
Move this from it's current queue into another
# File lib/qless/job.rb, line 229 def requeue(queue, opts = {}) queue_name = case queue when String, Symbol then queue else queue.name end note_state_change :requeue do @client.call('requeue', @client.worker_name, queue_name, @jid, @klass_name, JSON.dump(opts.fetch(:data, @data)), opts.fetch(:delay, 0), 'priority', opts.fetch(:priority, @priority), 'tags', JSON.dump(opts.fetch(:tags, @tags)), 'retries', opts.fetch(:retries, @original_retries), 'depends', JSON.dump(opts.fetch(:depends, @dependencies)) ) end end
Also aliased as: move
retry(delay = 0, group = nil, message = nil)
click to toggle source
# File lib/qless/job.rb, line 317 def retry(delay = 0, group = nil, message = nil) note_state_change :retry do if group.nil? results = @client.call( 'retry', @jid, @queue_name, @worker_name, delay) results.nil? ? false : results else results = @client.call( 'retry', @jid, @queue_name, @worker_name, delay, group, message) results.nil? ? false : results end end end
spawned_from()
click to toggle source
# File lib/qless/job.rb, line 202 def spawned_from @spawned_from ||= @client.jobs[@spawned_from_jid] end
tag(*tags)
click to toggle source
# File lib/qless/job.rb, line 309 def tag(*tags) @client.call('tag', 'add', @jid, *tags) end
timeout()
click to toggle source
# File lib/qless/job.rb, line 339 def timeout @client.call('timeout', @jid) end
to_hash()
click to toggle source
# File lib/qless/job.rb, line 206 def to_hash { jid: jid, spawned_from_jid: spawned_from_jid, expires_at: expires_at, state: state, queue_name: queue_name, history: raw_queue_history, worker_name: worker_name, failure: failure, klass_name: klass_name, tracked: tracked, dependencies: dependencies, dependents: dependents, original_retries: original_retries, retries_left: retries_left, data: data, priority: priority, tags: tags } end
to_s()
click to toggle source
# File lib/qless/job.rb, line 158 def to_s inspect end
track()
click to toggle source
# File lib/qless/job.rb, line 301 def track @client.call('track', 'track', @jid) end
ttl()
click to toggle source
# File lib/qless/job.rb, line 170 def ttl @expires_at - Time.now.to_f end
undepend(*jids)
click to toggle source
# File lib/qless/job.rb, line 335 def undepend(*jids) !!@client.call('depends', @jid, 'off', *jids) end
untag(*tags)
click to toggle source
# File lib/qless/job.rb, line 313 def untag(*tags) @client.call('tag', 'remove', @jid, *tags) end
untrack()
click to toggle source
# File lib/qless/job.rb, line 305 def untrack @client.call('track', 'untrack', @jid) end
Private Instance Methods
history_timestamp(name, selector)
click to toggle source
# File lib/qless/job.rb, line 373 def history_timestamp(name, selector) items = queue_history.select do |q| q['what'] == name end items.map do |q| q['when'] end.public_send(selector) end