class Mongo::Dequeue

heavily inspired by github.com/skiz/mongo_queue

Constants

DEFAULT_CONFIG

Attributes

batch[R]
collection[R]
config[R]

Public Class Methods

generate_duplicate_key(body) click to toggle source
# File lib/mongo-dequeue.rb, line 232
# Builds the key used to detect duplicate queue items: an MD5 hex digest
# derived from the item body.
#
# body - the item payload. Strings are digested as-is, integers via their
#        decimal string form, and anything else via its JSON serialisation.
#
# Returns the digest as a hexadecimal String.
def self.generate_duplicate_key(body)
        # The earlier implementation compared body.class against the strings
        # "String" and "Fixnum". A Class never equals a String, so those
        # branches were dead and every body was digested through to_json.
        digest_input =
                case body
                when String  then body
                when Integer then body.to_s
                else
                        # NOTE: two hashes with equal content may still serialise
                        # differently (e.g. different key order) and then will not
                        # be treated as duplicates. Pass an explicit :duplicate_key
                        # when that matters.
                        body.to_json
                end
        Digest::MD5.hexdigest(digest_input)
end
new(collection, opts={}) click to toggle source

Create a new instance of Mongo::Dequeue with the provided MongoDB collection and optional configuration. See DEFAULT_CONFIG for default configuration and possible configuration options.

Example:

collection = Mongo::Connection.new('localhost').db('myapp').collection('queue')
config = {:timeout => 90, :attempts => 2}
queue = Mongo::Dequeue.new(collection, config)
# File lib/mongo-dequeue.rb, line 23
# Set up a queue stored in +collection+.
#
# collection - the collection object that holds the queue documents.
# opts       - Hash of settings merged over DEFAULT_CONFIG; entries given
#              here win over the defaults.
#
# Also starts with an empty local batch (see batchpush / batchprocess).
def initialize(collection, opts={})
        @collection = collection
        @batch      = []
        @config     = DEFAULT_CONFIG.merge(opts)
end

Public Instance Methods

batchprocess() click to toggle source
# File lib/mongo-dequeue.rb, line 72
# Write every item collected with batchpush to the collection in a single
# server-side $eval call, then empty the local batch.
#
# For each batched item the JavaScript upserts a document matched on the
# item's duplicate_key among incomplete documents: it (re)sets body,
# inserted_at, complete, locked_till, completed_at, priority, duplicate_key
# and completecount, and increments count by one.
#
# Returns the (now empty) batch Array.
def batchprocess()
        # Nothing batched: skip the server round trip entirely.
        return @batch if @batch.empty?

        # NOTE(review): the query filters on 'locked_at', while the rest of the
        # class locks items through 'locked_till'. Left as-is because changing
        # it alters which documents are matched. Confirm the intended behaviour.
        #
        # Loop variables are declared with `var` so they no longer leak into the
        # server-side global JavaScript scope.
        js = %Q|
        function(batch) {
                var nowutc = new Date();
                var ret = [];
                for(var i in batch){
                        var e = batch[i];
                        var query = {
                                'duplicate_key': e.duplicate_key,
                                'complete': false,
                                'locked_at': null
                        };
                        var object = {
                                '$set': {
                                        'body': e.body,
                                        'inserted_at': nowutc,
                                        'complete': false,
                                        'locked_till': null,
                                        'completed_at': null,
                                        'priority': e.priority,
                                        'duplicate_key': e.duplicate_key,
                                        'completecount': 0
                                },
                                '$inc': {'count': 1}
                        };

                        db.#{collection.name}.update(query, object, true);
                }
                return ret;
        }
|
        cmd = BSON::OrderedHash.new
        cmd['$eval'] = js
        cmd['args'] = [@batch]
        cmd['nolock'] = true
        collection.db.command(cmd)

        @batch.clear
end
batchpush(body, item_opts = {}) click to toggle source

Add a new item to the pending batch. Batched items are only written to the queue when batchprocess is called.

# File lib/mongo-dequeue.rb, line 64
# Append an item to the in-memory batch without touching the database.
#
# body      - the item payload.
# item_opts - optional Hash:
#             :duplicate_key - key used for duplicate detection; when absent
#                              it is derived from body via
#                              Mongo::Dequeue.generate_duplicate_key.
#             :priority      - item priority; falls back to
#                              @config[:default_priority].
#
# Returns the batch Array.
def batchpush(body, item_opts = {})
        dup_key  = item_opts[:duplicate_key] || Mongo::Dequeue.generate_duplicate_key(body)
        priority = item_opts[:priority] || @config[:default_priority]

        entry = {:body => body, :duplicate_key => dup_key, :priority => priority}
        @batch.push(entry)
end
cleanup() click to toggle source

Removes completed job history

# File lib/mongo-dequeue.rb, line 157
# Delete every document flagged as complete from the collection.
#
# Returns whatever the collection's remove call returns.
def cleanup
        finished = {:complete => true}
        collection.remove(finished)
end
complete(id) click to toggle source

Mark the document as complete. This should be called when the work is done and the document is no longer needed. Pass the id string that pop returned for the item. Completed documents stay in the collection until cleanup is called.

# File lib/mongo-dequeue.rb, line 142
# Flag the document with the given id as complete via a findandmodify
# command: sets completed_at to the current UTC time, sets complete to true
# and increments completecount.
#
# id - String form of the document's _id (converted with
#      BSON::ObjectId.from_string).
#
# Returns the raw command result, or nil when the command raises
# Mongo::OperationFailure (per the original author's note, this happens when
# the item has already been completed).
def complete(id)
        target  = {:_id => BSON::ObjectId.from_string(id)}
        changes = {
                '$set' => {:completed_at => Time.now.utc, :complete => true},
                '$inc' => {:completecount => 1}
        }

        # Key insertion order is kept exactly as before (command name first).
        cmd = BSON::OrderedHash.new
        cmd['findandmodify'] = collection.name
        cmd['query']         = target
        cmd['update']        = changes
        cmd['limit']         = 1

        collection.db.command(cmd)
rescue Mongo::OperationFailure
        nil
end
flush!() click to toggle source

Remove all items from the queue. Use with caution!

# File lib/mongo-dequeue.rb, line 30
# Drop the backing collection, discarding every document in it.
#
# Returns whatever the collection's drop call returns.
def flush!
        queue_collection = collection
        queue_collection.drop
end
peek() click to toggle source
# File lib/mongo-dequeue.rb, line 239
# Look at the next items that are available for work, without locking them.
#
# An item is selected when it is not complete and either has no lock
# (locked_till is nil) or its lock expired before the current UTC time.
# Results are ordered by priority (descending), then inserted_at
# (ascending), and limited to 10.
#
# Returns whatever the collection's find call returns.
def peek
        unlocked = [
                {:locked_till => nil},
                {:locked_till => {'$lt' => Time.now.utc}}
        ]
        selector = {:complete => false, '$or' => unlocked}
        ordering = [[:priority, :descending], [:inserted_at, :ascending]]

        collection.find(selector, :sort => ordering, :limit => 10)
end
pop(opts = {}) click to toggle source

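Locks the next available item (highest priority first, then oldest) and returns it as a hash, e.g. {:body=>"foo", :id=>"4e039c372b70275e345206e4"}.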

# File lib/mongo-dequeue.rb, line 120
# Lock and return the next available item.
#
# An item is available when it is not complete and either has no lock
# (locked_till is nil) or its lock has expired. Candidates are ordered by
# priority (descending), then inserted_at (ascending). The chosen item's
# locked_till is set to now + timeout through a findandmodify command.
#
# opts - optional Hash:
#        :timeout - lock duration in seconds; falls back to
#                   @config[:timeout].
#
# Returns a Hash {:body => ..., :id => "<_id as String>"}, or nil when no
# item is available (the command either raises Mongo::OperationFailure or
# replies without a value).
def pop(opts = {})
        begin
                timeout = opts[:timeout] || @config[:timeout]
                # Sample the clock once so the new lock expiry and the
                # expired-lock comparison refer to the same instant.
                now = Time.now.utc

                cmd = BSON::OrderedHash.new
                cmd['findandmodify'] = collection.name
                cmd['update']        = {'$set' => {:locked_till => now + timeout}}
                cmd['query']         = {:complete => false, '$or' => [{:locked_till => nil}, {:locked_till => {'$lt' => now}}]}
                cmd['sort']          = {:priority => -1, :inserted_at => 1}
                cmd['limit']         = 1
                cmd['new']           = true
                result = collection.db.command(cmd)
        rescue Mongo::OperationFailure
                return nil
        end

        # A reply without a matched document carries a nil value; previously
        # this crashed with NoMethodError on nil instead of returning nil.
        item = result && result['value']
        return nil if item.nil?

        {
                :body => item['body'],
                :id => item['_id'].to_s
        }
end
push(body, item_opts = {}) click to toggle source

Insert a new item into the queue.

Example:

queue.push({:name => 'Billy', :email => 'billy@example.com', :message => 'Here is the thing you asked for'})
# File lib/mongo-dequeue.rb, line 38
# Upsert an item into the queue.
#
# The document is matched on its duplicate key among incomplete documents.
# A match has its fields reset and its count incremented; otherwise a new
# document is inserted.
#
# body      - the item payload.
# item_opts - optional Hash:
#             :duplicate_key - key used for duplicate detection; when absent
#                              it is derived from body via
#                              Mongo::Dequeue.generate_duplicate_key.
#             :priority      - item priority; falls back to
#                              @config[:default_priority].
#
# Returns whatever the collection's update call returns.
def push(body, item_opts = {})
        key = item_opts[:duplicate_key] || Mongo::Dequeue.generate_duplicate_key(body)

        fields = {
                :body => body,
                :inserted_at => Time.now.utc,
                :complete => false,
                :locked_till => nil,
                :completed_at => nil,
                :priority => item_opts[:priority] || @config[:default_priority],
                :duplicate_key => key,
                :completecount => 0
        }

        # NOTE(review): the selector filters on :locked_at, while locking
        # elsewhere in this class uses :locked_till. Confirm this is intended.
        selector = {:duplicate_key => key, :complete => false, :locked_at => nil}
        changes  = {'$set' => fields, '$inc' => {:count => 1}}

        collection.update(selector, changes, :upsert => true)
end
stats() click to toggle source

Provides some information about what is in the queue. We are using an eval to ensure that a lock is obtained during the execution of this query so that the results are not skewed. Please be aware that it will lock the database during the execution, so avoid using it too often, even though it is very tiny and should be relatively fast.

# File lib/mongo-dequeue.rb, line 165
# Gather queue statistics with one server-side $eval command.
#
# The JavaScript computes, for this collection:
#   * available - incomplete documents that are unlocked or whose lock expired
#   * complete  - documents flagged complete
#   * total     - all documents
#   * locked    - incomplete documents whose lock has not yet expired
#   * redundant completes - sum of (completecount - 1) over complete documents
#   * per-priority and per-'body.task' counts of complete / waiting documents
#
# Returns a Hash with the keys :locked, :complete, :available, :total
# (integers), :redundantcompletes, :priority and :tasks (as returned by the
# server).
def stats
        js = "function queue_stat(){
                      return db.eval(
                      function(){
                             var nowutc = new Date();
                             var a = db.#{collection.name}.count({'complete': false, '$or':[{'locked_till':null},{'locked_till':{'$lt':nowutc}}] });
                        var c = db.#{collection.name}.count({'complete': true});
                        var t = db.#{collection.name}.count();
                        var l = db.#{collection.name}.count({'complete': false, 'locked_till': {'$gte':nowutc} });
                        var rc = db.#{collection.name}.group({
                             'key': {},
                             'cond': {'complete':true},
                             '$reduce': function(obj, prev){prev.count += (obj.completecount - 1);},
                             'initial': {count: 0}
                        });
                        var p = db.#{collection.name}.group({
                                        'key': {'priority':1},
                                        'cond': {},
                                        '$reduce': function(obj, prev){if(obj.complete){prev.complete += 1;}else{prev.waiting += 1;}},
                                        'initial': {complete: 0, waiting:0}
                                });
                                var tasks = db.#{collection.name}.group({
                                        'key': {'body.task':1},
                                        'cond': {},
                                        '$reduce': function(obj, prev){if(obj.complete){prev.complete += 1;}else{prev.waiting += 1;}},
                                        'initial': {complete: 0, waiting:0}
                                });

                        return [a, c, t, l, rc[0] ? rc[0].count : 0, p, tasks];
                      }
                    );
                  }"

        cmd = BSON::OrderedHash.new
        cmd['$eval'] = js
        cmd['nolock'] = true

        # The script returns a positional array; unpack it in the same order
        # as its final `return [...]` statement.
        retval = collection.db.command(cmd)['retval']
        available, complete, total, locked, redundant_completes, priority, tasks = retval

        {
                :locked => locked.to_i,
                :complete => complete.to_i,
                :available => available.to_i,
                :total => total.to_i,
                :redundantcompletes => redundant_completes,
                :priority => priority,
                :tasks => tasks
        }
end