class Mongo::Dequeue
heavily inspired by github.com/skiz/mongo_queue
Constants
- DEFAULT_CONFIG
Attributes
Public Class Methods
# File lib/mongo-dequeue.rb, line 232 def self.generate_duplicate_key(body) return Digest::MD5.hexdigest(body) if body.class == "String" return Digest::MD5.hexdigest(body) if body.class == "Fixnum" #else return Digest::MD5.hexdigest(body.to_json) #won't ever match a duplicate. Need a better way to handle hashes and arrays. end
Create a new instance of MongoDequeue with the provided mongodb connection and optional configuration. See DEFAULT_CONFIG
for default configuration and possible configuration options.
Example:
db = Mongo::Connection.new('localhost') config = {:timeout => 90, :attempts => 2} queue = Mongo::Queue.new(db, config)
# File lib/mongo-dequeue.rb, line 23 def initialize(collection, opts={}) @collection = collection @config = DEFAULT_CONFIG.merge(opts) @batch = [] end
Public Instance Methods
# File lib/mongo-dequeue.rb, line 72 def batchprocess() js = %Q| function(batch) { var nowutc = new Date(); var ret = []; for(i in batch){ e = batch[i]; //ret.push(e); var query = { 'duplicate_key': e.duplicate_key, 'complete': false, 'locked_at': null }; var object = { '$set': { 'body': e.body, 'inserted_at': nowutc, 'complete': false, 'locked_till': null, 'completed_at': null, 'priority': e.priority, 'duplicate_key': e.duplicate_key, 'completecount': 0 }, '$inc': {'count': 1} }; db.#{collection.name}.update(query, object, true); } return ret; } | cmd = BSON::OrderedHash.new cmd['$eval'] = js cmd['args'] = [@batch] cmd['nolock'] = true result = collection.db.command(cmd) @batch.clear #pp result end
add a new item into the delayed batch
# File lib/mongo-dequeue.rb, line 64 def batchpush(body, item_opts = {}) @batch << { :body => body, :duplicate_key => item_opts[:duplicate_key] || Mongo::Dequeue.generate_duplicate_key(body), :priority => item_opts[:priority] || @config[:default_priority] } end
Removes completed job history
# File lib/mongo-dequeue.rb, line 157 def cleanup() collection.remove({:complete=>true}); end
Remove the document from the queue. This should be called when the work is done and the document is no longer needed. You must provide the process identifier that the document was locked with to complete it.
# File lib/mongo-dequeue.rb, line 142 def complete(id) begin cmd = BSON::OrderedHash.new cmd['findandmodify'] = collection.name cmd['query'] = {:_id => BSON::ObjectId.from_string(id)} cmd['update'] = {'$set' => {:completed_at => Time.now.utc, :complete => true}, '$inc' => {:completecount => 1} } cmd['limit'] = 1 collection.db.command(cmd) rescue Mongo::OperationFailure => of #opfailure happens when item has been already completed return nil end end
Remove all items from the queue. Use with caution!
# File lib/mongo-dequeue.rb, line 30 def flush! collection.drop end
# File lib/mongo-dequeue.rb, line 239 def peek firstfew = collection.find({ :complete => false, '$or'=>[{:locked_till=> nil},{:locked_till=>{'$lt'=>Time.now.utc}}] }, :sort => [[:priority, :descending],[:inserted_at, :ascending]], :limit => 10) return firstfew end
{:body=>“foo”, :id=>“4e039c372b70275e345206e4”}
# File lib/mongo-dequeue.rb, line 120 def pop(opts = {}) begin timeout = opts[:timeout] || @config[:timeout] cmd = BSON::OrderedHash.new cmd['findandmodify'] = collection.name cmd['update'] = {'$set' => {:locked_till => Time.now.utc+timeout}} cmd['query'] = {:complete => false, '$or'=>[{:locked_till=> nil},{:locked_till=>{'$lt'=>Time.now.utc}}] } cmd['sort'] = {:priority=>-1,:inserted_at=>1} cmd['limit'] = 1 cmd['new'] = true result = collection.db.command(cmd) rescue Mongo::OperationFailure => of return nil end return { :body => result['value']['body'], :id => result['value']['_id'].to_s } end
Insert a new item into the queue.
Example:
queue.insert(:name => 'Billy', :email => 'billy@example.com', :message => 'Here is the thing you asked for')
# File lib/mongo-dequeue.rb, line 38 def push(body, item_opts = {}) dup_key = item_opts[:duplicate_key] || Mongo::Dequeue.generate_duplicate_key(body) selector = { :duplicate_key => dup_key, :complete => false, :locked_at => nil } item = { '$set' => { :body => body, :inserted_at => Time.now.utc, :complete => false, :locked_till => nil, :completed_at => nil, :priority => item_opts[:priority] || @config[:default_priority], :duplicate_key => dup_key, :completecount => 0 }, '$inc' => {:count => 1 } } id = collection.update(selector, item, :upsert => true) end
Provides some information about what is in the queue. We are using an eval to ensure that a lock is obtained during the execution of this query so that the results are not skewed. please be aware that it will lock the database during the execution, so avoid using it too often, even though it it very tiny and should be relatively fast.
# File lib/mongo-dequeue.rb, line 165 def stats js = "function queue_stat(){ return db.eval( function(){ var nowutc = new Date(); var a = db.#{collection.name}.count({'complete': false, '$or':[{'locked_till':null},{'locked_till':{'$lt':nowutc}}] }); var c = db.#{collection.name}.count({'complete': true}); var t = db.#{collection.name}.count(); var l = db.#{collection.name}.count({'complete': false, 'locked_till': {'$gte':nowutc} }); var rc = db.#{collection.name}.group({ 'key': {}, 'cond': {'complete':true}, '$reduce': function(obj, prev){prev.count += (obj.completecount - 1);}, 'initial': {count: 0} }); var p = db.#{collection.name}.group({ 'key': {'priority':1}, 'cond': {}, '$reduce': function(obj, prev){if(obj.complete){prev.complete += 1;}else{prev.waiting += 1;}}, 'initial': {complete: 0, waiting:0} }); var tasks = db.#{collection.name}.group({ 'key': {'body.task':1}, 'cond': {}, '$reduce': function(obj, prev){if(obj.complete){prev.complete += 1;}else{prev.waiting += 1;}}, 'initial': {complete: 0, waiting:0} }); return [a, c, t, l, rc[0] ? rc[0].count : 0, p, tasks]; } ); }" #possible additions #db.job_queue.group({ #'key': {'priority':1}, #'cond': {}, #'$reduce': function(obj, prev){if(obj.complete){prev.complete += 1;}else{prev.waiting += 1;}}, #'initial': {complete: 0, waiting:0} #}); #db.job_queue.group({ #'key': {'body.task':1}, #'cond': {}, #'$reduce': function(obj, prev){if(obj.complete){prev.complete += 1;}else{prev.waiting += 1;}}, #'initial': {complete: 0, waiting:0} #}); cmd = BSON::OrderedHash.new cmd['$eval'] = js cmd['nolock'] = true available, complete, total, locked, redundant_completes, priority, tasks = collection.db.command(cmd)['retval'] #available, complete, total, locked, redundant_completes, priority, tasks = collection.db.eval(js) { :locked => locked.to_i, :complete => complete.to_i, :available => available.to_i, :total => total.to_i, :redundantcompletes => redundant_completes, :priority => priority, :tasks => tasks } end