class RocketJob::Jobs::DirmonJob

Notes:

With RocketJob Pro, the file is automatically uploaded into the job itself using the job’s upload method. The file is then archived, or deleted if no archive_directory was specified in the DirmonEntry.

To start Dirmon for the first time:

RocketJob::Jobs::DirmonJob.create!

If another DirmonJob instance is already queued or running, then the create above will fail with:

Validation failed: State Another instance of this job is already queued or running

Or, to start DirmonJob and ignore the error if an instance is already running:

RocketJob::Jobs::DirmonJob.create
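
The difference between the two calls above can be illustrated without a database. The following is a minimal plain-Ruby sketch, not RocketJob's implementation: `SingletonJobSketch`, its `ValidationError`, and the `active` slot are hypothetical stand-ins, and returning nil from the non-bang form is a simplification made for this illustration.

```ruby
# Plain-Ruby stand-in for a singleton job. NOT the RocketJob implementation;
# every name in this class is hypothetical.
class SingletonJobSketch
  class ValidationError < StandardError; end

  MESSAGE = "Validation failed: State Another instance of this job is already queued or running"

  @active = nil

  class << self
    attr_accessor :active

    # Bang form: raises when another instance is already active.
    def create!
      raise ValidationError, MESSAGE if active

      self.active = new
    end

    # Non-bang form: swallows the validation failure (simplified to nil here).
    def create
      create!
    rescue ValidationError
      nil
    end
  end
end

first  = SingletonJobSketch.create!  # succeeds, nothing is active yet
second = SingletonJobSketch.create   # an instance is active, no exception raised

error_message = nil
begin
  SingletonJobSketch.create!         # an instance is active, so this raises
rescue SingletonJobSketch::ValidationError => e
  error_message = e.message
end
```

The non-bang form suits startup scripts that may run on several hosts, where "already running" is an expected outcome rather than a failure.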

Public Instance Methods

perform()

Checks the directories for new files, starting jobs if files have not changed since the last run.

# File lib/rocket_job/jobs/dirmon_job.rb, line 53
def perform
  check_directories
end

Private Instance Methods

check_directories()

Iterates over each Dirmon Entry looking for new files. If a new file is found, it is not processed immediately; instead, it is passed to the next run of this job along with its file size. If the file size has not changed by the next run, the job is kicked off.

# File lib/rocket_job/jobs/dirmon_job.rb, line 63
def check_directories
  new_file_names = {}
  DirmonEntry.enabled.each do |dirmon_entry|
    check_entry(dirmon_entry, new_file_names)
  end
  self.previous_file_names = new_file_names
end
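
The "wait one run, then compare sizes" idea can be tried in isolation. This is a minimal sketch under stated assumptions: `poll_once` is a hypothetical helper, directory scans are replaced by plain `{ name => size }` hashes, and "starting a job" is reduced to collecting the file name.

```ruby
# Hypothetical helper: one polling pass over a { name => size } snapshot.
# Returns [started, pending]; pending becomes previous_sizes on the next pass.
def poll_once(current_sizes, previous_sizes)
  started = []
  pending = {}
  current_sizes.each do |name, size|
    if previous_sizes[name] == size
      started << name       # size unchanged since the last pass: start the job
    else
      pending[name] = size  # new or still growing: keep for the next pass
    end
  end
  [started, pending]
end

# Pass 1: the file is new, so it is only recorded.
started1, previous = poll_once({ "a_csv" => 100 }, {})
# Pass 2: the file grew, so it is recorded again with its new size.
started2, previous = poll_once({ "a_csv" => 250 }, previous)
# Pass 3: the size is unchanged, so the job would be started.
started3, previous = poll_once({ "a_csv" => 250 }, previous)
```

A file that has been started is dropped from the carried-over hash, which mirrors how a nil size is not stored in the listings on this page.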

check_entry(dirmon_entry, file_names)

# File lib/rocket_job/jobs/dirmon_job.rb, line 71
def check_entry(dirmon_entry, file_names)
  dirmon_entry.each do |path|
    # Skip file size checking since S3 files are only visible once completely uploaded.
    unless path.partial_files_visible?
      logger.info("File: #{path}. Starting: #{dirmon_entry.job_class_name}")
      dirmon_entry.later(path)
      next
    end

    # BSON Keys cannot contain periods
    key           = path.to_s.tr(".", "_")
    previous_size = previous_file_names[key]
    # Check every few minutes for a file size change before trying to process the file.
    size            = check_file(dirmon_entry, path, previous_size)
    file_names[key] = size if size
  end
rescue StandardError => e
  logger.error("Dirmon Entry: #{dirmon_entry.id} failed. Moved to `failed` state to prevent processing again without manual intervention.", e)
  dirmon_entry.fail(worker_name, e)
  dirmon_entry.save(validate: false)
end
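
The key sanitization step in the listing above is easy to check on its own. The paths below are hypothetical; the sketch only shows what `String#tr` does to them, including one side effect worth knowing: two distinct paths that differ only in `.` versus `_` map to the same key.

```ruby
# Hypothetical path, used only for illustration.
path = "/mnt/incoming/report.2024.csv"

# Replace every period with an underscore, as in the listing above.
key = path.tr(".", "_")

# Side effect of this mapping: distinct paths can share a key.
collision = "/in/a.b".tr(".", "_") == "/in/a_b".tr(".", "_")
```

Here `key` is `"/mnt/incoming/report_2024_csv"` and `collision` is `true`.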

check_file(dirmon_entry, path, previous_size)

Checks whether a file should result in starting a job. Returns [Integer] the file size, or nil if the file started a job.

# File lib/rocket_job/jobs/dirmon_job.rb, line 95
def check_file(dirmon_entry, path, previous_size)
  size = path.size
  if previous_size && (previous_size == size)
    logger.info("File stabilized: #{path}. Starting: #{dirmon_entry.job_class_name}")
    dirmon_entry.later(path)
    nil
  else
    logger.info("Found file: #{path}. File size: #{size}")
    # Keep for the next run
    size
  end
end