class RailsAutoscaleAgent::WorkerAdapters::DelayedJob

Attributes

queues[W]

Public Instance Methods

collect!(store) click to toggle source
# File lib/rails_autoscale_agent/worker_adapters/delayed_job.rb, line 22
      def collect!(store)
        log_msg = String.new
        t = Time.now.utc
        sql = <<~SQL
          SELECT COALESCE(queue, 'default'), min(run_at)
          FROM delayed_jobs
          WHERE locked_at IS NULL
          AND failed_at IS NULL
          GROUP BY queue
        SQL

        run_at_by_queue = Hash[select_rows(sql)]

        # Don't collect worker metrics if there are unreasonable number of queues
        if run_at_by_queue.size > 50
          logger.debug "Skipping DelayedJob metrics - #{run_at_by_queue.size} queues"
          return
        end

        self.queues = queues | run_at_by_queue.keys

        if track_long_running_jobs?
          sql = <<~SQL
            SELECT COALESCE(queue, 'default'), count(*)
            FROM delayed_jobs
            WHERE locked_at IS NOT NULL
            AND locked_by IS NOT NULL
            AND failed_at IS NULL
            GROUP BY 1
          SQL

          busy_count_by_queue = Hash[select_rows(sql)]
          self.queues = queues | busy_count_by_queue.keys
        end

        queues.each do |queue|
          run_at = run_at_by_queue[queue]
          # DateTime.parse assumes a UTC string
          run_at = DateTime.parse(run_at) if run_at.is_a?(String)
          latency_ms = run_at ? ((t - run_at)*1000).ceil : 0
          latency_ms = 0 if latency_ms < 0

          store.push latency_ms, t, queue
          log_msg << "dj-qt.#{queue}=#{latency_ms} "

          if track_long_running_jobs?
            busy_count = busy_count_by_queue[queue] || 0
            store.push busy_count, Time.now, queue, :busy
            log_msg << "dj-busy.#{queue}=#{busy_count} "
          end
        end

        logger.debug log_msg unless log_msg.empty?
      end
enabled?() click to toggle source
# File lib/rails_autoscale_agent/worker_adapters/delayed_job.rb, line 13
def enabled?
  if defined?(::Delayed::Job) && defined?(::Delayed::Backend::ActiveRecord)
    log_msg = String.new("DelayedJob enabled (#{::ActiveRecord::Base.default_timezone})")
    log_msg << " with long-running job support" if track_long_running_jobs?
    logger.info log_msg
    true
  end
end

Private Instance Methods

queues() click to toggle source
# File lib/rails_autoscale_agent/worker_adapters/delayed_job.rb, line 79
def queues
  # Track the known queues so we can continue reporting on queues that don't
  # have enqueued jobs at the time of reporting.
  # Assume a "default" queue so we always report *something*, even when nothing
  # is enqueued.
  @queues ||= Set.new(['default'])
end
select_rows(sql) click to toggle source
# File lib/rails_autoscale_agent/worker_adapters/delayed_job.rb, line 91
def select_rows(sql)
  # This ensures the agent doesn't hold onto a DB connection any longer than necessary
  ActiveRecord::Base.connection_pool.with_connection { |c| c.select_rows(sql) }
end
track_long_running_jobs?() click to toggle source
# File lib/rails_autoscale_agent/worker_adapters/delayed_job.rb, line 87
def track_long_running_jobs?
  Config.instance.track_long_running_jobs
end