class DispatchRider::QueueServices::FileSystem::Queue

Public Class Methods

new(path) click to toggle source
# File lib/dispatch-rider/queue_services/file_system/queue.rb, line 6
def initialize(path)
  FileUtils.mkdir_p(path)
  @path = path
end

Public Instance Methods

add(item) click to toggle source
# File lib/dispatch-rider/queue_services/file_system/queue.rb, line 11
def add(item)
  name_base = "#{@path}/#{Time.now.to_f}"
  File.open("#{name_base}.inprogress", "w"){ |f| f.write(item) }
  FileUtils.mv("#{name_base}.inprogress", "#{name_base}.ready")
end
pop() click to toggle source
# File lib/dispatch-rider/queue_services/file_system/queue.rb, line 17
def pop
  file_path = next_item(10)
  return nil unless file_path
  file_path_inflight = file_path.gsub(/\.ready$/, '.inflight')
  FileUtils.mv(file_path, file_path_inflight)
  File.new(file_path_inflight)
end
put_back(item) click to toggle source
# File lib/dispatch-rider/queue_services/file_system/queue.rb, line 25
def put_back(item)
  add(item)
  remove(item)
end
remove(item) click to toggle source
# File lib/dispatch-rider/queue_services/file_system/queue.rb, line 30
def remove(item)
  item.close
  File.unlink(item.path)
end
size() click to toggle source
# File lib/dispatch-rider/queue_services/file_system/queue.rb, line 35
def size
  file_paths.size
end

Private Instance Methods

file_paths() click to toggle source
# File lib/dispatch-rider/queue_services/file_system/queue.rb, line 52
def file_paths
  Dir["#{@path}/*.ready"]
end
next_item(timeout = 10.seconds) click to toggle source

Long polling next item fetcher allows to sleep between checks for a new file and not run the main loop as much

# File lib/dispatch-rider/queue_services/file_system/queue.rb, line 43
def next_item(timeout = 10.seconds)
  Timeout.timeout(timeout) do
    sleep 1 until file_paths.first
    file_paths.first
  end
rescue Timeout::Error
  nil
end