class Belated::Client

The client class is responsible for managing the connection to the DRb server. If it has no connection, it adds the jobs to a bank queue. You can enqueue jobs to be processed by the server. Example:

client = Belated::Client.new
client.perform(JubJub.new, at: Time.now + 5.seconds)

A client that can perform jobs inline

Attributes

bank[RW]
banker_thread[RW]
proc_table[RW]
queue[RW]

Public Instance Methods

delete_from_table() click to toggle source
# File lib/belated/client.rb, line 61
# Prunes completed jobs from the proc table once it holds 25 entries or
# more; smaller tables are left untouched.
#
# @return [Hash, nil] the entries that were removed, or nil when the table
#   holds fewer than 25 entries
def delete_from_table
  return unless proc_table.length >= 25

  @mutex.synchronize do
    finished = proc_table.select { |_id, wrapper| wrapper.completed }
    finished.each_key { |id| proc_table.delete(id) }
  end
end
initialize()
Alias for: start
old_perform(job, at: nil, max_retries: 5, active_job: false)
Alias for: perform
perform(job, at: nil, max_retries: 5, active_job: false) click to toggle source

The method that pushes the jobs to the queue. Every job is first placed in the bank; the banker thread forwards banked jobs to the server and keeps them in the bank while there is no connection. @param job [Object] - The job to be pushed. @param at [Time, Numeric, nil] - The time at which the job should be executed (converted with to_f). @param max_retries [Integer] - Times the job should be retried if it fails. @return [JobWrapper] - The job wrapper for the queue.

# File lib/belated/client.rb, line 77
# Wraps the job, places it in the bank and makes sure a live banker thread
# exists to forward banked jobs to the queue.
#
# @param job [Object] the job; rejected unless proper_job? accepts it
# @param at [Time, Numeric, nil] when the job should run; converted with
#   +to_f+, so nil becomes 0.0
# @param max_retries [Integer] passed through to the job wrapper
# @param active_job [Boolean] passed through to wrap_job to pick the wrapper class
# @return [JobWrapper, nil] the wrapped job, or nil when the job was rejected
def perform(job, at: nil, max_retries: 5, active_job: false)
  start unless started?
  return unless proper_job?(job)

  job_wrapper = wrap_job(job, at: at.to_f, max_retries: max_retries, active_job: active_job)
  bank.push(job_wrapper)
  @mutex.synchronize do
    # Only wrappers exposing a proc_klass are tracked in the proc table.
    proc_table[job_wrapper.object_id] = job_wrapper if job_wrapper.proc_klass
  end
  # A banker thread that was killed or died is still non-nil, so check
  # liveness instead of nil-ness; otherwise banked jobs would never be sent.
  self.banker_thread = start_banker_thread unless banker_thread&.alive?
  job_wrapper
end
perform_belated(job, at: nil, max_retries: 5, active_job: false)
Alias for: perform
perform_later(job, at: nil, max_retries: 5, active_job: false)
Alias for: perform
start() click to toggle source

Starts up the client. Connects to the queue through DRb. @return [void]

# File lib/belated/client.rb, line 19
# Starts up the client: starts the DRb service, resets the proc table and
# the bank, and builds the DRb reference to the server queue. Does nothing
# when the client is already started.
#
# @return [void]
def start
  return if started?

  server_uri = Belated::URI
  DRb.start_service
  # Build the mutex before anything else so it exists by the time other
  # threads can observe the client as started.
  @mutex = Mutex.new
  self.proc_table = {}
  self.bank = Thread::Queue.new
  self.queue = DRbObject.new_with_uri(server_uri)
  # Flip the flag last: started? must only be true once all state is ready.
  @started = true
end
Also aliased as: initialize
start_banker_thread() click to toggle source

Thread in charge of handling the bank queue. You probably want to memoize the client in order to avoid having many threads in the sleep state. @return [Thread] - The banker thread.

# File lib/belated/client.rb, line 46
# Spawns the thread that drains the bank into the server queue.
#
# Each pass prunes the proc table, then tries to push every banked job to
# the queue. Jobs whose push raises DRb::DRbConnError go back into the bank.
# The thread sleeps for Belated.client_heartbeat when the bank is empty and
# also after a pass that hit a connection error, so an unreachable server
# is retried once per heartbeat instead of in a tight loop.
#
# @return [Thread] the banker thread
def start_banker_thread
  Thread.new do
    loop do
      delete_from_table
      if bank.empty?
        sleep Belated.client_heartbeat
        next
      end

      connection_failed = false
      # Only the jobs present at the start of the pass are attempted, so
      # re-banked jobs are not retried within the same pass.
      bank.length.times do
        wrapper = bank.pop
        begin
          queue.push(wrapper)
        rescue DRb::DRbConnError
          bank.push(wrapper)
          connection_failed = true
        end
      end
      # Back off before retrying; without this the loop would spin while
      # the bank stays non-empty.
      sleep Belated.client_heartbeat if connection_failed
    end
  end
end
started?() click to toggle source
# File lib/belated/client.rb, line 32
# Reports whether the client has been started.
#
# @return [Boolean, nil] the raw value of @started: true once #start has
#   run, false after #turn_off, nil if neither has set it
def started?
  @started
end
turn_off() click to toggle source

Makes it possible to reset the client

# File lib/belated/client.rb, line 37
# Marks the client as stopped and shuts down the banker thread so the
# client can be started again.
#
# @return [Thread, nil] the thread that was killed, or nil when there was none
def turn_off
  @started = false
  stopped = banker_thread&.kill
  # Drop the dead thread so a later restart can spawn a fresh banker; a
  # killed thread left in place would still look "present" to nil checks.
  self.banker_thread = nil
  stopped
end

Private Instance Methods

drb_connected?() click to toggle source
# File lib/belated/client.rb, line 112
# Asks the queue object whether it is connected, treating any StandardError
# raised by that call as "not connected".
#
# @return [Object] whatever queue.connected? returns, or false on error
def drb_connected?
  queue.connected?
rescue # bare rescue catches StandardError
  false
end
proper_job?(job) click to toggle source
# File lib/belated/client.rb, line 94
# Checks that a job can be executed, i.e. that it responds to +call+ or
# +perform+. Emits a warning when it does not.
#
# @param job [Object] the candidate job
# @return [Boolean] true when the job is runnable, false otherwise
def proper_job?(job)
  runnable = %i[call perform].any? { |entry_point| job.respond_to?(entry_point) }
  warn 'job does not implement .call nor .perform!' unless runnable
  runnable
end
wrap_job(job, at:, max_retries:, active_job:) click to toggle source
# File lib/belated/client.rb, line 101
# Wraps a job in a job wrapper. A job that already is a JobWrapper is
# returned untouched, ignoring the other arguments.
#
# @param job [Object] the job to wrap
# @param at [Object] passed through to the wrapper
# @param max_retries [Object] passed through to the wrapper
# @param active_job [Boolean] when truthy the ActiveJob adapter's wrapper
#   class is used instead of JobWrapper
# @return [JobWrapper] the given wrapper or a newly built one
def wrap_job(job, at:, max_retries:, active_job:)
  return job if job.is_a?(JobWrapper)

  wrapper_class = active_job ? ActiveJob::QueueAdapters::BelatedAdapter::JobWrapper : JobWrapper
  wrapper_class.new(job: job, at: at, max_retries: max_retries, active_job: active_job)
end