class QC::Queue
The queue class maps a queue abstraction onto a database table.
Attributes
Public Class Methods
# File lib/queue_classic/queue.rb, line 10 def initialize(name, top_bound=nil) @name = name @top_bound = top_bound || QC.top_bound end
Public Instance Methods
# File lib/queue_classic/queue.rb, line 19 def conn_adapter @adapter ||= QC.default_conn_adapter end
# File lib/queue_classic/queue.rb, line 15 def conn_adapter=(a) @adapter = a end
# File lib/queue_classic/queue.rb, line 110 def count QC.log_yield(:measure => 'queue.count') do s = "SELECT COUNT(*) FROM #{QC.table_name} WHERE q_name = $1" r = conn_adapter.execute(s, name) r["count"].to_i end end
# File lib/queue_classic/queue.rb, line 97 def delete(id) QC.log_yield(:measure => 'queue.delete') do conn_adapter.execute("DELETE FROM #{QC.table_name} WHERE id = $1", id) end end
# File lib/queue_classic/queue.rb, line 103 def delete_all QC.log_yield(:measure => 'queue.delete_all') do s = "DELETE FROM #{QC.table_name} WHERE q_name = $1" conn_adapter.execute(s, name) end end
enqueue(m,a) inserts a row into the jobs table and trigger a notification. The job's queue is represented by a name column in the row. There is a trigger on the table which will send a NOTIFY event on a channel which corresponds to the name of the queue. The method argument is a string encoded ruby expression. The expression will be separated by a `.` character and then `eval`d. Examples of the method argument include: `puts`, `Kernel.puts`, `MyObject.new.puts`. The args argument will be encoded as JSON and stored as a JSON datatype in the row. (If the version of PG does not support JSON, then the args will be stored as text. The args are stored as a collection and then splatted inside the worker. Examples of args include: `'hello world'`, `['hello world']`, `'hello', 'world'`. This method returns a hash with the id of the enqueued job.
# File lib/queue_classic/queue.rb, line 38 def enqueue(method, *args) QC.log_yield(:measure => 'queue.enqueue') do s = "INSERT INTO #{QC.table_name} (q_name, method, args) VALUES ($1, $2, $3) RETURNING id" conn_adapter.execute(s, name, method, JSON.dump(args)) end end
enqueue_at
(t,m,a) inserts a row into the jobs table representing a job to be executed not before the specified time. The time argument must be a Time object or a float timestamp. The method and args argument must be in the form described in the documentation for the enqueue
method. This method returns a hash with the id of the enqueued job.
# File lib/queue_classic/queue.rb, line 51 def enqueue_at(timestamp, method, *args) offset = Time.at(timestamp).to_i - Time.now.to_i enqueue_in(offset, method, *args) end
enqueue_in
(t,m,a) inserts a row into the jobs table representing a job to be executed not before the specified time offset. The seconds argument must be an integer. The method and args argument must be in the form described in the documentation for the enqueue
method. This method returns a hash with the id of the enqueued job.
# File lib/queue_classic/queue.rb, line 62 def enqueue_in(seconds, method, *args) QC.log_yield(:measure => 'queue.enqueue') do s = "INSERT INTO #{QC.table_name} (q_name, method, args, scheduled_at) VALUES ($1, $2, $3, now() + interval '#{seconds.to_i} seconds') RETURNING id" conn_adapter.execute(s, name, method, JSON.dump(args)) end end
# File lib/queue_classic/queue.rb, line 71 def lock QC.log_yield(:measure => 'queue.lock') do s = "SELECT * FROM lock_head($1, $2)" if r = conn_adapter.execute(s, name, top_bound) {}.tap do |job| job[:id] = r["id"] job[:q_name] = r["q_name"] job[:method] = r["method"] job[:args] = JSON.parse(r["args"]) if r["scheduled_at"] job[:scheduled_at] = Time.parse(r["scheduled_at"]) ttl = Integer((Time.now - job[:scheduled_at]) * 1000) QC.measure("time-to-lock=#{ttl}ms source=#{name}") end end end end end
# File lib/queue_classic/queue.rb, line 90 def unlock(id) QC.log_yield(:measure => 'queue.unlock') do s = "UPDATE #{QC.table_name} SET locked_at = NULL WHERE id = $1" conn_adapter.execute(s, id) end end