class Cuniculus::QueueConfig
Constants
- DEFAULT_MAX_RETRY
- DEFAULT_PREFETCH_COUNT
- DEFAULT_QUEUE_NAME
- DEFAULT_THREAD_POOL_SIZE
Attributes
durable[R]
max_retry[R]
name[R]
prefetch_count[R]
thread_pool_size[R]
Public Class Methods
new(opts = {})
click to toggle source
# File lib/cuniculus/queue_config.rb, line 16
#
# Builds a frozen queue configuration from an options hash.
#
# Keys may be strings or symbols; they are normalised to strings before
# lookup. Each option is resolved through read_opt, so a missing or nil value
# falls back to its default while an explicit false is kept as given.
#
# @param opts [Hash] recognised keys: "durable", "name", "max_retry",
#   "prefetch_count" and "thread_pool_size"
def initialize(opts = {})
  settings = opts.transform_keys(&:to_s)

  # Option key => fallback value. Each key doubles as the instance variable
  # name; entries are applied in insertion order.
  fallbacks = {
    "durable" => true,
    "name" => DEFAULT_QUEUE_NAME,
    "max_retry" => DEFAULT_MAX_RETRY,
    "prefetch_count" => DEFAULT_PREFETCH_COUNT,
    "thread_pool_size" => DEFAULT_THREAD_POOL_SIZE
  }

  fallbacks.each do |key, fallback|
    instance_variable_set("@#{key}", read_opt(settings[key], fallback))
  end

  # The configuration cannot be modified after construction.
  freeze
end
Public Instance Methods
declare!(channel)
click to toggle source
# File lib/cuniculus/queue_config.rb, line 30
#
# Declares the base queue and its retry queues on +channel+, binding each of
# them to Cuniculus::CUNICULUS_EXCHANGE under its own name as routing key.
#
# The base queue dead-letters to Cuniculus::CUNICULUS_DLX_EXCHANGE. The retry
# queues are named "<name>_1" .. "<name>_<max_retry>"; each carries a message
# TTL and dead-letters back to Cuniculus::CUNICULUS_EXCHANGE with the base
# queue name as routing key.
#
# @param channel [#queue] channel used for the declarations
#   (presumably a Bunny::Channel -- confirm against callers)
# @return [Cuniculus::JobQueue] wraps the base queue and the retry queue names
# @raise the exception built by Cuniculus.convert_exception_class from
#   Cuniculus::RMQQueueConfigurationConflict when a declaration raises
#   Bunny::PreconditionFailed
def declare!(channel)
  # Tracks the queue currently being declared so the rescue clause below can
  # name the one that failed.
  queue_name = name
  shared_opts = { durable: durable, exclusive: false }

  base_q = channel.queue(
    queue_name,
    **shared_opts,
    arguments: { "x-dead-letter-exchange" => Cuniculus::CUNICULUS_DLX_EXCHANGE }
  )
  base_q.bind(Cuniculus::CUNICULUS_EXCHANGE, { routing_key: name })

  retry_queue_names = (1..max_retry).map { |attempt| "#{name}_#{attempt}" }

  max_retry.times do |idx|
    queue_name = retry_queue_names[idx]

    # Delay grows with the retry index: 15, 31, 61, ... The factor of 1000
    # presumably converts seconds to the milliseconds expected by
    # "x-message-ttl" -- confirm against the RabbitMQ TTL documentation.
    delay = (idx**4) + (15 * (idx + 1))
    retry_arguments = {
      "x-dead-letter-exchange" => Cuniculus::CUNICULUS_EXCHANGE,
      "x-dead-letter-routing-key" => name,
      "x-message-ttl" => delay * 1000
    }

    retry_q = channel.queue(queue_name, **shared_opts, arguments: retry_arguments)
    retry_q.bind(Cuniculus::CUNICULUS_EXCHANGE, { routing_key: queue_name })
  end

  Cuniculus::JobQueue.new(base_q, retry_queue_names)
rescue Bunny::PreconditionFailed => e
  raise Cuniculus.convert_exception_class(e, Cuniculus::RMQQueueConfigurationConflict), "Declaration failed for queue '#{queue_name}'"
end
read_opt(val, default)
click to toggle source
# File lib/cuniculus/queue_config.rb, line 26
#
# Resolves an option value against its default.
#
# Only nil counts as "not provided": the check is on nil rather than
# truthiness, so an explicit false is returned unchanged.
#
# @param val [Object, nil] the value supplied by the caller, if any
# @param default [Object] the value to use when +val+ is nil
# @return [Object] +val+ unless it is nil, otherwise +default+
def read_opt(val, default)
  return default if val.nil?

  val
end