class SequenceServer::Job

Abstract job superclass.

Provides a simple framework to store job data, execute shell commands asynchronously, and capture stdout, stderr, and exit status. Subclasses must provide a concrete implementation of `command` and may override any other methods as required.

The global `config` and `logger` objects are available as instance methods.

Singleton methods provide the facility to create and queue a job, fetch a job or all jobs, and delete a job.

Attributes

completed_at[R]
exitstatus[R]
id[R]
submitted_at[R]

Public Class Methods

all()

Returns an Array of all jobs.

# File lib/sequenceserver/job.rb, line 58
def all
  Dir["#{DOTDIR}/**/job.yaml"]
    .map { |f| fetch File.basename File.dirname f }
end
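
The listing relies on each job living in a directory named after its id, so the id is recovered from the path of `job.yaml`. A minimal sketch of just that step, assuming a hypothetical helper `job_ids` and a caller-supplied root directory in place of `DOTDIR`:

```ruby
require 'fileutils'
require 'tmpdir'

# Hypothetical helper, not part of SequenceServer: returns the ids of all
# jobs under `root`, i.e. the names of directories containing a job.yaml.
def job_ids(root)
  Dir[File.join(root, '**', 'job.yaml')]
    .map { |f| File.basename(File.dirname(f)) }
end
```

`File.dirname` strips the trailing `job.yaml`, and `File.basename` then keeps only the last path component, which is the job id.
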
create(params)

Creates and queues a job. Returns created job object.

# File lib/sequenceserver/job.rb, line 26
def create(params)
  job = BLAST::Job.new(params) # TODO: Dynamic dispatch.
  enqueue(job)
end
delete(id)

Deletes job with the given id.

# File lib/sequenceserver/job.rb, line 53
def delete(id)
  FileUtils.rm_r File.join(DOTDIR, id)
end
enqueue(job)

Enqueues a job that has already been created. Returns the job object.

# File lib/sequenceserver/job.rb, line 64
def enqueue(job)
  pool.queue { job.run }
  job
end
fetch(id)

Fetches job with the given id.

# File lib/sequenceserver/job.rb, line 42
def fetch(id)
  job_file = File.join(DOTDIR, id, 'job.yaml')
  return nil unless File.exist?(job_file)

  YAML.safe_load_file(
    job_file,
    permitted_classes: serializable_classes
  )
end
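
Safe loading only rebuilds objects whose classes are explicitly permitted. The sketch below illustrates that behaviour with a hypothetical `DemoJob` class and `load_job` helper (neither is part of SequenceServer), and uses `YAML.safe_load` on the file contents rather than `YAML.safe_load_file`:

```ruby
require 'yaml'
require 'tmpdir'

# Hypothetical stand-in for a job class; not SequenceServer's own.
class DemoJob
  attr_reader :id, :submitted_at, :state

  def initialize(id)
    @id = id
    @submitted_at = Time.utc(2024, 1, 1)
    @state = :queued
  end
end

# Hypothetical helper: loads a job file, permitting only the listed
# classes. Returns nil if the file does not exist, like `fetch` above.
def load_job(job_file, permitted)
  return nil unless File.exist?(job_file)

  YAML.safe_load(File.read(job_file), permitted_classes: permitted)
end

dir = Dir.mktmpdir
job_file = File.join(dir, 'job.yaml')
File.write(job_file, DemoJob.new('abc').to_yaml)

job = load_job(job_file, [Time, Symbol, DemoJob])
puts job.id

begin
  # DemoJob is missing from the list, so Psych refuses to build it.
  load_job(job_file, [Time, Symbol])
rescue Psych::DisallowedClass => e
  puts "rejected: #{e.message}"
end
```

A class that a job serialises but that is missing from the permitted list makes loading fail, which is presumably why the list is kept in one place.
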
new(params = {}) { || ... }

Initializes the job: generates a job id, creates the directory where all kinds of job data will be held, yields (if a block is given), and saves the job.

Subclasses should extend `initialize` as required.

# File lib/sequenceserver/job.rb, line 87
def initialize(params = {})
  @id = params.fetch(:id, SecureRandom.uuid)
  @submitted_at = Time.now
  mkdir_p dir
  yield if block_given?
  save
rescue Errno::ENOSPC
  raise SystemError, 'Not enough disk space to start a new job'
rescue Errno::EACCES
  raise SystemError, "Permission denied to write to #{DOTDIR}"
rescue StandardError => e
  rm_rf dir
  raise e
end
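
One way a subclass might extend `initialize` is to pass a block to `super`, so that its own setup runs after the job directory exists and before the job is saved. The sketch below assumes hypothetical stand-in classes `SketchJob` and `EchoJob` and a `:root` parameter replacing `DOTDIR`; it mirrors the order of steps above but is not SequenceServer's implementation:

```ruby
require 'fileutils'
require 'securerandom'
require 'tmpdir'
require 'yaml'

# Hypothetical stand-in for the base class: same order of steps as the
# `initialize` shown above, but writing under a caller-supplied root.
class SketchJob
  attr_reader :id, :submitted_at

  def initialize(params = {})
    @root = params.fetch(:root, Dir.tmpdir)
    @id = params.fetch(:id, SecureRandom.uuid)
    @submitted_at = Time.now
    FileUtils.mkdir_p dir
    yield if block_given?
    save
  rescue StandardError
    FileUtils.rm_rf dir # leave no half-initialised job directory behind
    raise
  end

  def dir
    File.join(@root, id)
  end

  private

  # Write a blob into the job directory and return its path.
  def store(key, value)
    filename = File.join(dir, key)
    File.write(filename, value)
    filename
  end

  def save
    File.write(File.join(dir, 'job.yaml'), to_yaml)
  end
end

# Hypothetical subclass: its setup runs inside the block given to `super`,
# so the job directory already exists when `store` is called.
class EchoJob < SketchJob
  attr_reader :input_file

  def initialize(params = {})
    super do
      @input_file = store('input.txt', params.fetch(:text))
    end
  end
end
```

Because the block runs before `save`, anything it assigns (here `@input_file`) ends up in the serialised job, and an exception raised inside it removes the job directory.
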
serializable_classes()
# File lib/sequenceserver/job.rb, line 31
def serializable_classes
  [
    Time,
    Symbol,
    SequenceServer::Job,
    SequenceServer::BLAST::Job,
    SequenceServer::Database
  ]
end

Private Class Methods

pool()

Thread pool used for running BLAST searches.

# File lib/sequenceserver/job.rb, line 72
def pool
  @pool ||= Pool.new(SequenceServer.config[:num_jobs] || 1)
end
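
The `Pool` class itself is not shown on this page. As a rough illustration of the idea only, a fixed-size thread pool with a `queue` method could look like the sketch below; `SketchPool` and its `shutdown` method are hypothetical and make no claim about how SequenceServer's `Pool` is implemented:

```ruby
# Hypothetical stand-in, not SequenceServer's Pool class: a fixed number
# of worker threads pulling blocks off a shared queue.
class SketchPool
  def initialize(size)
    @queue = Queue.new
    @workers = Array.new(size) do
      Thread.new do
        # Queue#pop blocks until work arrives; nil is the stop signal.
        while (task = @queue.pop)
          task.call
        end
      end
    end
  end

  # Schedule a block to run on one of the worker threads.
  def queue(&task)
    @queue << task
    self
  end

  # Ask every worker to stop once the queued work is done, then wait.
  def shutdown
    @workers.size.times { @queue << nil }
    @workers.each(&:join)
  end
end

pool = SketchPool.new(2)
3.times { |i| pool.queue { puts "job #{i} done" } }
pool.shutdown
```

With a pool size of 1, the default used above when `num_jobs` is not configured, queued jobs simply run one after another.
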

Public Instance Methods

command()

Returns shell command that will be executed. Subclass needs to provide a concrete implementation.

# File lib/sequenceserver/job.rb, line 106
def command
  fail 'Not implemented.'
end
dir()

Directory where all data for this job is saved.

# File lib/sequenceserver/job.rb, line 143
def dir
  File.join(DOTDIR, id)
end
done?()

Is exitstatus of the job available? If yes, it means the job is done.

# File lib/sequenceserver/job.rb, line 121
def done?
  !!@exitstatus
end
raise!()

Raise RuntimeError if job finished with non-zero exit status. This method should be called on a completed job before attempting to use the results. Subclasses should provide their own implementation.

# File lib/sequenceserver/job.rb, line 128
def raise!
  fail if done? && exitstatus != 0
end
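
The intended calling pattern can be sketched as follows, using a hypothetical stand-in class `FinishedJob` and a hypothetical helper `results_of` that expose only what `done?` and `raise!` need:

```ruby
# Hypothetical stand-in, not SequenceServer's Job class.
class FinishedJob
  attr_reader :exitstatus

  def initialize(exitstatus = nil)
    @exitstatus = exitstatus
  end

  # 0 is truthy in Ruby, so a job that exited with status 0 counts as done.
  def done?
    !!@exitstatus
  end

  # Raises RuntimeError for a finished job with a non-zero exit status.
  def raise!
    fail if done? && exitstatus != 0
  end
end

# Hypothetical caller: only touch the results of a job that has finished
# and whose exit status has been checked.
def results_of(job)
  return :pending unless job.done?

  job.raise!
  :ok
end
```

Note that `raise!` does nothing for a job that is still running, so callers are expected to check `done?` first.
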
run()

Shell out and execute the job.

NOTE: This method is called asynchronously by thread pool.

# File lib/sequenceserver/job.rb, line 113
def run
  sys(command, path: config[:bin], stdout: stdout, stderr: stderr)
  done!
rescue CommandFailed => e
  done! e.exitstatus
end
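
The `sys` helper is not shown on this page. To illustrate the general technique of shelling out while sending stdout and stderr to files and recording the exit status, here is a minimal sketch built on `Process.spawn`; the helper name `run_and_capture` is hypothetical and the code is not SequenceServer's `sys`:

```ruby
require 'tmpdir'

# Hypothetical helper: runs `command`, redirects its stdout and stderr to
# files in `dir`, waits for it to finish, and returns the exit status.
def run_and_capture(command, dir)
  stdout = File.join(dir, 'stdout')
  stderr = File.join(dir, 'stderr')
  pid = Process.spawn(command, out: stdout, err: stderr)
  _, status = Process.wait2(pid)
  # exitstatus is nil if the process was terminated by a signal.
  status.exitstatus
end
```

In the method above, a non-zero status is instead reported through `CommandFailed`, whose `exitstatus` is then passed to `done!`.
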
stderr()

Path of the file that stderr is written to during execution and read from later.

# File lib/sequenceserver/job.rb, line 138
def stderr
  File.join(dir, 'stderr')
end
stdout()

Path of the file that stdout is written to during execution and read from later.

# File lib/sequenceserver/job.rb, line 133
def stdout
  File.join(dir, 'stdout')
end

Private Instance Methods

done!(status = 0)

Marks the job as done and saves its exitstatus.

# File lib/sequenceserver/job.rb, line 180
def done!(status = 0)
  @completed_at = Time.now
  @exitstatus = status
  save
end
fetch(key)

Retrieve file from job dir with the given name. Raises RuntimeError if the file can’t be found.

NOTE: Not used.

# File lib/sequenceserver/job.rb, line 172
def fetch(key)
  filename = File.join(dir, key)
  fail unless File.exist? filename

  filename
end
save()

Saves job object to a YAML file in job directory.

# File lib/sequenceserver/job.rb, line 150
def save
  File.write(yfile, to_yaml)
end
store(key, value)

Saves an arbitrary blob of data for this job to a file. Returns the absolute path to the file. Saving the file does not link it to the job object; downstream code must do that itself.

NOTE: The job dir should have been created before `store` is called. In a subclass this can be ensured by appropriately calling `super` in the `initialize` method.

# File lib/sequenceserver/job.rb, line 162
def store(key, value)
  filename = File.join(dir, key)
  File.write(filename, value)
  filename
end
yfile()

Path of the file that the serialised job object is written to.

# File lib/sequenceserver/job.rb, line 187
def yfile
  File.join(dir, 'job.yaml')
end