class Jobi::Job

Public Class Methods

after_run(callback) click to toggle source
# File lib/jobi/job.rb, line 17
# Registers the callback to attach to messages created for this job.
#
# A nil or false +callback+ is ignored, so a previously registered
# callback stays in place.
#
# @param callback [Object, nil] the callback to remember
# @return [Object, nil] the stored callback, or nil when nothing was stored
def after_run(callback)
  return unless callback

  @after_run_callback = callback
end
consume_messages() click to toggle source
# File lib/jobi/job.rb, line 31
# Starts the consumer threads for this job and blocks until all of them
# have finished.
#
# Does nothing unless Jobi.consumer? is truthy. Otherwise joins the queue,
# spawns @consumers threads that each build a consumer and call consume!
# on it, and waits for every thread.
#
# @return [Array<Thread>, nil] the joined consumer threads, or nil when
#   Jobi.consumer? is falsy
def consume_messages
  return unless Jobi.consumer?

  join_queue
  @consumer_threads = @consumers.times.map do
    Thread.new { consumer.consume! }
  end

  # Array#join only builds a String and silently ignores a block, so it
  # never waits on the threads. Each thread must be joined explicitly.
  @consumer_threads.each(&:join)
end
options(queue_name:, ack: false, consumers: 5, durable: true, persist: false) click to toggle source
# File lib/jobi/job.rb, line 9
# Stores the queue and consumer settings for this job.
#
# @param queue_name [#to_s] queue name, stored as a String
# @param ack [Boolean] value later passed to the consumer as +ack+
# @param consumers [Integer] number of consumer threads to start
# @param durable [Boolean] value sent as the queue's +durable+ option
# @param persist [Boolean] value sent as the message's +persist+ option
# @return [Boolean] the +persist+ value (result of the last assignment)
def options(queue_name:, ack: false, consumers: 5, durable: true, persist: false)
  # Queue settings
  @queue_name = queue_name.to_s
  @durable = durable

  # Consumer settings
  @consumers = consumers
  @ack = ack

  # Message settings (kept last: its value is what the method returns)
  @persist = persist
end
run(**args) click to toggle source
# File lib/jobi/job.rb, line 21
# Publishes this job with the given keyword arguments.
#
# Nothing happens unless Jobi.publisher? is truthy. An Error raised along
# the way is logged through Jobi.logger and not re-raised.
#
# @param args [Hash] keyword arguments forwarded to the job's message
# @return [Object, nil] the value returned by start on success, nil when
#   not a publisher, or the logger's return value after a rescued Error
def run(**args)
  if Jobi.publisher?
    before_start(args)
    start
  end
rescue Error => e
  Jobi.logger.error('Failed to process the job')
  Jobi.logger.error(e)
end

Private Class Methods

before_start(args) click to toggle source
# File lib/jobi/job.rb, line 45
# Prepares a job run: builds the message from +args+ via create_message,
# then logs the job details via log_job_info!.
#
# @param args [Hash] the job arguments, as received by run
# @return [Object] whatever log_job_info! returns
def before_start(args)
  create_message(args: args)
  log_job_info!
end
build_message_params() click to toggle source
# File lib/jobi/job.rb, line 95
# Assembles the keyword arguments used to publish the current message.
#
# The message object is serialized with Marshal.dump; the queue and the
# persist flag are taken from instance state.
#
# @return [Hash] a hash with :message, :queue and :options keys
def build_message_params
  serialized_message = Marshal.dump(@message)
  publish_options = { persist: @persist }

  { message: serialized_message, queue: @queue, options: publish_options }
end
build_queue_params() click to toggle source
# File lib/jobi/job.rb, line 86
# Assembles the keyword arguments used to look up or declare the queue.
#
# @return [Hash] a hash with the queue :name and an :options hash carrying
#   the durable flag
def build_queue_params
  queue_options = { durable: @durable }

  { name: @queue_name, options: queue_options }
end
consumer() click to toggle source
# File lib/jobi/job.rb, line 56
# Builds a new consumer for the current queue.
#
# The consumer class is looked up under Jobi::Consumers using the name
# reported by Jobi.client_class_name.
#
# @return [Object] a fresh consumer built with the current queue and ack flag
def consumer
  consumer_class_name = "Jobi::Consumers::#{Jobi.client_class_name}"

  constantize(consumer_class_name).new(queue: @queue, ack: @ack)
end
create_message(args:) click to toggle source
# File lib/jobi/job.rb, line 71
# Builds the Message for this job and remembers it in @message.
#
# @param args [Object] the job arguments to embed in the message
# @return [Message] the newly created message
def create_message(args:)
  message_attributes = {
    job_class: self,
    args: args,
    after_run: @after_run_callback
  }

  @message = Message.new(**message_attributes)
end
join_queue() click to toggle source
# File lib/jobi/job.rb, line 61
# Obtains the queue from the current Jobi session and remembers it in
# @queue.
#
# @return [Object] the queue returned by the session
def join_queue
  session = Jobi.session

  @queue = session.queue(**build_queue_params)
end
log_job_info!() click to toggle source
# File lib/jobi/job.rb, line 79
# Logs the details of the current message: its id at info level, then the
# queue name, arguments and job class at debug level.
#
# @return [Object] the return value of the last logger call
def log_job_info!
  logger = Jobi.logger

  logger.info("A job has been started with id: [#{@message.id}]")
  logger.debug("Queue: '#{@queue_name}'")
  logger.debug("Args: #{@message.args}")
  logger.debug("Job class: '#{@message.job_class}'")
end
publish_message() click to toggle source
# File lib/jobi/job.rb, line 66
# Publishes the current message through the Jobi session.
#
# @return [Object] whatever the session's publish call returns
def publish_message
  session = Jobi.session

  session.publish(**build_message_params)
end
start() click to toggle source
# File lib/jobi/job.rb, line 50
# Sends the prepared message: joins the queue, publishes the message, and
# reports the message id.
#
# Reads @message, so a message must already have been created (run calls
# before_start first).
#
# @return [Object] the id of the published message
def start
  join_queue
  publish_message
  @message.id
end