class Polipus::QueueOverflow::MongoQueue

Public Class Methods

new(mongo_db, queue_name, options = {}) click to toggle source
# File lib/polipus/queue_overflow/mongo_queue.rb, line 7
def initialize(mongo_db, queue_name, options = {})
  @mongo_db = mongo_db
  @collection_name = "polipus_q_overflow_#{queue_name}"
  @semaphore = Mutex.new
  @options = options
  @options[:ensure_uniq] ||= false
  @options[:ensure_uniq] && ensure_index
end

Public Instance Methods

<<(data)
Alias for: push
clear() click to toggle source
# File lib/polipus/queue_overflow/mongo_queue.rb, line 24
def clear
  @mongo_db[@collection_name].drop
  @options[:ensure_uniq] && ensure_index
end
dec(_ = false)
Alias for: pop
empty?() click to toggle source
# File lib/polipus/queue_overflow/mongo_queue.rb, line 20
def empty?
  !(length > 0)
end
enc(data)
Alias for: push
length() click to toggle source
# File lib/polipus/queue_overflow/mongo_queue.rb, line 16
def length
  @mongo_db[@collection_name].count
end
Also aliased as: size
pop(_ = false) click to toggle source
# File lib/polipus/queue_overflow/mongo_queue.rb, line 38
def pop(_ = false)
  @semaphore.synchronize do
    doc = @mongo_db[@collection_name].find({}, sort: { _id: 1 }).limit(1).first
    return nil if doc.nil?
    @mongo_db[@collection_name].remove(_id: doc['_id'])
    doc && doc['payload'] ? doc['payload'] : nil
  end
end
Also aliased as: dec, shift
push(data) click to toggle source
# File lib/polipus/queue_overflow/mongo_queue.rb, line 29
def push(data)
  if @options[:ensure_uniq]
    @mongo_db[@collection_name].update({ payload: data }, { payload: data }, { upsert: true, w: 1 })
  else
    @mongo_db[@collection_name].insert(payload: data)
  end
  true
end
Also aliased as: enc, <<
shift(_ = false)
Alias for: pop
size()
Alias for: length

Protected Instance Methods

ensure_index() click to toggle source
# File lib/polipus/queue_overflow/mongo_queue.rb, line 55
def ensure_index
  @mongo_db[@collection_name].ensure_index({ payload: 1 }, { background: 1, unique: 1, drop_dups: 1 })
end