class Jobi::Job
Public Class Methods
after_run(callback)
click to toggle source
# File lib/jobi/job.rb, line 17 def after_run(callback) @after_run_callback = callback if callback end
consume_messages()
click to toggle source
# File lib/jobi/job.rb, line 31 def consume_messages return unless Jobi.consumer? join_queue @consumer_threads = [] @consumers.times do @consumer_threads << Thread.new { consumer.consume! } end @consumer_threads.join(&:join) end
options(queue_name:, ack: false, consumers: 5, durable: true, persist: false)
click to toggle source
# File lib/jobi/job.rb, line 9 def options(queue_name:, ack: false, consumers: 5, durable: true, persist: false) @queue_name = queue_name.to_s @ack = ack @consumers = consumers @durable = durable @persist = persist end
run(**args)
click to toggle source
# File lib/jobi/job.rb, line 21 def run(**args) return unless Jobi.publisher? before_start(args) start rescue Error => e Jobi.logger.error('Failed to process the job') Jobi.logger.error(e) end
Private Class Methods
before_start(args)
click to toggle source
# File lib/jobi/job.rb, line 45 def before_start(args) create_message(args: args) log_job_info! end
build_message_params()
click to toggle source
# File lib/jobi/job.rb, line 95 def build_message_params { message: Marshal.dump(@message), queue: @queue, options: { persist: @persist } } end
build_queue_params()
click to toggle source
# File lib/jobi/job.rb, line 86 def build_queue_params { name: @queue_name, options: { durable: @durable } } end
consumer()
click to toggle source
# File lib/jobi/job.rb, line 56 def consumer class_const = constantize("Jobi::Consumers::#{Jobi.client_class_name}") class_const.new(queue: @queue, ack: @ack) end
create_message(args:)
click to toggle source
# File lib/jobi/job.rb, line 71 def create_message(args:) @message = Message.new( job_class: self, args: args, after_run: @after_run_callback ) end
join_queue()
click to toggle source
# File lib/jobi/job.rb, line 61 def join_queue @queue = Jobi.session .queue(**build_queue_params) end
log_job_info!()
click to toggle source
# File lib/jobi/job.rb, line 79 def log_job_info! Jobi.logger.info("A job has been started with id: [#{@message.id}]") Jobi.logger.debug("Queue: '#{@queue_name}'") Jobi.logger.debug("Args: #{@message.args}") Jobi.logger.debug("Job class: '#{@message.job_class}'") end
publish_message()
click to toggle source
# File lib/jobi/job.rb, line 66 def publish_message Jobi.session .publish(**build_message_params) end
start()
click to toggle source
# File lib/jobi/job.rb, line 50 def start join_queue publish_message @message.id end