class Gps::Job::Worker

Public Class Methods

perform_job(data) click to toggle source
# File lib/gps/job/worker.rb, line 34
def self.perform_job(data)
  Gps::Job.configuration.logger&.info "Started Job: #{data}"
  ActiveSupport::Notifications.instrument(Gps::Job.configuration.event_name, data) do
    data.fetch('job_class').constantize.perform_now(*data.fetch('arguments'))
  end
end
perform_now?(data) click to toggle source
# File lib/gps/job/worker.rb, line 30
def self.perform_now?(data)
  data['at'].to_i < Time.now.to_i
end
run_worker!() click to toggle source
# File lib/gps/job/worker.rb, line 6
def self.run_worker!
  Gps::Job.configuration.logger&.info 'Google Pub/Sub Worker running.'
  subscribe_to_active_support_instrumentation!
  subscribe_to_google_pub_sub!
end
subscribe_to_active_support_instrumentation!() click to toggle source
# File lib/gps/job/worker.rb, line 12
def self.subscribe_to_active_support_instrumentation!
  ActiveSupport::Notifications.subscribe(Gps::Job.configuration.event_name) do |_name, started, finished, _unique_id, data|
    Gps::Job.configuration.logger&.info "Finished job: #{data}"
    Gps::Job.metrics.total_duration += (finished - started)
    Gps::Job.metrics.total_count += 1
  end
end
subscribe_to_google_pub_sub!() click to toggle source
# File lib/gps/job/worker.rb, line 20
def self.subscribe_to_google_pub_sub!
  subscription.listen do |message|
    data = JSON.parse(message.data)
    if perform_now?(data)
      message.acknowledge!
      perform_job(data)
    end
  end
end
subscription() click to toggle source
# File lib/gps/job/worker.rb, line 41
def self.subscription
  Gps::Job.topic.subscription(Gps::Job.configuration.queue) ||
    Gps::Job.topic.subscribe(Gps::Job.configuration.queue)
end