class ActiveJob::GoogleCloudPubsub::Worker

Constants

MAX_DEADLINE

Public Class Methods

new(queue: 'default', min_threads: 0, max_threads: Concurrent.processor_count, pubsub: Google::Cloud::Pubsub.new, logger: Logger.new($stdout)) click to toggle source
# File lib/activejob-google_cloud_pubsub/worker.rb, line 16
# Stores the worker's configuration; no connection or thread is started here.
#
# @param queue [String] queue name later passed to +subscription_for+
# @param min_threads [Integer] +min_threads+ value for the thread pool built in #run
# @param max_threads [Integer] +max_threads+ value for the thread pool built in #run
# @param pubsub [Object] Pub/Sub client; the methods in this file call
#   +subscription_for+ on it
# @param logger [Logger, nil] log sink; the call sites in this file use +&.+,
#   so +nil+ silences logging
def initialize(
  queue: 'default',
  min_threads: 0,
  max_threads: Concurrent.processor_count,
  pubsub: Google::Cloud::Pubsub.new,
  logger: Logger.new($stdout)
)
  @queue_name = queue
  @min_threads = min_threads
  @max_threads = max_threads
  @pubsub = pubsub
  @logger = logger
end

Public Instance Methods

ensure_subscription() click to toggle source
# File lib/activejob-google_cloud_pubsub/worker.rb, line 50
# Asks the Pub/Sub client for this worker's queue subscription and discards
# the result, so only the lookup's side effect matters to callers.
#
# NOTE(review): presumably +subscription_for+ creates the subscription when it
# does not exist yet - confirm against the client extension that defines it.
#
# @return [nil]
def ensure_subscription
  @pubsub.subscription_for(@queue_name)
  nil
end
run() click to toggle source
# File lib/activejob-google_cloud_pubsub/worker.rb, line 24
# Starts listening on the queue's subscription and hands every received
# message to a thread pool, then blocks the calling thread with +sleep+.
#
# Errors raised while handling a message are logged through the promise's
# +rescue+ callback rather than propagated.
def run
  # NOTE(review): max_queue: -1 presumably makes the pool reject work instead
  # of queueing it once all threads are busy - confirm against concurrent-ruby.
  pool = Concurrent::ThreadPoolExecutor.new(
    min_threads: @min_threads,
    max_threads: @max_threads,
    max_queue:   -1
  )

  subscriber = @pubsub.subscription_for(@queue_name).listen do |message|
    @logger&.info "Message(#{message.message_id}) was received."

    begin
      job = Concurrent::Promise.execute(args: message, executor: pool) { |msg| process_or_delay msg }
      job.rescue { |e| @logger&.error e }
    rescue Concurrent::RejectedExecutionError
      # The pool refused the job: push the message back by 10 seconds. No
      # executor is passed here, so this does not run on the rejecting pool.
      retry_later = Concurrent::Promise.execute(args: message) do |msg|
        msg.delay! 10.seconds.to_i

        @logger&.info "Message(#{msg.message_id}) was rescheduled after 10 seconds because the thread pool is full."
      end
      retry_later.rescue { |e| @logger&.error e }
    end
  end

  subscriber.start

  # Park the calling thread indefinitely.
  sleep
end

Private Instance Methods

process(message) click to toggle source
# File lib/activejob-google_cloud_pubsub/worker.rb, line 70
# Executes the job serialized in +message.data+ and settles the message.
#
# While the job runs, a timer task calls +message.delay!+ with MAX_DEADLINE
# once immediately and then every MAX_DEADLINE - 10 seconds.
# NOTE(review): presumably +delay!+ extends the acknowledgement deadline so the
# message is not redelivered mid-run - confirm against the message extension.
#
# The message is acknowledged whether the job returned or raised; a raised
# exception is re-raised to the caller after cleanup.
#
# @param message [Object] received message; must respond to +data+, +delay!+,
#   +acknowledge!+ and +message_id+
def process(message)
  heartbeat_options = {
    execution_interval: MAX_DEADLINE - 10.seconds,
    timeout_interval:   5.seconds,
    run_now:            true
  }

  heartbeat = Concurrent::TimerTask.execute(heartbeat_options) do
    message.delay! MAX_DEADLINE.to_i
  end

  # True once the job has either returned or raised. Still false in +ensure+
  # only when execution was cut short without an exception being rescued.
  settled = false

  begin
    ActiveJob::Base.execute JSON.parse(message.data)

    settled = true
  rescue Exception
    # Deliberately broad: the failure is only recorded here and re-raised.
    settled = true

    raise
  ensure
    heartbeat.shutdown

    if settled
      message.acknowledge!

      @logger&.info "Message(#{message.message_id}) was acknowledged."
    else
      # terminated from outside
      message.delay! 0
    end
  end
end
process_or_delay(message) click to toggle source
# File lib/activejob-google_cloud_pubsub/worker.rb, line 58
# Runs the message now if it is due, otherwise pushes it back.
#
# A message that is not yet due is delayed by its remaining time to schedule,
# capped at MAX_DEADLINE seconds, and the postponement is logged.
#
# @param message [Object] received message; must respond to +time_to_process?+,
#   +remaining_time_to_schedule+, +delay!+, +message_id+ and +scheduled_at+
# @return [Object] whatever #process returns when the message is due,
#   otherwise the result of the logger call (+nil+ without a logger)
def process_or_delay(message)
  return process(message) if message.time_to_process?

  wait = [message.remaining_time_to_schedule, MAX_DEADLINE.to_i].min
  message.delay!(wait)

  @logger&.info(
    "Message(#{message.message_id}) was scheduled at #{message.scheduled_at} so it was rescheduled after #{wait} seconds."
  )
end