class Chore::Queues::Filesystem::Consumer

This is the consuming side of the filesystem queue. It consumes jobs created by FilesystemPublisher#publish. The root of the filesystem queue is configured via Chore.config.fs_queue_root; within that root, a directory is created for each queue name, and each queue directory contains a “new” directory and an “inprogress” directory. FilesystemPublisher#publish creates job files in the “new” directory. This consumer polls that directory every 5 seconds for new jobs, which are moved to “inprogress” while they are worked.

Once complete, job files are deleted. If rejected, they are moved back into “new” and will be processed again. This may not be the desired behavior long term, and we may want to add configuration to this class to allow more flexible failure handling and retrying.
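
For orientation, here is a rough sketch of the on-disk layout and the lifecycle of a single job file. The root path, queue name, and job id are illustrative, and the filename formats are inferred from make_in_progress and make_new_again below.

# Assuming Chore.config.fs_queue_root = "/var/chore" and a queue named "emails":
#
#   /var/chore/emails/new/          <- FilesystemPublisher#publish writes job files here
#   /var/chore/emails/inprogress/   <- this consumer moves claimed job files here
#
# A single job then moves through names roughly like:
#
#   new/abc123.0.job                    # published, zero previous attempts
#   inprogress/abc123.0.1617000000.job  # claimed; the epoch timestamp marks when work started
#   (deleted)                           # on complete
#   new/abc123.1.job                    # on expiry/rejection, with the attempt count bumped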

Constants

EXPIRATION_CHECK_INTERVAL

The minimum number of seconds allowed to pass between checks for expired jobs on the filesystem.

Since queue times are measured on the order of seconds, 1 second is the smallest useful interval. It also keeps us from burning a lot of CPU checking for expired jobs when the consumer sleep interval is less than 1 second.
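
Given the description above, the constant is presumably just a one-second floor, along the lines of:

EXPIRATION_CHECK_INTERVAL = 1 # seconds between sweeps for expired in-progress jobs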

Attributes

queue_timeout[R]

The amount of time a unit of work can run before the queue considers it timed out. For filesystem queues, this is the global default.
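
A minimal sketch of what “the global default” likely means in practice; the default_queue_timeout config key is an assumption here, not taken from the source:

def self.queue_timeout(queue_name)
  # assumed: fall back to Chore's globally configured default timeout
  Chore.config.default_queue_timeout
end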

Public Class Methods

cleanup(expiration_time, new_dir, in_progress_dir)

Cleans up expired in-progress files by making them new again.

# File lib/chore/queues/filesystem/consumer.rb, line 26
def cleanup(expiration_time, new_dir, in_progress_dir)
  each_file(in_progress_dir) do |job_file|
    id, previous_attempts, timestamp = file_info(job_file)
    next if timestamp > expiration_time

    begin
      make_new_again(job_file, new_dir, in_progress_dir)
    rescue Errno::ENOENT
      # File no longer exists; skip since it's been recovered by another
      # consumer
    rescue ArgumentError
      # Move operation was attempted at same time as another consumer;
      # skip since the other process succeeded where this one didn't
    end
  end
end
each_file(path, limit = nil) { |file| ... }
# File lib/chore/queues/filesystem/consumer.rb, line 94
def each_file(path, limit = nil)
  count = 0

  Dir.foreach(path) do |file|
    next if file.start_with?('.')

    yield file

    count += 1
    break if limit && count >= limit
  end
end
file_info(job_file)

Parses the job filename to extract the job's unique identifier, the number of times it has been attempted, and (for in-progress files) the timestamp at which it was claimed.

# File lib/chore/queues/filesystem/consumer.rb, line 109
def file_info(job_file)
  id, previous_attempts, timestamp, * = job_file.split('.')
  [id, previous_attempts.to_i, timestamp.to_i]
end
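
For example, given an in-progress filename in the format produced by make_in_progress below (the id and timestamp are illustrative):

file_info("abc123.2.1617000000.job")
# => ["abc123", 2, 1617000000]   (id, previous attempts, claimed-at epoch)
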
make_in_progress(job, new_dir, in_progress_dir, queue_timeout)

Moves the job file to the in-progress directory and returns the new full path if the job was successfully locked by this consumer.

# File lib/chore/queues/filesystem/consumer.rb, line 45
def make_in_progress(job, new_dir, in_progress_dir, queue_timeout)
  basename, previous_attempts, * = file_info(job)

  from = File.join(new_dir, job)
  # Add a timestamp to mark when the job was started
  to = File.join(in_progress_dir, "#{basename}.#{previous_attempts}.#{Time.now.to_i}.job")

  # If the file is non-zero, this means it was successfully written to
  # by a publisher and we can attempt to move it to "in progress".
  #
  # There is a small window of time where the file can be zero, but
  # the publisher hasn't finished writing to the file yet.
  if !File.zero?(from)
    File.open(from, "r") do |f|
      # If the lock can't be obtained, the file has been locked by
      # another consumer (or by the publisher of the file) -- don't
      # block; just skip it
      if f.flock(File::LOCK_EX | File::LOCK_NB)
        FileUtils.mv(from, to)
        to
      end
    end
  elsif (Time.now - File.ctime(from)) >= queue_timeout
    # The file is empty (zero bytes) and enough time has passed since
    # the file was written that we can safely assume it will never
    # get written to by the publisher.
    #
    # The scenario where this happens is when the publisher created
    # the file, but the process was killed before it had a chance to
    # actually write the data.
    File.delete(from)
    nil
  end
rescue Errno::ENOENT
  # File no longer exists; skip it since it's been picked up by
  # another consumer
end
make_new_again(job, new_dir, in_progress_dir)

Moves the job file back to the new directory (incrementing its attempt count) and returns the new full path.

# File lib/chore/queues/filesystem/consumer.rb, line 84
def make_new_again(job, new_dir, in_progress_dir)
  basename, previous_attempts = file_info(job)

  from = File.join(in_progress_dir, job)
  to = File.join(new_dir, "#{basename}.#{previous_attempts + 1}.job")
  FileUtils.mv(from, to)

  to
end
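
As an illustration (the filename is hypothetical), a job claimed after one previous attempt goes back to the new directory with the attempt count bumped and the timestamp dropped:

make_new_again("abc123.1.1617000000.job", new_dir, in_progress_dir)
# moves in_progress_dir/abc123.1.1617000000.job to new_dir/abc123.2.job
# and returns the destination path
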
new(queue_name, opts={})
Calls superclass method Chore::Consumer::new
# File lib/chore/queues/filesystem/consumer.rb, line 127
def initialize(queue_name, opts={})
  super(queue_name, opts)

  @in_progress_dir = self.class.in_progress_dir(queue_name)
  @new_dir = self.class.new_dir(queue_name)
  @queue_timeout = self.class.queue_timeout(queue_name)
end
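
A minimal construction sketch; the queue name is illustrative, and opts are passed straight through to Chore::Consumer:

consumer = Chore::Queues::Filesystem::Consumer.new("emails")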

Public Instance Methods

complete(message_id, receipt_handle = nil)

Deletes the given message from the filesystem queue. Since the filesystem is not a remote API, there is no notion of a “receipt handle”.

@param [String] message_id Unique ID of the message
@param [Hash] receipt_handle Receipt handle of the message. Always nil for the filesystem consumer

# File lib/chore/queues/filesystem/consumer.rb, line 169
def complete(message_id, receipt_handle = nil)
  Chore.logger.debug "Completing (deleting): #{message_id}"
  File.delete(File.join(@in_progress_dir, message_id))
rescue Errno::ENOENT
  # The job took too long to complete, was deemed expired, and moved
  # back into "new".  Ignore.
end
consume() { |*args| ... }
# File lib/chore/queues/filesystem/consumer.rb, line 135
def consume
  Chore.logger.info "Starting consuming file system queue #{@queue_name} in #{self.class.queue_dir(queue_name)}"
  while running?
    begin
      # Move expired job files to new directory (so long as enough time has
      # passed since we last did this check)
      if !@last_cleaned_at || (Time.now - @last_cleaned_at).to_i >= EXPIRATION_CHECK_INTERVAL
        self.class.cleanup(Time.now.to_i - @queue_timeout, @new_dir, @in_progress_dir)
        @last_cleaned_at = Time.now
      end

      found_files = false
      handle_messages do |*args|
        found_files = true
        yield(*args)
      end
    rescue => e
      Chore.logger.error { "#{self.class}#consume: #{e} #{e.backtrace * "\n"}" }
    ensure
      sleep(Chore.config.consumer_sleep_interval) unless found_files
    end
  end
end
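
A rough usage sketch; the block receives the arguments handle_messages yields (see below), and the argument names here are illustrative:

consumer.consume do |id, receipt_handle, queue_name, timeout, json, previous_attempts|
  # receipt_handle is always nil for the filesystem consumer
  Chore.logger.debug "Fetched job #{id} from #{queue_name} (attempt #{previous_attempts + 1})"
end
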
reject(id)

Rejects the given message from the filesystem queue by id. Currently a no-op.

# File lib/chore/queues/filesystem/consumer.rb, line 160
def reject(id)

end

Private Instance Methods

handle_messages(&block)

Finds all new job files, moves them to in-progress, and starts each job. Returns a list of the job files processed.

# File lib/chore/queues/filesystem/consumer.rb, line 181
def handle_messages(&block)
  self.class.each_file(@new_dir, Chore.config.queue_polling_size) do |job_file|
    Chore.logger.debug "Found a new job #{job_file}"

    in_progress_path = make_in_progress(job_file)
    next unless in_progress_path

    # The job filename may have changed, so update it to reflect the in progress path
    job_file = File.basename(in_progress_path)

    job_json = File.read(in_progress_path)
    basename, previous_attempts, * = self.class.file_info(job_file)

    # job_file is just the filename, which serves as the job id. The 2nd
    # argument (receipt_handle) is nil because the filesystem is accessed
    # directly rather than through an external API
    block.call(job_file, nil, queue_name, queue_timeout, job_json, previous_attempts)
    Chore.run_hooks_for(:on_fetch, job_file, job_json)
  end
end
make_in_progress(job)
# File lib/chore/queues/filesystem/consumer.rb, line 201
def make_in_progress(job)
  self.class.make_in_progress(job, @new_dir, @in_progress_dir, @queue_timeout)
end
make_new_again(job)
# File lib/chore/queues/filesystem/consumer.rb, line 205
def make_new_again(job)
  self.class.make_new_again(job, @new_dir, @in_progress_dir)
end