class Belated::Client
The client class is responsible for managing the connection to the DRb server. If it has no connection, it adds the jobs to a bank queue. You can enqueue jobs to be processed by the server. Example:
client = Belated::Client.new client.enqueue(JubJub.new, at: Time.now + 5.seconds)
A client that can perform jobs inline
Attributes
Public Instance Methods
# File lib/belated/client.rb, line 61 def delete_from_table return if proc_table.length < 25 @mutex.synchronize do proc_table.select { |_k, v| v.completed }.each do |key, _value| proc_table.delete(key) end end end
The method that pushes the jobs to the queue. If there is no connection, it pushes the job to the bank. @param job [Object] - The the job to be pushed. @param at [Date] - The time at which the job should be executed. @param max_retries [Integer] - Times the job should be retried if it fails. @return [JobWrapper] - The job wrapper for the queue.
# File lib/belated/client.rb, line 77 def perform(job, at: nil, max_retries: 5, active_job: false) start unless started? return unless proper_job?(job) job_wrapper = wrap_job(job, at: at.to_f, max_retries: max_retries, active_job: active_job) bank.push(job_wrapper) @mutex.synchronize do proc_table[job_wrapper.object_id] = job_wrapper if job_wrapper.proc_klass end self.banker_thread = start_banker_thread if banker_thread.nil? job_wrapper end
Starts up the client. Connects to the queue through DRb. @return [void]
# File lib/belated/client.rb, line 19 def start return if started? server_uri = Belated::URI DRb.start_service self.proc_table = {} self.bank = Thread::Queue.new self.queue = DRbObject.new_with_uri(server_uri) @started = true @mutex = Mutex.new end
Thread in charge of handling the bank queue. You probably want to memoize the client in order to avoid having many threads in the sleep state. @return [void]
# File lib/belated/client.rb, line 46 def start_banker_thread Thread.new do loop do delete_from_table sleep Belated.client_heartbeat and next if bank.empty? bank.length.times do queue.push(wrapper = bank.pop) rescue DRb::DRbConnError bank.push(wrapper) end end end end
# File lib/belated/client.rb, line 32 def started? @started end
Makes it possible to reset the client
# File lib/belated/client.rb, line 37 def turn_off @started = false banker_thread&.kill end
Private Instance Methods
# File lib/belated/client.rb, line 112 def drb_connected? queue.connected? rescue StandardError false end
# File lib/belated/client.rb, line 94 def proper_job?(job) return true if job.respond_to?(:call) || job.respond_to?(:perform) warn 'job does not implement .call nor .perform!' false end
# File lib/belated/client.rb, line 101 def wrap_job(job, at:, max_retries:, active_job:) return job if job.is_a?(JobWrapper) wrapper = if active_job ActiveJob::QueueAdapters::BelatedAdapter::JobWrapper else JobWrapper end wrapper.new(job: job, at: at, max_retries: max_retries, active_job: active_job) end