module Hutch::Enqueue
If consumer need `enqueue`, just include this module
Public Instance Methods
attempts(times)
click to toggle source
# File lib/hutch/enqueue.rb, line 76 def attempts(times) @max_retries = [times, 0].max end
delay_seconds_level(delay_seconds)
click to toggle source
计算 delay 的 level 5s 10s 20s 30s 60s 120s 180s 240s 300s 360s 420s 480s 540s 600s 1200s 1800s 2400s 3600s 7200s 10800s
# File lib/hutch/enqueue.rb, line 88 def delay_seconds_level(delay_seconds) case delay_seconds when 0..5 # 5s 5 when 5..10 # 10s 10 when 10..20 # 20s 20 when 20..30 # 30s 30 when 30..60 # 60s 60 when 60..120 # 120s 120 when 120..180 # 180s 180 when 180..240 # 240s 240 when 240..300 # 300s 300 when 300..360 # 360s 360 when 360..420 # 420s 420 when 420..480 # 480s 480 when 480..540 # 540s 540 when 540..600 # 600s 600 when 600..1200 # 1200s 1200 when 1200..1800 # 1800s 1800 when 1800..2400 # 2400s 2400 when 2400..3000 # 3000s 3000 when 3000..3600 # 3600s 3600 when 3600..7200 # 7200s 7200 when 7200..10800 # 10800s 10800 else 10800 end end
enqueue(msg = {})
click to toggle source
Publish the message to this consumer with one routing_key
# File lib/hutch/enqueue.rb, line 16 def enqueue(msg = {}) Hutch.publish(enqueue_routing_key, msg) end
enqueue_at(time, message = {}, props = {})
click to toggle source
delay at exatly time point
# File lib/hutch/enqueue.rb, line 50 def enqueue_at(time, message = {}, props = {}) # compatible with with ActiveJob API time_or_timestamp = time.respond_to?(:utc) ? time.utc.to_f : time # if time is early then now then just delay 1 second interval = [(time_or_timestamp - Time.now.utc.to_f), 1.second].max enqueue_in(interval, message, props) end
enqueue_in(interval, message = {}, props = {})
click to toggle source
publish message at a delay times interval: delay interval seconds message: publish message
# File lib/hutch/enqueue.rb, line 29 def enqueue_in(interval, message = {}, props = {}) # TODO: 超过 3h 的延迟也会接收, 但是不会延迟那么长时间, 但给予 warn delay_seconds = delay_seconds_level(interval) # 设置固定的延迟, 利用 headers 中的 CC, 以及区分的 topic, 将消息重新投递进入队列 prop_headers = props[:headers] || {} properties = props.merge( expiration: (delay_seconds * 1000).to_i, headers: prop_headers.merge(CC: [enqueue_routing_key]) ) delay_routing_key = Hutch::Schedule.delay_routing_key("#{delay_seconds}s") Hutch::Schedule.publish(delay_routing_key, message, properties) end
enqueue_routing_key()
click to toggle source
routing_key: the purpose is to send message to hutch exchange and then routing to the correct queue, so can use any of them routing_key that the consumer is consuming.
# File lib/hutch/enqueue.rb, line 71 def enqueue_routing_key raise "Routing Keys is not set!" if routing_keys.size < 1 routing_keys.to_a.last end
enqueue_uniq(uniq_key, msg = {})
click to toggle source
enqueue unique message
# File lib/hutch/enqueue.rb, line 21 def enqueue_uniq(uniq_key, msg = {}) return false unless uniq_key_check(uniq_key) enqueue(msg) end
enqueue_uniq_at(uniq_key, time, message = {}, props = {})
click to toggle source
# File lib/hutch/enqueue.rb, line 58 def enqueue_uniq_at(uniq_key, time, message = {}, props = {}) return false unless uniq_key_check(uniq_key) enqueue_at(time, message, props) end
enqueue_uniq_in(uniq_key, interval, message = {}, props = {})
click to toggle source
# File lib/hutch/enqueue.rb, line 44 def enqueue_uniq_in(uniq_key, interval, message = {}, props = {}) return false unless uniq_key_check(uniq_key) enqueue_in(interval, message, props) end
max_attempts()
click to toggle source
# File lib/hutch/enqueue.rb, line 80 def max_attempts @max_retries || 0 end
uniq_key_check(uniq_key)
click to toggle source
check uniq_key is set or not expire time set for 24h
# File lib/hutch/enqueue.rb, line 65 def uniq_key_check(uniq_key) Hutch::Schedule.ns.set(uniq_key, "1", ex: 86400, nx: true) end