class Qless::Queue

A class for interacting with a specific queue. Not meant to be instantiated directly, it's accessed with Client#queues[...]

Constants

QueueNotEmptyError

Attributes

client[R]
name[R]

Public Class Methods

new(name, client) click to toggle source
# File lib/qless/queue.rb, line 42
def initialize(name, client)
  @client = client
  @name   = name
end

Public Instance Methods

==(other) click to toggle source
# File lib/qless/queue.rb, line 174
def ==(other)
  self.class == other.class &&
  client == other.client &&
  name.to_s == other.name.to_s
end
Also aliased as: eql?
counts() click to toggle source
# File lib/qless/queue.rb, line 56
def counts
  JSON.parse(@client.call('queues', @name))
end
eql?(other)
Alias for: ==
forget() click to toggle source
# File lib/qless/queue.rb, line 92
def forget
  job_count = length
  if job_count.zero?
    @client.call('queue.forget', name)
  else
    raise QueueNotEmptyError, "The queue is not empty. It has #{job_count} jobs."
  end
end
hash() click to toggle source
# File lib/qless/queue.rb, line 181
def hash
  self.class.hash ^ client.hash ^ name.to_s.hash
end
heartbeat() click to toggle source
# File lib/qless/queue.rb, line 60
def heartbeat
  get_config :heartbeat
end
heartbeat=(value) click to toggle source
# File lib/qless/queue.rb, line 64
def heartbeat=(value)
  set_config :heartbeat, value
end
inspect()
Alias for: to_s
jobs() click to toggle source
# File lib/qless/queue.rb, line 52
def jobs
  @jobs ||= QueueJobs.new(@name, @client)
end
length() click to toggle source

How many items in the queue?

# File lib/qless/queue.rb, line 161
def length
  (@client.redis.multi do
    %w[ locks work scheduled depends ].each do |suffix|
      @client.redis.zcard("ql:q:#{@name}-#{suffix}")
    end
  end).inject(0, :+)
end
max_concurrency() click to toggle source
# File lib/qless/queue.rb, line 68
def max_concurrency
  value = get_config('max-concurrency')
  value && Integer(value)
end
max_concurrency=(value) click to toggle source
# File lib/qless/queue.rb, line 73
def max_concurrency=(value)
  set_config 'max-concurrency', value
end
pause(opts = {}) click to toggle source
# File lib/qless/queue.rb, line 81
def pause(opts = {})
  @client.call('pause', name)
  @client.call('timeout', jobs.running(0, -1)) unless opts[:stopjobs].nil?
end
paused?() click to toggle source
# File lib/qless/queue.rb, line 77
def paused?
  counts['paused']
end
peek(count = nil) click to toggle source

Peek at a work item

# File lib/qless/queue.rb, line 150
def peek(count = nil)
  jids = JSON.parse(@client.call('peek', @name, (count || 1)))
  jobs = jids.map { |j| Job.new(@client, j) }
  count.nil? ? jobs[0] : jobs
end
pop(count = nil) click to toggle source

Pop a work item off the queue

# File lib/qless/queue.rb, line 143
def pop(count = nil)
  jids = JSON.parse(@client.call('pop', @name, worker_name, (count || 1)))
  jobs = jids.map { |j| Job.new(@client, j) }
  count.nil? ? jobs[0] : jobs
end
put(klass, data, opts = {}) click to toggle source

Put the described job in this queue Options include:

> priority (int)

> tags (array of strings)

> delay (int)

# File lib/qless/queue.rb, line 106
def put(klass, data, opts = {})
  opts = job_options(klass, data, opts)
  @client.call('put', worker_name, @name,
               (opts[:jid] || Qless.generate_jid),
               klass.is_a?(String) ? klass : klass.name,
               JSON.generate(data),
               opts.fetch(:delay, 0),
               'priority', opts.fetch(:priority, 0),
               'tags', JSON.generate(opts.fetch(:tags, [])),
               'retries', opts.fetch(:retries, 5),
               'depends', JSON.generate(opts.fetch(:depends, []))
  )
end
recur(klass, data, interval, opts = {}) click to toggle source

Make a recurring job in this queue Options include:

> priority (int)

> tags (array of strings)

> retries (int)

> offset (int)

# File lib/qless/queue.rb, line 126
def recur(klass, data, interval, opts = {})
  opts = job_options(klass, data, opts)
  @client.call(
    'recur',
    @name,
    (opts[:jid] || Qless.generate_jid),
    klass.is_a?(String) ? klass : klass.name,
    JSON.generate(data),
    'interval', interval, opts.fetch(:offset, 0),
    'priority', opts.fetch(:priority, 0),
    'tags', JSON.generate(opts.fetch(:tags, [])),
    'retries', opts.fetch(:retries, 5),
    'backlog', opts.fetch(:backlog, 0)
  )
end
stats(date = nil) click to toggle source
# File lib/qless/queue.rb, line 156
def stats(date = nil)
  JSON.parse(@client.call('stats', @name, (date || Time.now.to_f)))
end
to_s() click to toggle source
# File lib/qless/queue.rb, line 169
def to_s
  "#<Qless::Queue #{@name}>"
end
Also aliased as: inspect
unpause() click to toggle source
# File lib/qless/queue.rb, line 86
def unpause
  @client.call('unpause', name)
end
worker_name() click to toggle source

Our worker name is the same as our client's

# File lib/qless/queue.rb, line 48
def worker_name
  @client.worker_name
end

Private Instance Methods

get_config(config) click to toggle source
# File lib/qless/queue.rb, line 196
def get_config(config)
  @client.config["#{@name}-#{config}"]
end
job_options(klass, data, opts) click to toggle source
# File lib/qless/queue.rb, line 187
def job_options(klass, data, opts)
  return opts unless klass.respond_to?(:default_job_options)
  klass.default_job_options(data).merge(opts)
end
set_config(config, value) click to toggle source
# File lib/qless/queue.rb, line 192
def set_config(config, value)
  @client.config["#{@name}-#{config}"] = value
end