class ActiveJob::GoogleCloudPubsub::Worker
Constants
- MAX_DEADLINE
Public Class Methods
new(queue: 'default', pubsub: Google::Cloud::Pubsub.new(timeout: 60), logger: Logger.new($stdout))
click to toggle source
# File lib/activejob_google_cloud_pubsub/worker.rb, line 15 def initialize(queue: 'default', pubsub: Google::Cloud::Pubsub.new(timeout: 60), logger: Logger.new($stdout)) @queue_name = queue @pubsub = pubsub @logger = logger end
Public Instance Methods
ensure_subscription()
click to toggle source
# File lib/activejob_google_cloud_pubsub/worker.rb, line 36 def ensure_subscription @pubsub.subscription_for @queue_name nil end
run()
click to toggle source
# File lib/activejob_google_cloud_pubsub/worker.rb, line 21 def run subscriber = @pubsub.subscription_for(@queue_name).listen(streams: 1, threads: { callback: 1 }) do |message| @logger&.info "Message(#{message.message_id}) was received." process message end subscriber.on_error do |error| @logger&.error(error) end subscriber.start sleep end
Private Instance Methods
process(message)
click to toggle source
# File lib/activejob_google_cloud_pubsub/worker.rb, line 44 def process(message) timer_opts = { execution_interval: MAX_DEADLINE - 10.seconds, timeout_interval: 5.seconds, run_now: true } delay_timer = Concurrent::TimerTask.execute(timer_opts) do message.modify_ack_deadline! MAX_DEADLINE.to_i end begin succeeded = false failed = false ActiveJob::Base.execute JSON.parse(message.data) succeeded = true rescue StandardError failed = true raise ensure delay_timer.shutdown if succeeded || failed message.acknowledge! @logger&.info "Message(#{message.message_id}) was acknowledged." else # terminated from outside message.modify_ack_deadline! 0 end end end