class RocketJob::Jobs::DirmonJob
Notes:
- Jobs that do not implement upload must have either `upload_file_name` or `full_file_name` as an attribute.

With RocketJob Pro, the file is automatically uploaded into the job itself using the job’s upload method, after which the file is archived, or deleted if no archive_directory was specified in the DirmonEntry.
To start Dirmon for the first time:
  RocketJob::Jobs::DirmonJob.create!

If another DirmonJob instance is already queued or running, then the create above will fail with:
  Validation failed: State Another instance of this job is already queued or running

Or to start DirmonJob and ignore errors if already running:
  RocketJob::Jobs::DirmonJob.create
Public Instance Methods
Checks the directories for new files, starting jobs if files have not changed since the last run.
# File lib/rocket_job/jobs/dirmon_job.rb, line 53
def perform
  check_directories
end
Private Instance Methods
Iterates over each Dirmon Entry looking for new files. A newly found file is not processed immediately; instead, its name and file size are passed to the next run of this job. If the file size has not changed by then, the Job is kicked off.
# File lib/rocket_job/jobs/dirmon_job.rb, line 63
def check_directories
  new_file_names = {}
  DirmonEntry.enabled.each do |dirmon_entry|
    check_entry(dirmon_entry, new_file_names)
  end
  self.previous_file_names = new_file_names
end
# File lib/rocket_job/jobs/dirmon_job.rb, line 71
def check_entry(dirmon_entry, file_names)
  dirmon_entry.each do |path|
    # Skip file size checking since S3 files are only visible once completely uploaded.
    unless path.partial_files_visible?
      logger.info("File: #{path}. Starting: #{dirmon_entry.job_class_name}")
      dirmon_entry.later(path)
      next
    end

    # BSON Keys cannot contain periods
    key           = path.to_s.tr(".", "_")
    previous_size = previous_file_names[key]
    # Check every few minutes for a file size change before trying to process the file.
    size            = check_file(dirmon_entry, path, previous_size)
    file_names[key] = size if size
  end
rescue StandardError => e
  logger.error("Dirmon Entry: #{dirmon_entry.id} failed. Moved to `failed` state to prevent processing again without manual intervention.", e)
  dirmon_entry.fail(worker_name, e)
  dirmon_entry.save(validate: false)
end
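The key sanitization step in check_entry can be tried in isolation. The following is a minimal sketch in plain Ruby; `dirmon_key` is a hypothetical helper name used only for illustration and is not part of RocketJob. It also shows one consequence of the substitution: two distinct paths that differ only by a period versus an underscore map to the same key.

```ruby
# Hypothetical helper (not part of RocketJob) mirroring the expression
# used in check_entry: periods are replaced because BSON keys cannot
# contain them.
def dirmon_key(path)
  path.to_s.tr(".", "_")
end

puts dirmon_key("/data/in/report.2024.csv") # prints "/data/in/report_2024_csv"

# Distinct paths can collapse to the same key.
puts dirmon_key("in/a.b") == dirmon_key("in/a_b") # prints "true"
```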
Checks whether a file should result in starting a job. Returns [Integer] the file size, or nil if the file started a job.
# File lib/rocket_job/jobs/dirmon_job.rb, line 95
def check_file(dirmon_entry, path, previous_size)
  size = path.size
  if previous_size && (previous_size == size)
    logger.info("File stabilized: #{path}. Starting: #{dirmon_entry.job_class_name}")
    dirmon_entry.later(path)
    nil
  else
    logger.info("Found file: #{path}. File size: #{size}")
    # Keep for the next run
    size
  end
end
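The size-stabilization check spans several runs of the job, so it can be easier to follow as a small simulation. The sketch below uses only the Ruby standard library and does not call RocketJob. The names `stable_or_size`, `poll`, and `started` are hypothetical stand-ins for check_file, check_directories, and the jobs that would be started; a plain Hash plays the role of previous_file_names.

```ruby
require "tempfile"

# Stand-in for check_file: returns the size to remember for the next
# pass, or nil once the size matches the previous pass (file is stable).
def stable_or_size(path, previous_size, started)
  size = File.size(path)
  if previous_size && previous_size == size
    started << path # the real job would call dirmon_entry.later(path) here
    nil
  else
    size
  end
end

# Stand-in for one run of check_directories: builds the hash of sizes
# that the next run compares against.
def poll(paths, previous, started)
  current = {}
  paths.each do |path|
    key  = path.tr(".", "_")
    size = stable_or_size(path, previous[key], started)
    current[key] = size if size
  end
  current
end

file = Tempfile.new(["upload", ".csv"])
file.write("partial")
file.flush

started = []
seen = poll([file.path], {}, started)   # pass 1: first sighting, size recorded
file.write(" more data")
file.flush
seen = poll([file.path], seen, started) # pass 2: size changed, keep waiting
seen = poll([file.path], seen, started) # pass 3: size unchanged, job would start

puts started.size # prints 1
puts seen.empty?  # prints "true"
```

In this sketch the file stays in place after pass 3, so a fourth pass would record it again as a new file. The simulation makes no attempt to model whatever removes or archives the file once a job has been started for it.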