class Lowkiq::Queue::Queue
Attributes
name[R]
pool[R]
Public Class Methods
new(redis_pool, name, shards_count)
click to toggle source
# File lib/lowkiq/queue/queue.rb, line 6 def initialize(redis_pool, name, shards_count) @pool = redis_pool @name = name @shards_count = shards_count @timestamp = Utils::Timestamp.method(:now) @keys = Keys.new name @fetch = Fetch.new name end
Public Instance Methods
ack(shard, data, result = nil)
click to toggle source
# File lib/lowkiq/queue/queue.rb, line 103 def ack(shard, data, result = nil) ids = data.map { |job| job[:id] } length = ids.length @pool.with do |redis| redis.multi do redis.del @keys.processing_ids_with_perform_in_hash(shard) redis.del @keys.processing_ids_with_retry_count_hash(shard) redis.del @keys.processing_errors_hash(shard) ids.each do |id| redis.del @keys.processing_payloads_zset(id) end redis.hdel @keys.processing_length_by_shard_hash, shard redis.incrby @keys.processed_key, length if result == :success redis.incrby @keys.failed_key, length if result == :fail end end end
delete(ids)
click to toggle source
# File lib/lowkiq/queue/queue.rb, line 170 def delete(ids) @pool.with do |redis| redis.multi do ids.each do |id| shard = id_to_shard id redis.zrem @keys.all_ids_lex_zset, id redis.zrem @keys.all_ids_scored_by_perform_in_zset, id redis.zrem @keys.all_ids_scored_by_retry_count_zset, id redis.zrem @keys.ids_scored_by_perform_in_zset(shard), id redis.del @keys.payloads_zset(id) redis.hdel @keys.errors_hash, id end end end end
morgue_delete(ids)
click to toggle source
# File lib/lowkiq/queue/queue.rb, line 157 def morgue_delete(ids) @pool.with do |redis| redis.multi do ids.each do |id| redis.zrem @keys.morgue_all_ids_lex_zset, id redis.zrem @keys.morgue_all_ids_scored_by_updated_at_zset, id redis.del @keys.morgue_payloads_zset(id) redis.hdel @keys.morgue_errors_hash, id end end end end
pop(shard, limit:)
click to toggle source
# File lib/lowkiq/queue/queue.rb, line 38 def pop(shard, limit:) @pool.with do |redis| ids = redis.zrangebyscore @keys.ids_scored_by_perform_in_zset(shard), 0, @timestamp.call, limit: [0, limit] return [] if ids.empty? res = redis.multi do |redis| redis.hset @keys.processing_length_by_shard_hash, shard, ids.length ids.each do |id| redis.zrem @keys.all_ids_lex_zset, id redis.zrem @keys.ids_scored_by_perform_in_zset(shard), id Script.zremhset redis, @keys.all_ids_scored_by_perform_in_zset, @keys.processing_ids_with_perform_in_hash(shard), id Script.zremhset redis, @keys.all_ids_scored_by_retry_count_zset, @keys.processing_ids_with_retry_count_hash(shard), id redis.rename @keys.payloads_zset(id), @keys.processing_payloads_zset(id) Script.hmove redis, @keys.errors_hash, @keys.processing_errors_hash(shard), id end processing_data_pipeline(redis, shard, ids) end res.shift 1 + ids.length * 6 processing_data_build res, ids end end
processing_data(shard)
click to toggle source
# File lib/lowkiq/queue/queue.rb, line 122 def processing_data(shard) @pool.with do |redis| ids = redis.hkeys @keys.processing_ids_with_perform_in_hash(shard) return [] if ids.empty? res = redis.multi do |redis| processing_data_pipeline redis, shard, ids end processing_data_build res, ids end end
push(batch)
click to toggle source
# File lib/lowkiq/queue/queue.rb, line 15 def push(batch) @pool.with do |redis| redis.multi do batch.each do |job| id = job.fetch(:id) perform_in = job.fetch(:perform_in, @timestamp.call) retry_count = job.fetch(:retry_count, -1) # for testing payload = job.fetch(:payload, "") score = job.fetch(:score, @timestamp.call) shard = id_to_shard id redis.zadd @keys.all_ids_lex_zset, 0, id redis.zadd @keys.all_ids_scored_by_perform_in_zset, perform_in, id, nx: true redis.zadd @keys.all_ids_scored_by_retry_count_zset, retry_count, id, nx: true redis.zadd @keys.ids_scored_by_perform_in_zset(shard), perform_in, id, nx: true redis.zadd @keys.payloads_zset(id), score, Lowkiq.dump_payload.call(payload), nx: true end end end end
push_back(batch)
click to toggle source
# File lib/lowkiq/queue/queue.rb, line 75 def push_back(batch) @pool.with do |redis| timestamp = @timestamp.call redis.multi do |redis| batch.each do |job| id = job.fetch(:id) perform_in = job.fetch(:perform_in, timestamp) retry_count = job.fetch(:retry_count, -1) payloads = job.fetch(:payloads).map do |(payload, score)| [score, Lowkiq.dump_payload.call(payload)] end error = job.fetch(:error, nil) shard = id_to_shard id redis.zadd @keys.all_ids_lex_zset, 0, id redis.zadd @keys.all_ids_scored_by_perform_in_zset, perform_in, id redis.zadd @keys.all_ids_scored_by_retry_count_zset, retry_count, id redis.zadd @keys.ids_scored_by_perform_in_zset(shard), perform_in, id redis.zadd @keys.payloads_zset(id), payloads, nx: true redis.hset @keys.errors_hash, id, error unless error.nil? end end end end
push_to_morgue(batch)
click to toggle source
# File lib/lowkiq/queue/queue.rb, line 135 def push_to_morgue(batch) @pool.with do |redis| timestamp = @timestamp.call redis.multi do batch.each do |job| id = job.fetch(:id) payloads = job.fetch(:payloads).map do |(payload, score)| [score, Lowkiq.dump_payload.call(payload)] end error = job.fetch(:error, nil) redis.zadd @keys.morgue_all_ids_lex_zset, 0, id redis.zadd @keys.morgue_all_ids_scored_by_updated_at_zset, timestamp, id redis.zadd @keys.morgue_payloads_zset(id), payloads, nx: true redis.hset @keys.morgue_errors_hash, id, error unless error.nil? end end end end
shards()
click to toggle source
# File lib/lowkiq/queue/queue.rb, line 186 def shards (0...@shards_count) end
Private Instance Methods
id_to_shard(id)
click to toggle source
# File lib/lowkiq/queue/queue.rb, line 192 def id_to_shard(id) Zlib.crc32(id.to_s) % @shards_count end
processing_data_build(arr, ids)
click to toggle source
# File lib/lowkiq/queue/queue.rb, line 206 def processing_data_build(arr, ids) ids_with_perform_in = arr.shift ids_with_retry_count = arr.shift errors = arr.shift payloads = arr ids.zip(payloads).map do |(id, payloads)| next if payloads.empty? { id: id, perform_in: ids_with_perform_in[id].to_f, retry_count: ids_with_retry_count[id].to_f, payloads: payloads.map { |(payload, score)| [Lowkiq.load_payload.call(payload), score] }, error: errors[id] }.compact end.compact end
processing_data_pipeline(redis, shard, ids)
click to toggle source
# File lib/lowkiq/queue/queue.rb, line 196 def processing_data_pipeline(redis, shard, ids) redis.hgetall @keys.processing_ids_with_perform_in_hash(shard) redis.hgetall @keys.processing_ids_with_retry_count_hash(shard) redis.hgetall @keys.processing_errors_hash(shard) ids.each do |id| redis.zrange @keys.processing_payloads_zset(id), 0, -1, with_scores: true end end