class Qless::Queue
A class for interacting with a specific queue. Not meant to be instantiated directly, it's accessed with Client#queues[...]
Constants
- QueueNotEmptyError
Attributes
client[R]
name[R]
Public Class Methods
new(name, client)
click to toggle source
# File lib/qless/queue.rb, line 42 def initialize(name, client) @client = client @name = name end
Public Instance Methods
==(other)
click to toggle source
# File lib/qless/queue.rb, line 174 def ==(other) self.class == other.class && client == other.client && name.to_s == other.name.to_s end
Also aliased as: eql?
counts()
click to toggle source
# File lib/qless/queue.rb, line 56 def counts JSON.parse(@client.call('queues', @name)) end
forget()
click to toggle source
# File lib/qless/queue.rb, line 92 def forget job_count = length if job_count.zero? @client.call('queue.forget', name) else raise QueueNotEmptyError, "The queue is not empty. It has #{job_count} jobs." end end
hash()
click to toggle source
# File lib/qless/queue.rb, line 181 def hash self.class.hash ^ client.hash ^ name.to_s.hash end
heartbeat()
click to toggle source
# File lib/qless/queue.rb, line 60 def heartbeat get_config :heartbeat end
heartbeat=(value)
click to toggle source
# File lib/qless/queue.rb, line 64 def heartbeat=(value) set_config :heartbeat, value end
jobs()
click to toggle source
# File lib/qless/queue.rb, line 52 def jobs @jobs ||= QueueJobs.new(@name, @client) end
length()
click to toggle source
How many items in the queue?
# File lib/qless/queue.rb, line 161 def length (@client.redis.multi do %w[ locks work scheduled depends ].each do |suffix| @client.redis.zcard("ql:q:#{@name}-#{suffix}") end end).inject(0, :+) end
max_concurrency()
click to toggle source
# File lib/qless/queue.rb, line 68 def max_concurrency value = get_config('max-concurrency') value && Integer(value) end
max_concurrency=(value)
click to toggle source
# File lib/qless/queue.rb, line 73 def max_concurrency=(value) set_config 'max-concurrency', value end
pause(opts = {})
click to toggle source
# File lib/qless/queue.rb, line 81 def pause(opts = {}) @client.call('pause', name) @client.call('timeout', jobs.running(0, -1)) unless opts[:stopjobs].nil? end
paused?()
click to toggle source
# File lib/qless/queue.rb, line 77 def paused? counts['paused'] end
peek(count = nil)
click to toggle source
Peek at a work item
# File lib/qless/queue.rb, line 150 def peek(count = nil) jids = JSON.parse(@client.call('peek', @name, (count || 1))) jobs = jids.map { |j| Job.new(@client, j) } count.nil? ? jobs[0] : jobs end
pop(count = nil)
click to toggle source
Pop a work item off the queue
# File lib/qless/queue.rb, line 143 def pop(count = nil) jids = JSON.parse(@client.call('pop', @name, worker_name, (count || 1))) jobs = jids.map { |j| Job.new(@client, j) } count.nil? ? jobs[0] : jobs end
put(klass, data, opts = {})
click to toggle source
Put the described job in this queue Options include:
> priority (int)¶ ↑
> tags (array of strings)¶ ↑
> delay (int)¶ ↑
# File lib/qless/queue.rb, line 106 def put(klass, data, opts = {}) opts = job_options(klass, data, opts) @client.call('put', worker_name, @name, (opts[:jid] || Qless.generate_jid), klass.is_a?(String) ? klass : klass.name, JSON.generate(data), opts.fetch(:delay, 0), 'priority', opts.fetch(:priority, 0), 'tags', JSON.generate(opts.fetch(:tags, [])), 'retries', opts.fetch(:retries, 5), 'depends', JSON.generate(opts.fetch(:depends, [])) ) end
recur(klass, data, interval, opts = {})
click to toggle source
Make a recurring job in this queue Options include:
> priority (int)¶ ↑
> tags (array of strings)¶ ↑
> retries (int)¶ ↑
> offset (int)¶ ↑
# File lib/qless/queue.rb, line 126 def recur(klass, data, interval, opts = {}) opts = job_options(klass, data, opts) @client.call( 'recur', @name, (opts[:jid] || Qless.generate_jid), klass.is_a?(String) ? klass : klass.name, JSON.generate(data), 'interval', interval, opts.fetch(:offset, 0), 'priority', opts.fetch(:priority, 0), 'tags', JSON.generate(opts.fetch(:tags, [])), 'retries', opts.fetch(:retries, 5), 'backlog', opts.fetch(:backlog, 0) ) end
stats(date = nil)
click to toggle source
# File lib/qless/queue.rb, line 156 def stats(date = nil) JSON.parse(@client.call('stats', @name, (date || Time.now.to_f))) end
to_s()
click to toggle source
# File lib/qless/queue.rb, line 169 def to_s "#<Qless::Queue #{@name}>" end
Also aliased as: inspect
unpause()
click to toggle source
# File lib/qless/queue.rb, line 86 def unpause @client.call('unpause', name) end
worker_name()
click to toggle source
Our worker name is the same as our client's
# File lib/qless/queue.rb, line 48 def worker_name @client.worker_name end
Private Instance Methods
get_config(config)
click to toggle source
# File lib/qless/queue.rb, line 196 def get_config(config) @client.config["#{@name}-#{config}"] end
job_options(klass, data, opts)
click to toggle source
# File lib/qless/queue.rb, line 187 def job_options(klass, data, opts) return opts unless klass.respond_to?(:default_job_options) klass.default_job_options(data).merge(opts) end
set_config(config, value)
click to toggle source
# File lib/qless/queue.rb, line 192 def set_config(config, value) @client.config["#{@name}-#{config}"] = value end