class Qyu::Queue::Memory::Adapter
Constants
- TYPE
Public Class Methods
new(_config)
click to toggle source
# File lib/qyu/queue/memory/adapter.rb, line 10 def initialize(_config) @temp_store = Hash.new(false) @queues = {} @threads = [] end
valid_config?(_config)
click to toggle source
# File lib/qyu/queue/memory/adapter.rb, line 16 def self.valid_config?(_config) # TODO true end
Public Instance Methods
acknowledge_message(_queue_name, message_id)
click to toggle source
# File lib/qyu/queue/memory/adapter.rb, line 48 def acknowledge_message(_queue_name, message_id) @temp_store[message_id] = true end
enqueue_task(queue_name, task_id)
click to toggle source
# File lib/qyu/queue/memory/adapter.rb, line 21 def enqueue_task(queue_name, task_id) queue(queue_name) << { 'task_id' => task_id } end
enqueue_task_to_failed_queue(queue_name, task_id)
click to toggle source
# File lib/qyu/queue/memory/adapter.rb, line 25 def enqueue_task_to_failed_queue(queue_name, task_id) failed_queue_name = queue_name + '-failed' enqueue_task(failed_queue_name, task_id) end
fetch_next_message(queue_name)
click to toggle source
@param [String] queue_name @return [Hash] the acknowledge message
TODO Note the uglyness in `while … empty?`; it's because of reasons mainly for this (stackoverflow.com/q/11660253) reason.
# File lib/qyu/queue/memory/adapter.rb, line 37 def fetch_next_message(queue_name) sleep(1) while queue(queue_name).empty? message = queue(queue_name).pop(true) message_id = Qyu::Utils.uuid schedule_requeue(message, message_id, queue_name) { 'id' => message_id, 'task_id' => message['task_id'] } end
queues()
click to toggle source
# File lib/qyu/queue/memory/adapter.rb, line 52 def queues @queues.map do |name, queue| { name: name, messages: queue&.size } end end
size(queue_name)
click to toggle source
# File lib/qyu/queue/memory/adapter.rb, line 58 def size(queue_name) queue(queue_name).size end
Private Instance Methods
message_acknowledged?(message_id)
click to toggle source
# File lib/qyu/queue/memory/adapter.rb, line 71 def message_acknowledged?(message_id) @temp_store[message_id] == true end
queue(name)
click to toggle source
queue, or “get_or_create_queue”
@param [String] name The name of the queue to create if it does
does not exist and return;
# File lib/qyu/queue/memory/adapter.rb, line 79 def queue(name) if @queues[name] Qyu.logger.debug "Queue `#{name}`: #{@queues[name].length} elements" return @queues[name] end Qyu.logger.info "Could not find queue `#{name}`, creating it" @queues[name] ||= ::Queue.new end
Also aliased as: get_or_create_queue
schedule_requeue(message, message_id, queue_name)
click to toggle source
# File lib/qyu/queue/memory/adapter.rb, line 64 def schedule_requeue(message, message_id, queue_name) @threads << Thread.new(message_id, message) do |t_message_id, t_message| sleep(5) queue(queue_name) << t_message unless message_acknowledged?(t_message_id) end end