class Beez::Processor

Attributes

busy_count[R]
client[R]
timer[R]
worker_class[R]

Public Class Methods

new(client: ::Beez.client, worker_class:) click to toggle source
# File lib/beez/processor.rb, line 6
# Builds a processor bound to one worker class.
#
# Stores the client and worker class, starts the in-flight job counter at
# zero, and constructs the timer task whose block calls #run. The timer is
# only constructed here; #start is what calls execute on it.
#
# @param client the client used to activate jobs; defaults to ::Beez.client
# @param worker_class the worker class whose jobs this processor handles
def initialize(client: ::Beez.client, worker_class:)
  @client = client
  @worker_class = worker_class
  @busy_count = ::Concurrent::AtomicFixnum.new(0)

  # Both intervals are read from the worker class via the private helpers.
  timer_options = {
    run_now: true,
    execution_interval: worker_poll_interval,
    timeout_interval: worker_timeout
  }
  @timer = ::Concurrent::TimerTask.new(**timer_options) { run }
end

Public Instance Methods

should_activate_jobs?() click to toggle source
# File lib/beez/processor.rb, line 27
# Whether this processor has spare capacity to poll for more jobs.
#
# True only while the in-flight count is strictly below the worker's
# configured maximum. At exactly the maximum, the remaining capacity
# (worker_max_jobs_to_activate - busy_count.value) is zero, so a poll would
# ask the broker to activate zero jobs. The Zeebe gateway documents
# maxJobsToActivate < 1 as an invalid argument, so polling is skipped until
# a job finishes.
#
# @return [Boolean]
def should_activate_jobs?
  busy_count.value < worker_max_jobs_to_activate
end
start() click to toggle source
# File lib/beez/processor.rb, line 17
# Starts the polling timer by calling execute on it.
#
# @return [self] the processor, so calls can be chained
def start
  tap { timer.execute }
end
stop() click to toggle source
# File lib/beez/processor.rb, line 22
# Stops the polling timer by calling shutdown on it.
#
# @return [self] the processor, so calls can be chained
def stop
  tap { timer.shutdown }
end

Private Instance Methods

activate_jobs_request() click to toggle source
# File lib/beez/processor.rb, line 61
# Asks the client to activate jobs on behalf of this processor's worker.
#
# The request carries the worker's type and name, the worker timeout
# multiplied by 1000 (presumably seconds converted to milliseconds; confirm
# against the worker configuration), the remaining capacity, and the
# variables to fetch.
#
# @return whatever client.activate_jobs returns
def activate_jobs_request
  request = {
    type: worker_type,
    worker: worker_name,
    timeout: worker_timeout * 1000,
    maxJobsToActivate: max_jobs_to_activate,
    fetchVariable: worker_variables_to_fetch
  }
  client.activate_jobs(**request)
end
fetch() click to toggle source
# File lib/beez/processor.rb, line 37
# Polls for jobs and hands each one to a background future.
#
# For every response yielded by the activation request, the in-flight
# counter is raised by the number of jobs in that response before any of
# them is dispatched; #process later decrements it once per job.
#
# @return whatever each returns on the activation request result
def fetch
  activate_jobs_request.each do |batch|
    busy_count.increment(batch.jobs.count)
    batch.jobs.each do |activated_job|
      ::Concurrent::Future.execute { process(activated_job) }
    end
  end
end
logger() click to toggle source
# File lib/beez/processor.rb, line 99
# The shared logger, read from ::Beez.logger on every call.
#
# @return whatever ::Beez.logger returns
def logger
  ::Beez.logger
end
max_jobs_to_activate() click to toggle source
# File lib/beez/processor.rb, line 95
# Remaining capacity: the worker's configured maximum minus the number of
# jobs currently counted as in flight.
#
# @return the difference between the two values
def max_jobs_to_activate
  limit = worker_max_jobs_to_activate
  in_flight = busy_count.value
  limit - in_flight
end
process(job) click to toggle source
# File lib/beez/processor.rb, line 48
# Runs a single activated job through a fresh worker instance.
#
# On success the job is completed through the worker. On any StandardError
# the job is reported as failed with the exception's message and the error
# is re-raised to the caller. The in-flight counter is decremented on every
# exit path, including a failure to build the worker. Without that, a
# constructor error would leave the slot counted as busy, because nothing
# else decrements the counter.
#
# @param job the activated job to process
# @return the result of worker.complete_job(job)
# @raise [StandardError] the error raised while building or running the worker
def process(job)
  worker = worker_class.new(client)
  worker.process(job)
  worker.complete_job(job)
rescue => exception
  # worker is nil when construction itself raised; there is then no worker
  # to report the failure through, so only the re-raise and ensure apply.
  worker&.fail_job(job, reason: exception.message)
  raise
ensure
  busy_count.decrement
end
run() click to toggle source
# File lib/beez/processor.rb, line 33
# One timer tick: fetch jobs, but only when should_activate_jobs? allows it.
#
# @return the result of #fetch, or nil when the poll is skipped
def run
  return unless should_activate_jobs?

  fetch
end
worker_max_jobs_to_activate() click to toggle source
# File lib/beez/processor.rb, line 79
# The worker class's configured job limit, read via get_max_jobs_to_activate.
# Used as the upper bound when deciding whether to poll and when computing
# the remaining capacity.
def worker_max_jobs_to_activate
  worker_class.get_max_jobs_to_activate
end
worker_name() click to toggle source
# File lib/beez/processor.rb, line 75
# The worker class's name, read via get_name. Sent as the `worker` field of
# the job activation request.
def worker_name
  worker_class.get_name
end
worker_poll_interval() click to toggle source
# File lib/beez/processor.rb, line 91
# The worker class's poll interval, read via get_poll_interval. Passed to
# the timer task as its execution_interval.
def worker_poll_interval
  worker_class.get_poll_interval
end
worker_timeout() click to toggle source
# File lib/beez/processor.rb, line 83
# The worker class's timeout, read via get_timeout. Used as the timer task's
# timeout_interval and, multiplied by 1000, as the activation request's
# timeout.
def worker_timeout
  worker_class.get_timeout
end
worker_type() click to toggle source
# File lib/beez/processor.rb, line 71
# The worker class's job type, read via get_type. Sent as the `type` field
# of the job activation request.
def worker_type
  worker_class.get_type
end
worker_variables_to_fetch() click to toggle source
# File lib/beez/processor.rb, line 87
# The worker class's variable list, read via get_variables_to_fetch. Sent as
# the `fetchVariable` field of the job activation request.
def worker_variables_to_fetch
  worker_class.get_variables_to_fetch
end