class Polipus::QueueOverflow::MongoQueue
Public Class Methods
new(mongo_db, queue_name, options = {})
click to toggle source
# File lib/polipus/queue_overflow/mongo_queue.rb, line 7 def initialize(mongo_db, queue_name, options = {}) @mongo_db = mongo_db @collection_name = "polipus_q_overflow_#{queue_name}" @semaphore = Mutex.new @options = options @options[:ensure_uniq] ||= false @options[:ensure_uniq] && ensure_index end
Public Instance Methods
clear()
click to toggle source
# File lib/polipus/queue_overflow/mongo_queue.rb, line 24 def clear @mongo_db[@collection_name].drop @options[:ensure_uniq] && ensure_index end
empty?()
click to toggle source
# File lib/polipus/queue_overflow/mongo_queue.rb, line 20 def empty? !(length > 0) end
length()
click to toggle source
# File lib/polipus/queue_overflow/mongo_queue.rb, line 16 def length @mongo_db[@collection_name].count end
Also aliased as: size
pop(_ = false)
click to toggle source
# File lib/polipus/queue_overflow/mongo_queue.rb, line 38 def pop(_ = false) @semaphore.synchronize do doc = @mongo_db[@collection_name].find({}, sort: { _id: 1 }).limit(1).first return nil if doc.nil? @mongo_db[@collection_name].remove(_id: doc['_id']) doc && doc['payload'] ? doc['payload'] : nil end end
push(data)
click to toggle source
# File lib/polipus/queue_overflow/mongo_queue.rb, line 29 def push(data) if @options[:ensure_uniq] @mongo_db[@collection_name].update({ payload: data }, { payload: data }, { upsert: true, w: 1 }) else @mongo_db[@collection_name].insert(payload: data) end true end
Protected Instance Methods
ensure_index()
click to toggle source
# File lib/polipus/queue_overflow/mongo_queue.rb, line 55 def ensure_index @mongo_db[@collection_name].ensure_index({ payload: 1 }, { background: 1, unique: 1, drop_dups: 1 }) end