class StubbornQueue

Attributes

db[R]

Public Class Methods

new(options = {}) click to toggle source
# File lib/stubborn_queue.rb, line 7
def initialize(options = {})
  @name = options.fetch :name, 'default'
  @timeout = options.fetch :timeout, 60
  path = options.fetch :file, ".#{@name}.db"
  @db = Moneta.new :Daybreak, expires: true, file: path
end

Public Instance Methods

claims() click to toggle source
# File lib/stubborn_queue.rb, line 42
def claims
  @db.fetch key_for(:claimed_list), []
end
create_id() click to toggle source
# File lib/stubborn_queue.rb, line 34
def create_id
  @db.increment key_for(:id_count)
end
dequeue() click to toggle source
# File lib/stubborn_queue.rb, line 66
def dequeue
  process_expired_claims
  id = @db.rpoplpush key_for(:pending_list), key_for(:claimed_list)
  @db.store key_for(:claimed_flag, with_id: id), true, expires: @timeout
  id
end
enqueue(item) click to toggle source
# File lib/stubborn_queue.rb, line 59
def enqueue(item)
  process_expired_claims
  id = create_id
  @db.store key_for(:item_store, with_id: id), item
  @db.lpush key_for(:pending_list), id
end
finish(id) click to toggle source
# File lib/stubborn_queue.rb, line 77
def finish(id)
  @db.store key_for(:finished_flag, with_id: id), true
end
finished?(id) click to toggle source
# File lib/stubborn_queue.rb, line 50
def finished?(id)
  @db.fetch key_for(:finished_flag, with_id: id), false
end
key_for(type, with_id: 0) click to toggle source
# File lib/stubborn_queue.rb, line 14
def key_for(type, with_id: 0)
  key = case type
  when :pending_list
    "#{@name}:pending"
  when :claimed_list
    "#{@name}:claimed"
  when :claimed_flag
    "#{@name}:task:#{with_id}:claimed"
  when :finished_flag
    "#{@name}:task:#{with_id}:finished"
  when :item_store
    "#{@name}:@{id}"
  when :id_count
    "#{@name}:id_count"
  else
    raise "Key for '#{type}' is unrecognized."
  end
  "#{@name}:#{key}"
end
lookup(id) click to toggle source
# File lib/stubborn_queue.rb, line 38
def lookup(id)
  @db.fetch key_for(:item_store, with_id: id), nil
end
process_expired_claims() click to toggle source
# File lib/stubborn_queue.rb, line 81
def process_expired_claims
  claims.each do |id|
    next if recent_claim? id
    remove_claim_on id
    requeue id unless finished? id
  end
end
recent_claim?(id) click to toggle source
# File lib/stubborn_queue.rb, line 46
def recent_claim?(id)
  @db.fetch key_for(:claimed_flag, with_id: id), false
end
remove_claim_on(id) click to toggle source
# File lib/stubborn_queue.rb, line 54
def remove_claim_on(id)
  @db.lrem key_for(:claimed_list), 0, id
  @db.delete key_for(:claimed_flag, with_id: id)
end
requeue(id) click to toggle source
# File lib/stubborn_queue.rb, line 73
def requeue(id)
  @db.lpush key_for(:pending_list), id
end