class Hutch::ConsumerMsg
Consumer Message wrap Hutch::Message and Consumer
Attributes
consumer[R]
message[R]
Public Class Methods
new(consumer, message)
click to toggle source
# File lib/hutch/patch/worker.rb, line 165 def initialize(consumer, message) @consumer = consumer @message = message end
Public Instance Methods
enqueue_in_or_not()
click to toggle source
if delays > 10s then let the message to rabbitmq to delay and enqueue again instead of rabbitmq reqneue
# File lib/hutch/patch/worker.rb, line 179 def enqueue_in_or_not # interval 小于 5s, 的则不会传, 在自己的 buffer 中等待 return false if interval < Hutch::Config.get(:worker_buffer_flush_interval) # 等待时间过长的消息, 交给远端的 rabbitmq 去进行等待, 不占用 buffer 空间 # 如果数据量特别大, 但 ratelimit 特别严格, 那么也会变为固定周期的积压, 需要增加对执行次数的记录以及延长 # 市场 30s 执行一次的任务, 积累了 200 个, 那么这个积压会越来越多, 直到保持到一个 RabbitMQ 与 hutch 之间的最长等待周期, 会一直空转 # - 要么增加对执行次数的考虑, 拉长延长. 但最终会有一个最长的延长 10800 (3h), 这个问题最终仍然会存在 # - 设置延长多长之后, 就舍弃这个任务, 因为由于 ratelimit 的存在, 但又持续的积压, 不可能处理完这个任务 # 这个方案没有很好的解决方法, 这是一个典型的 "生产速度大于消费速度" 的问题, 如果长时间的 生产 > 消费, 这个问题是无解的 Hutch.broker.ack(message.delivery_info.delivery_tag) # TODO: 如果存在 x-death 的 count 需要额外考虑, 解决与 error retry 的 x-death 复用的问题 # 临时给一个随机的 1,2 倍率的延迟, 大概率为 1 倍,小概率为 2 倍 consumer.enqueue_in(interval * [rand(3), 1].max, message.body, message.properties.to_hash) end
handle_cmsg_args()
click to toggle source
# File lib/hutch/patch/worker.rb, line 170 def handle_cmsg_args [consumer, message.delivery_info, message.properties, message.payload, message] end
interval()
click to toggle source
# File lib/hutch/patch/worker.rb, line 174 def interval @interval ||= consumer.interval(message) end
logger()
click to toggle source
# File lib/hutch/patch/worker.rb, line 161 def logger Hutch::Logging.logger end