module Resque
Constants
- DEFAULT_HEARTBEAT_INTERVAL
- DEFAULT_PRUNE_INTERVAL
- Version
Attributes
By default, jobs are pushed to the back of the queue and popped from the front, resulting in “first in, first out” (FIFO) execution order. Set to true to push jobs to the front of the queue instead, resulting in “last in, first out” (LIFO) execution order.
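The difference between the two orderings can be illustrated with a small in-memory sketch. `SketchQueue` is a hypothetical stand-in for a Redis list, not part of Resque; it only mirrors the push-to-back versus push-to-front behaviour described above.

```ruby
# Hypothetical in-memory stand-in for a Redis list (not Resque's data store).
class SketchQueue
  def initialize(enqueue_front: false)
    @enqueue_front = enqueue_front
    @items = []
  end

  # Push to the back by default, or to the front when enqueue_front is true.
  def push(item)
    @enqueue_front ? @items.unshift(item) : @items.push(item)
    self
  end

  # Jobs are always popped from the front.
  def pop
    @items.shift
  end
end

fifo = SketchQueue.new
%w[a b c].each { |job| fifo.push(job) }
fifo_order = Array.new(3) { fifo.pop }

lifo = SketchQueue.new(enqueue_front: true)
%w[a b c].each { |job| lifo.push(job) }
lifo_order = Array.new(3) { lifo.pop }
```

With the default setting the jobs come back in the order they were pushed; with `enqueue_front: true` the most recently pushed job comes back first.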
Defines how often a Resque worker updates the heartbeat key. Must be less than the prune interval.
Set or retrieve the current logger object
Defines how often Resque checks for dead workers.
Public Instance Methods
The `after_fork` hook will be run in the child process and is passed the current job. Any changes you make, therefore, will only live as long as the job currently being processed.
Call with a block to register a hook. Call with no arguments to return all registered hooks.
# File lib/resque.rb, line 261
def after_fork(&block)
  block ? register_hook(:after_fork, block) : hooks(:after_fork)
end
Register an after_fork proc.
# File lib/resque.rb, line 266
def after_fork=(block)
  register_hook(:after_fork, block)
end
The `after_pause` hook will be run in the parent process after the worker has resumed from a pause (via SIGCONT).
# File lib/resque.rb, line 283
def after_pause(&block)
  block ? register_hook(:after_pause, block) : hooks(:after_pause)
end
Register an after_pause proc.
# File lib/resque.rb, line 288
def after_pause=(block)
  register_hook(:after_pause, block)
end
The `before_first_fork` hook will be run in the parent process only once, before forking to run the first job. Be careful: any changes you make will be permanent for the lifespan of the worker.
Call with a block to register a hook. Call with no arguments to return all registered hooks.
# File lib/resque.rb, line 231
def before_first_fork(&block)
  block ? register_hook(:before_first_fork, block) : hooks(:before_first_fork)
end
Register a before_first_fork proc.
# File lib/resque.rb, line 236
def before_first_fork=(block)
  register_hook(:before_first_fork, block)
end
The `before_fork` hook will be run in the parent process before every job, so be careful: any changes you make will be permanent for the lifespan of the worker.
Call with a block to register a hook. Call with no arguments to return all registered hooks.
# File lib/resque.rb, line 246
def before_fork(&block)
  block ? register_hook(:before_fork, block) : hooks(:before_fork)
end
Register a before_fork proc.
# File lib/resque.rb, line 251
def before_fork=(block)
  register_hook(:before_fork, block)
end
The `before_pause` hook will be run in the parent process before the worker has paused processing (via pause_processing or SIGUSR2).
# File lib/resque.rb, line 272
def before_pause(&block)
  block ? register_hook(:before_pause, block) : hooks(:before_pause)
end
Register a before_pause proc.
# File lib/resque.rb, line 277
def before_pause=(block)
  register_hook(:before_pause, block)
end
Given a word with dashes, returns a camel cased version of it.
classify('job-name') # => 'JobName'
# File lib/resque.rb, line 60
def classify(dashed_word)
  dashed_word.split('-').map(&:capitalize).join
end
Tries to find a constant with the name specified in the argument string:
constantize("Module") # => Module
constantize("Test::Unit") # => Test::Unit
The name is assumed to be the one of a top-level constant, no matter whether it starts with “::” or not. No lexical context is taken into account:
C = 'outside'
module M
  C = 'inside'
  C # => 'inside'
  constantize("C") # => 'outside', same as ::C
end
NameError is raised when the constant is unknown.
# File lib/resque.rb, line 81
def constantize(camel_cased_word)
  camel_cased_word = camel_cased_word.to_s

  if camel_cased_word.include?('-')
    camel_cased_word = classify(camel_cased_word)
  end

  names = camel_cased_word.split('::')
  names.shift if names.empty? || names.first.empty?

  constant = Object
  names.each do |name|
    args = Module.method(:const_get).arity != 1 ? [false] : []

    if constant.const_defined?(name, *args)
      constant = constant.const_get(name)
    else
      constant = constant.const_missing(name)
    end
  end
  constant
end
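As a rough illustration of how `classify` and `constantize` fit together, here is a standalone sketch. The two functions are simplified copies written for this example (the lookup uses `const_get(name, false)` directly rather than the `const_defined?` / `const_missing` branches), and `SketchJobs::UpdateGraph` is a hypothetical constant.

```ruby
# Simplified standalone copies of the helpers, for illustration only.
def classify(dashed_word)
  dashed_word.split('-').map(&:capitalize).join
end

def constantize(camel_cased_word)
  word = camel_cased_word.to_s
  word = classify(word) if word.include?('-')

  names = word.split('::')
  # A leading "::" yields an empty first element; drop it.
  names.shift if names.empty? || names.first.empty?

  # Always resolve from Object, so the caller's lexical scope is ignored.
  names.inject(Object) { |constant, name| constant.const_get(name, false) }
end

# Hypothetical job namespace used only in this example.
module SketchJobs
  class UpdateGraph; end
end

camel    = classify('job-name')
resolved = constantize('SketchJobs::UpdateGraph')
rooted   = constantize('::SketchJobs')
```

An unknown name raises `NameError`, as the description above states.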
Given a string, returns a Ruby object.
# File lib/resque.rb, line 43
def decode(object)
  return unless object

  begin
    if MultiJson.respond_to?(:dump) && MultiJson.respond_to?(:load)
      MultiJson.load object
    else
      MultiJson.decode object
    end
  rescue ::MultiJson::DecodeError => e
    raise Helpers::DecodeException, e.message, e.backtrace
  end
end
This method can be used to conveniently remove a job from a queue. It assumes the class you're passing it is a real Ruby class (not a string or reference) which either:
a) has a @queue ivar set
b) responds to `queue`
If either of those conditions is met, it will use the value obtained from performing one of the above operations to determine the queue.
If no queue can be inferred, this method will raise a `Resque::NoQueueError`.
If no args are given, this method will dequeue all jobs matching the provided class. See `Resque::Job.destroy` for more information.
Returns the number of jobs destroyed.
Example:
# Removes all jobs of class `UpdateNetworkGraph`
Resque.dequeue(GitHub::Jobs::UpdateNetworkGraph)

# Removes all jobs of class `UpdateNetworkGraph` with matching args.
Resque.dequeue(GitHub::Jobs::UpdateNetworkGraph, 'repo:135325')
This method is considered part of the `stable` API.
# File lib/resque.rb, line 488
def dequeue(klass, *args)
  # Perform before_dequeue hooks. Don't perform dequeue if any hook returns false
  before_hooks = Plugin.before_dequeue_hooks(klass).collect do |hook|
    klass.send(hook, *args)
  end
  return if before_hooks.any? { |result| result == false }

  destroyed = Job.destroy(queue_from_class(klass), klass, *args)

  Plugin.after_dequeue_hooks(klass).each do |hook|
    klass.send(hook, *args)
  end

  destroyed
end
Given a Ruby object, returns a string suitable for storage in a queue.
# File lib/resque.rb, line 34
def encode(object)
  if MultiJson.respond_to?(:dump) && MultiJson.respond_to?(:load)
    MultiJson.dump object
  else
    MultiJson.encode object
  end
end
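The encode/decode round trip can be sketched with Ruby's standard `json` library. This is an assumption made to keep the example self-contained: the methods above delegate to MultiJson, and stdlib `JSON` only stands in for it here.

```ruby
require 'json'

# Assumption: stdlib JSON stands in for MultiJson in this sketch.
item = { :class => 'Archive', :args => [35, 'tar'] }

payload = JSON.dump(item)      # String suitable for storage in a queue
decoded = JSON.parse(payload)  # Ruby object rebuilt from that String
```

Note that the round trip does not preserve symbol keys: the decoded hash is keyed by the strings `"class"` and `"args"`.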
This method can be used to conveniently add a job to a queue. It assumes the class you're passing it is a real Ruby class (not a string or reference) which either:
a) has a @queue ivar set
b) responds to `queue`
If either of those conditions is met, it will use the value obtained from performing one of the above operations to determine the queue.
If no queue can be inferred, this method will raise a `Resque::NoQueueError`.
Returns true if the job was queued, nil if the job was rejected by a before_enqueue hook.
This method is considered part of the `stable` API.
# File lib/resque.rb, line 432
def enqueue(klass, *args)
  enqueue_to(queue_from_class(klass), klass, *args)
end
# File lib/resque.rb, line 216
def enqueue_front
  if defined? @enqueue_front
    @enqueue_front
  else
    @enqueue_front = false
  end
end
Just like `enqueue` but allows you to specify the queue you want to use. Runs hooks.
`queue` should be the String name of the queue you're targeting.
Returns true if the job was queued, nil if the job was rejected by a before_enqueue hook.
This method is considered part of the `stable` API.
# File lib/resque.rb, line 445
def enqueue_to(queue, klass, *args)
  # Perform before_enqueue hooks. Don't perform enqueue if any hook returns false
  before_hooks = Plugin.before_enqueue_hooks(klass).collect do |hook|
    klass.send(hook, *args)
  end
  return nil if before_hooks.any? { |result| result == false }

  Job.create(queue, klass, *args)

  Plugin.after_enqueue_hooks(klass).each do |hook|
    klass.send(hook, *args)
  end

  return true
end
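The `true` / `nil` return contract can be illustrated without Redis. The sketch below is a simplified stand-in: `sketch_enqueue_to`, `SketchJob`, and the `store` array are hypothetical, and discovering hooks by matching class method names against `before_enqueue` is an assumption about what `Plugin.before_enqueue_hooks` returns.

```ruby
# Hypothetical job class with one before_enqueue hook.
class SketchJob
  @queue = :sketch

  # Reject jobs that are enqueued without any arguments.
  def self.before_enqueue_require_args(*args)
    !args.empty?
  end
end

# Simplified stand-in for enqueue_to; `store` replaces the Redis-backed queue.
def sketch_enqueue_to(store, queue, klass, *args)
  # Assumption: hooks are class methods whose names start with before_enqueue.
  hooks = klass.methods.grep(/\Abefore_enqueue/).sort
  results = hooks.map { |hook| klass.send(hook, *args) }

  # Only an explicit false rejects the job; nil does not.
  return nil if results.any? { |result| result == false }

  store << [queue, klass.name, args]
  true
end

store    = []
accepted = sketch_enqueue_to(store, 'sketch', SketchJob, 42)
rejected = sketch_enqueue_to(store, 'sketch', SketchJob)
```

Here `accepted` is `true` and the job lands in `store`, while `rejected` is `nil` and nothing is stored.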
# File lib/resque.rb, line 193
def heartbeat_interval
  if defined? @heartbeat_interval
    @heartbeat_interval
  else
    DEFAULT_HEARTBEAT_INTERVAL
  end
end
Returns a hash, similar to redis-rb's info, of interesting stats.
# File lib/resque.rb, line 564
def info
  return {
    :pending => queue_sizes.inject(0) { |sum, (_queue_name, queue_size)| sum + queue_size },
    :processed => Stat[:processed],
    :queues => queues.size,
    :workers => workers.size.to_i,
    :working => working.size,
    :failed => data_store.num_failed,
    :servers => [redis_id],
    :environment => ENV['RAILS_ENV'] || ENV['RACK_ENV'] || 'development'
  }
end
Returns an array of all known Resque keys in Redis. Redis' KEYS operation is O(N) for the keyspace, so be careful - this can be slow for big databases.
# File lib/resque.rb, line 579
def keys
  data_store.all_resque_keys
end
Does the dirty work of fetching a range of items from a Redis list and converting them into Ruby objects.
# File lib/resque.rb, line 386
def list_range(key, start = 0, count = 1)
  results = data_store.list_range(key, start, count)
  if count == 1
    decode(results)
  else
    results.map { |result| decode(result) }
  end
end
Returns an array of items currently queued. Queue name should be a string.
start and count should be integers and can be used for pagination. start is the zero-based index of the first item to return; count is how many items to return.
To get the 3rd page of a paginated list with 30 items per page, one would use:
Resque.peek('my_list', 60, 30)
# File lib/resque.rb, line 375
def peek(queue, start = 0, count = 1)
  results = data_store.peek_in_queue(queue,start,count)
  if count == 1
    decode(results)
  else
    results.map { |result| decode(result) }
  end
end
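The pagination arithmetic can be sketched as follows, assuming zero-based offsets as used by Redis list ranges. `page_start` is a hypothetical helper, and a plain array stands in for the queue.

```ruby
# Hypothetical helper: zero-based start offset for a one-based page number.
def page_start(page, per_page)
  (page - 1) * per_page
end

# Stand-in for a queue holding 100 items, numbered 1..100.
queue = (1..100).to_a

start      = page_start(3, 30)
third_page = queue[start, 30]  # same start/count shape as peek(queue, start, count)
```

Page 1 covers offsets 0 to 29 and page 2 covers 30 to 59, so the third page starts at offset 60.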
Pops a job off a queue. Queue name should be a string.
Returns a Ruby object.
# File lib/resque.rb, line 357
def pop(queue)
  decode(data_store.pop_from_queue(queue))
end
# File lib/resque.rb, line 203
def prune_interval
  if defined? @prune_interval
    @prune_interval
  else
    DEFAULT_PRUNE_INTERVAL
  end
end
Pushes a job onto a queue. Queue name should be a string and the item should be any JSON-able Ruby object.
Resque workers generally expect the `item` to be a hash with the following keys:
class - The String name of the job to run.
args - An Array of arguments to pass the job. Usually passed via `class.to_class.perform(*args)`.
Example
Resque.push('archive', :class => 'Archive', :args => [ 35, 'tar' ])
Returns nothing
# File lib/resque.rb, line 350
def push(queue, item)
  data_store.push_to_queue(queue,encode(item))
end
The `queue_empty` hook will be run in the parent process when the worker finds no more jobs in the queue and becomes idle.
Call with a block to register a hook. Call with no arguments to return all registered hooks.
# File lib/resque.rb, line 297
def queue_empty(&block)
  block ? register_hook(:queue_empty, block) : hooks(:queue_empty)
end
Register a queue_empty proc.
# File lib/resque.rb, line 302
def queue_empty=(block)
  register_hook(:queue_empty, block)
end
Given a class, try to extrapolate an appropriate queue based on a class instance variable or `queue` method.
# File lib/resque.rb, line 506
def queue_from_class(klass)
  (klass.instance_variable_defined?(:@queue) && klass.instance_variable_get(:@queue)) ||
    (klass.respond_to?(:queue) and klass.queue)
end
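The two ways a job class can name its queue can be tried out in isolation. The sketch below copies the method body as a standalone function (with `&&` in place of `and`); the three job classes are hypothetical.

```ruby
# Standalone copy of the lookup, for illustration.
def queue_from_class(klass)
  (klass.instance_variable_defined?(:@queue) && klass.instance_variable_get(:@queue)) ||
    (klass.respond_to?(:queue) && klass.queue)
end

# a) queue named through a class-level @queue instance variable
class IvarJob
  @queue = :high
end

# b) queue named through a `queue` class method
class MethodJob
  def self.queue
    :low
  end
end

# Neither: no queue can be inferred
class NoQueueJob; end

from_ivar   = queue_from_class(IvarJob)
from_method = queue_from_class(MethodJob)
missing     = queue_from_class(NoQueueJob)
```

When neither condition holds the result is falsy, which is what leads `validate` to raise `Resque::NoQueueError`.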
Returns a hash, mapping queue names to queue sizes.
# File lib/resque.rb, line 584
def queue_sizes
  queue_names = queues

  sizes = redis.pipelined do
    queue_names.each do |name|
      redis.llen("queue:#{name}")
    end
  end

  Hash[queue_names.zip(sizes)]
end
Returns an array of all known Resque queues as strings.
# File lib/resque.rb, line 396
def queues
  data_store.queue_names
end
Returns the current Redis connection. If none has been created, will create a new one.
# File lib/resque.rb, line 141
def redis
  return @data_store if @data_store
  self.redis = Redis.respond_to?(:connect) ? Redis.connect : "localhost:6379"
  self.redis
end
Accepts:
1. A 'hostname:port' String
2. A 'hostname:port:db' String (to select the Redis db)
3. A 'hostname:port/namespace' String (to set the Redis namespace)
4. A Redis URL String 'redis://host:port'
5. An instance of `Redis`, `Redis::Client`, `Redis::DistRedis`, or `Redis::Namespace`.
6. A Hash of Redis connection options, e.g. {:host => 'localhost', :port => 6379, :db => 0}
# File lib/resque.rb, line 114
def redis=(server)
  case server
  when String
    if server =~ /rediss?\:\/\//
      redis = Redis.new(:url => server, :thread_safe => true)
    else
      server, namespace = server.split('/', 2)
      host, port, db = server.split(':')
      redis = Redis.new(:host => host, :port => port, :thread_safe => true, :db => db)
    end
    namespace ||= :resque

    @data_store = Resque::DataStore.new(Redis::Namespace.new(namespace, :redis => redis))
  when Redis::Namespace
    @data_store = Resque::DataStore.new(server)
  when Resque::DataStore
    @data_store = server
  when Hash
    @data_store = Resque::DataStore.new(Redis::Namespace.new(:resque, :redis => Redis.new(server)))
  else
    @data_store = Resque::DataStore.new(Redis::Namespace.new(:resque, :redis => server))
  end
end
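To see how the String formats are taken apart, here is a sketch that mirrors only the String branch of `redis=` and returns the parsed pieces instead of opening a connection. `parse_server` is a hypothetical helper written for this example.

```ruby
# Hypothetical helper mirroring the String branch of redis=.
# It returns the parsed options rather than building a Redis client.
def parse_server(server)
  # URL form: passed through whole, with the default namespace.
  return { :url => server, :namespace => :resque } if server =~ %r{rediss?://}

  # 'host:port[:db][/namespace]' form.
  server, namespace = server.split('/', 2)
  host, port, db = server.split(':')
  { :host => host, :port => port, :db => db, :namespace => namespace || :resque }
end

plain      = parse_server('localhost:6379')
with_db    = parse_server('localhost:6379:1')
namespaced = parse_server('localhost:6379/myapp')
url        = parse_server('redis://localhost:6379')
```

The port and db stay as strings after the split, and the namespace falls back to `:resque` when none is given.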
# File lib/resque.rb, line 148
def redis_id
  data_store.identifier
end
Given a queue name, completely deletes the queue.
# File lib/resque.rb, line 401
def remove_queue(queue)
  data_store.remove_queue(queue)
end
A shortcut to unregister_worker useful for command line tool
# File lib/resque.rb, line 554
def remove_worker(worker_id)
  worker = Resque::Worker.find(worker_id)
  worker.unregister_worker
end
This method will return a `Resque::Job` object or a non-true value depending on whether a job can be obtained. You should pass it the precise name of a queue: case matters.
This method is considered part of the `stable` API.
# File lib/resque.rb, line 516
def reserve(queue)
  Job.reserve(queue)
end
Returns a hash, mapping queue names to (up to `sample_size`) samples of jobs in that queue
# File lib/resque.rb, line 597
def sample_queues(sample_size = 1000)
  queue_names = queues

  samples = redis.pipelined do
    queue_names.each do |name|
      key = "queue:#{name}"
      redis.llen(key)
      redis.lrange(key, 0, sample_size - 1)
    end
  end

  hash = {}

  queue_names.zip(samples.each_slice(2).to_a) do |queue_name, (queue_size, serialized_samples)|
    samples = serialized_samples.map do |serialized_sample|
      Job.decode(serialized_sample)
    end

    hash[queue_name] = {
      :size => queue_size,
      :samples => samples
    }
  end

  hash
end
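The pairing of pipelined replies with queue names can be sketched without a Redis server. In this example the `replies` array is hand-written to imitate one LLEN reply followed by one LRANGE reply per queue, and stdlib `JSON.parse` stands in for `Job.decode`; both are assumptions made for illustration.

```ruby
require 'json'

queue_names = %w[high low]

# Imitation of the pipelined replies: [LLEN, LRANGE] for each queue, in order.
replies = [
  2, ['{"class":"A"}', '{"class":"B"}'],
  0, []
]

hash = {}

# each_slice(2) regroups the flat reply list into [size, samples] pairs,
# and zip lines each pair up with its queue name.
queue_names.zip(replies.each_slice(2).to_a) do |queue_name, (queue_size, serialized_samples)|
  hash[queue_name] = {
    :size    => queue_size,
    :samples => serialized_samples.map { |sample| JSON.parse(sample) }
  }
end
```

The reported `:size` is the full queue length, while `:samples` holds at most `sample_size` decoded jobs.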
Returns an integer representing the size of a queue. Queue name should be a string.
# File lib/resque.rb, line 363
def size(queue)
  data_store.queue_size(queue)
end
Returns the data store for the statistics module.
# File lib/resque.rb, line 180
def stat_data_store
  Resque::Stat.data_store
end
Set the data store for the processed and failed statistics.
By default it uses the same as `Resque.redis`, but different stores can be used.
A custom store needs to obey the following API to work correctly
class NullDataStore
  # Returns the current value for the given stat.
  def stat(stat)
  end

  # Increments the stat by the given value.
  def increment_stat(stat, by)
  end

  # Decrements the stat by the given value.
  def decrement_stat(stat, by)
  end

  # Clear the values for the given stat.
  def clear_stat(stat)
  end
end
# File lib/resque.rb, line 175
def stat_data_store=(stat_data_store)
  Resque::Stat.data_store = stat_data_store
end
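A store that actually keeps counts, rather than discarding them, might look like the sketch below. `MemoryStatStore` is a hypothetical class implementing the four methods listed above with a plain Hash; it is not thread-safe and not shared across processes, so treat it as an illustration of the interface only.

```ruby
# Hypothetical in-memory store implementing the four required methods.
class MemoryStatStore
  def initialize
    @stats = Hash.new(0)
  end

  # Returns the current value for the given stat (0 if never set).
  def stat(stat)
    @stats[stat]
  end

  # Increments the stat by the given value.
  def increment_stat(stat, by)
    @stats[stat] += by
  end

  # Decrements the stat by the given value.
  def decrement_stat(stat, by)
    @stats[stat] -= by
  end

  # Clears the value for the given stat.
  def clear_stat(stat)
    @stats.delete(stat)
  end
end

store = MemoryStatStore.new
store.increment_stat(:processed, 3)
store.decrement_stat(:processed, 1)
current = store.stat(:processed)

store.clear_stat(:processed)
cleared = store.stat(:processed)
```

Such an object would be installed with `Resque.stat_data_store = MemoryStatStore.new`.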
# File lib/resque.rb, line 320
def to_s
  "Resque Client connected to #{redis_id}"
end
Validates if the given klass could be a valid Resque job.
If no queue can be inferred this method will raise a `Resque::NoQueueError`
If given klass is nil this method will raise a `Resque::NoClassError`
# File lib/resque.rb, line 525
def validate(klass, queue = nil)
  queue ||= queue_from_class(klass)

  if !queue
    raise NoQueueError.new("Jobs must be placed onto a queue. No queue could be inferred for class #{klass}")
  end

  if klass.to_s.empty?
    raise NoClassError.new("Jobs must be given a class.")
  end
end
Used internally to keep track of which queues we've created. Don't call this directly.
# File lib/resque.rb, line 407
def watch_queue(queue)
  data_store.watch_queue(queue)
end
The `worker_exit` hook will be run in the parent process after the worker has exited (via SIGQUIT, SIGTERM, SIGINT, etc.).
Call with a block to register a hook. Call with no arguments to return all registered hooks.
# File lib/resque.rb, line 311
def worker_exit(&block)
  block ? register_hook(:worker_exit, block) : hooks(:worker_exit)
end
Register a worker_exit proc.
# File lib/resque.rb, line 316
def worker_exit=(block)
  register_hook(:worker_exit, block)
end
A shortcut to Worker.all
# File lib/resque.rb, line 543
def workers
  Worker.all
end
A shortcut to Worker.working
# File lib/resque.rb, line 548
def working
  Worker.working
end
Private Instance Methods
Clear all hooks given a hook name.
# File lib/resque.rb, line 640
def clear_hooks(name)
  @hooks[name] = []
end
Retrieve all hooks of a given name.
# File lib/resque.rb, line 645
def hooks(name)
  @hooks[name]
end
Register a new proc as a hook. If the block is nil this is the equivalent of removing all hooks of the given name.
`name` is the hook that the block should be registered with.
# File lib/resque.rb, line 632
def register_hook(name, block)
  return clear_hooks(name) if block.nil?

  block = Array(block)
  @hooks[name].concat(block)
end
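How the three private helpers combine with a public hook method such as `after_fork` can be sketched in a standalone class. `SketchHooks` is hypothetical, and initialising `@hooks` as a Hash that defaults each name to an empty array is an assumption, since the initialisation is not shown in this section.

```ruby
# Hypothetical standalone registry mirroring the hook helpers.
class SketchHooks
  def initialize
    # Assumption: every hook name starts out with an empty list.
    @hooks = Hash.new { |hash, key| hash[key] = [] }
  end

  # With a block: register it. Without: return all registered hooks.
  def after_fork(&block)
    block ? register_hook(:after_fork, block) : hooks(:after_fork)
  end

  def after_fork=(block)
    register_hook(:after_fork, block)
  end

  private

  def clear_hooks(name)
    @hooks[name] = []
  end

  def hooks(name)
    @hooks[name]
  end

  # A nil block clears the hooks; a proc or an array of procs is appended.
  def register_hook(name, block)
    return clear_hooks(name) if block.nil?

    @hooks[name].concat(Array(block))
  end
end

registry = SketchHooks.new
registry.after_fork { |job| job }
registry.after_fork = [proc { :first }, proc { :second }]
count_after_register = registry.after_fork.size

registry.after_fork = nil
count_after_clear = registry.after_fork.size
```

Registering appends rather than replaces, so the block form followed by a two-element array leaves three hooks; assigning `nil` removes them all.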