class Lowkiq::Queue::QueueMetrics
Public Class Methods
new(redis_pool)
click to toggle source
# File lib/lowkiq/queue/queue_metrics.rb, line 4 def initialize(redis_pool) @redis_pool = redis_pool @timestamp = Utils::Timestamp.method(:now) end
Public Instance Methods
call(queues)
click to toggle source
# File lib/lowkiq/queue/queue_metrics.rb, line 9 def call(queues) result = @redis_pool.with do |redis| #redis.pipelined do redis.multi do queues.each { |queue| pipeline redis, queue } end end result.each_slice(pipeline_count).map do |res| coerce(res) end end
Private Instance Methods
coerce(result)
click to toggle source
# File lib/lowkiq/queue/queue_metrics.rb, line 53 def coerce(result) length = result[0] + result[1] OpenStruct.new length: length, fresh: result[0], retries: result[1], morgue_length: result[2], lag: coerce_lag(result[3]), processed: result[4].to_i, failed: result[5].to_i, busy: coerce_busy(result[6]) end
coerce_busy(res)
click to toggle source
# File lib/lowkiq/queue/queue_metrics.rb, line 73 def coerce_busy(res) res.map(&:to_i).reduce(0, &:+) end
coerce_lag(res)
click to toggle source
# File lib/lowkiq/queue/queue_metrics.rb, line 65 def coerce_lag(res) _id, score = res.first return 0.0 if score.nil? lag = @timestamp.call - score return 0.0 if lag < 0.0 lag end
pipeline(redis, name)
click to toggle source
# File lib/lowkiq/queue/queue_metrics.rb, line 24 def pipeline(redis, name) keys = Keys.new name # fresh redis.zcount keys.all_ids_scored_by_retry_count_zset, -1, -1 # retries redis.zcount keys.all_ids_scored_by_retry_count_zset, 0, '+inf' # morgue_length redis.zcard keys.morgue_all_ids_scored_by_updated_at_zset # lag [id, score] redis.zrange keys.all_ids_scored_by_perform_in_zset, 0, 0, with_scores: true # processed redis.get keys.processed_key # failed redis.get keys.failed_key # busy [] redis.hvals keys.processing_length_by_shard_hash end
pipeline_count()
click to toggle source
# File lib/lowkiq/queue/queue_metrics.rb, line 49 def pipeline_count 7 end