class Hutch::Worker
Monkey patch worker. Hutch borrows Bunny's underlying ConsumerWorkPool to handle concurrent task processing, but that pool is too generic: we need to work with the Messages delivered by RabbitMQ and attach extra handling information while a task runs. So we no longer let ConsumerWorkPool process the task itself; instead Bunny::ConsumerWorkPool only performs a single submission, handing the block to be executed over to our own WorkerPool for the final execution. Because a task in a RabbitMQ queue is only marked complete after a manual Ack, and each Channel has a Prefetch limit, combining these two features lets us buffer tasks in an in-process Queue: as long as tasks have not yet executed, Prefetch caps the total number of tasks buffered on the current node, and the Ack explicitly tells RabbitMQ when a task has finished.
Public Class Methods
  # File lib/hutch/patch/worker.rb, line 12
  def initialize(broker, consumers, setup_procs)
    raise "use Hutch::Schedule must set an positive channel_prefetch" if Hutch::Config.get(:channel_prefetch) < 1

    @broker = broker
    self.consumers = consumers
    self.setup_procs = setup_procs

    @message_worker = Concurrent::FixedThreadPool.new(Hutch::Config.get(:worker_pool_size))
    @timer_worker = Concurrent::TimerTask.execute(execution_interval: Hutch::Config.get(:poller_interval)) do
      # all checkers run in the same thread
      heartbeat_connection
      flush_to_retry
      retry_buffer_queue
    end

    # The queue size may be the same as channel[prefetch], and every Consumer shares one buffer queue with the
    # same prefetch size; when the current consumer's unacked messages reach the prefetch size, RabbitMQ stops
    # pushing messages to this consumer.
    # Because the buffer queue is shared by all consumers, the max queue size is [prefetch * consumer count]:
    # if prefetch is 20 and there are 30 consumers, the max queue size is 20 * 30 = 600.
    @buffer_queue = ::Queue.new
    @batch_size = Hutch::Config.get(:poller_batch_size)
    @connected = Hutch.connected?
    @last_flush_time = Time.now.utc
  end
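The initializer above reads several Hutch::Config keys (channel_prefetch, worker_pool_size, poller_interval, poller_batch_size, worker_buffer_flush_interval). Assuming these keys are registered with Hutch::Config by this gem, a host application could tune them roughly as in the sketch below; the values are illustrative only:

  # Illustrative values; the keys are the ones read in initialize above and are
  # assumed to be registered with Hutch::Config by this gem.
  Hutch::Config.set(:channel_prefetch, 20)              # must be >= 1; caps unacked messages per consumer
  Hutch::Config.set(:worker_pool_size, 20)              # threads in the local @message_worker pool
  Hutch::Config.set(:poller_interval, 1)                # seconds between @timer_worker runs
  Hutch::Config.set(:poller_batch_size, 100)            # messages drained from the buffer per run
  Hutch::Config.set(:worker_buffer_flush_interval, 60)  # seconds between flush_to_retry passes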
Public Instance Methods
Change the args into a message so the code from handle_message can be reused.
  # File lib/hutch/patch/worker.rb, line 80
  def consumer_msg(consumer, delivery_info, properties, payload)
    serializer = consumer.get_serializer || Hutch::Config[:serializer]
    logger.debug {
      spec = serializer.binary? ? "#{payload.bytesize} bytes" : "#{payload}"
      "message(#{properties.message_id || '-'}): " +
        "routing key: #{delivery_info.routing_key}, " +
        "consumer: #{consumer}, " +
        "payload: #{spec}"
    }

    ConsumerMsg.new(consumer, Hutch::Message.new(delivery_info, properties, payload, serializer))
  end
Messages with a relatively long rate interval should not sit in the buffer the whole time, so long-period messages need to be re-enqueued to RabbitMQ, based on the interval value, and left to wait there. They must not simply be requeued directly, though, since that would cause frequent round-trips with RabbitMQ. Instead, a message's wait is stretched gradually according to its period and execution count, up to a final maximum wait.

There are a few requirements:

- it is called before retry_buffer_queue
- the whole method call must finish within 1s
  # File lib/hutch/patch/worker.rb, line 135
  def flush_to_retry
    now = Time.now.utc
    if now - @last_flush_time >= Hutch::Config.get(:worker_buffer_flush_interval)
      @buffer_queue.size.times do
        cmsg = peak
        break if cmsg.blank?
        # if the message was not handled, put it back into the buffer
        @buffer_queue.push(cmsg) unless cmsg.enqueue_in_or_not
      end
      @last_flush_time = now
      logger.debug "flush_to_retry #{Time.now.utc - now}"
    end
  end
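The re-enqueue decision itself lives in ConsumerMsg#enqueue_in_or_not (not shown here). As a minimal, hypothetical sketch of the "stretch the wait" policy described above, assuming a per-message execution counter and a hard cap on the delay:

  # Hypothetical illustration only; the real ConsumerMsg#enqueue_in_or_not
  # may compute the delay differently.
  MAX_DELAY = 60 * 60 # cap the wait at one hour (assumed value)

  def backoff_delay(interval, executions)
    # double the base interval for every execution so far, but never exceed the cap
    [interval * (2**executions), MAX_DELAY].min
  end

  backoff_delay(5, 0)  # => 5
  backoff_delay(5, 4)  # => 80
  backoff_delay(5, 12) # => 3600 (capped)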
  # File lib/hutch/patch/worker.rb, line 93
  def handle_cmsg(consumer, delivery_info, properties, payload, message)
    consumer_instance = consumer.new.tap { |c| c.broker, c.delivery_info = @broker, delivery_info }
    with_tracing(consumer_instance).handle(message)
    @broker.ack(delivery_info.delivery_tag)
  rescue => ex
    acknowledge_error(delivery_info, properties, @broker, ex)
    handle_error(properties, payload, consumer, ex)
  end
cmsg: ConsumerMsg
  # File lib/hutch/patch/worker.rb, line 60
  def handle_cmsg_with_limits(cmsg)
    # normal ratelimit handling: if the limit is exceeded, the message goes into the buffer
    consumer = cmsg.consumer
    @message_worker.post do
      if consumer.ratelimit_exceeded?(cmsg.message)
        @buffer_queue.push(cmsg)
      else
        # if Hutch disconnect skip do work let message timeout in rabbitmq waiting message push again
        return unless @connected
        consumer.ratelimit_add(cmsg.message)
        handle_cmsg(*cmsg.handle_cmsg_args)
      end
    end
  end
  # File lib/hutch/patch/worker.rb, line 75
  def handle_message_with_limits(consumer, delivery_info, properties, payload)
    handle_cmsg_with_limits(consumer_msg(consumer, delivery_info, properties, payload))
  end
Heartbeat check of the Hutch connection.
  # File lib/hutch/patch/worker.rb, line 104
  def heartbeat_connection
    @connected = Hutch.connected?
  end
Non-blocking pop of one message; returns nil if the queue is empty, any other error raises an exception.
  # File lib/hutch/patch/worker.rb, line 150
  def peak
    @buffer_queue.pop(true)
  rescue ThreadError => e
    nil if e.to_s == "queue empty"
  end
Every so often, move tasks from the buffer queue into execution; messages with a short interval are executed right away.
  # File lib/hutch/patch/worker.rb, line 109
  def retry_buffer_queue
    begin_size = @buffer_queue.size
    now = Time.now.utc
    stat = {}
    @batch_size.times do
      cmsg = peak
      break if cmsg.blank?
      handle_cmsg_with_limits(cmsg)

      next unless logger.level == Logger::DEBUG
      if stat.key?(cmsg.message.body[:b])
        stat[cmsg.message.body[:b]] += 1
      else
        stat[cmsg.message.body[:b]] = 1
      end
    end
    logger.debug "retry_buffer_queue #{Time.now.utc - now}, size from #{begin_size} to #{@buffer_queue.size}, stat: #{stat}"
  end
Bind a consumer's routing keys to its queue, and set up a subscription to receive messages sent to the queue.
  # File lib/hutch/patch/worker.rb, line 47
  def setup_queue(consumer)
    logger.info "setting up queue: #{consumer.get_queue_name}"
    queue = @broker.queue(consumer.get_queue_name, consumer.get_arguments)
    @broker.bind_queue(queue, consumer.routing_keys)

    queue.subscribe(consumer_tag: unique_consumer_tag, manual_ack: true) do |*args|
      delivery_info, properties, payload = Hutch::Adapter.decode_message(*args)
      handle_message_with_limits(consumer, delivery_info, properties, payload)
    end
  end
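For context, setup_queue receives ordinary Hutch consumer classes; get_queue_name and routing_keys used above come from Hutch's consumer DSL. A minimal sketch of such a consumer, with an illustrative class name, routing key, and queue name:

  # Illustrative consumer; class name, routing key and queue name are made up.
  class OrderCreatedConsumer
    include Hutch::Consumer
    consume 'order.created'          # routing key(s) -> Consumer#routing_keys
    queue_name 'app_order_created'   # explicit queue name -> Consumer#get_queue_name

    def process(message)
      # message.body holds the decoded payload
      Hutch.logger.info "order id: #{message.body[:id]}"
    end
  end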
Stop a running worker by killing all subscriber threads and shutting down both thread pools.
  # File lib/hutch/patch/worker.rb, line 39
  def stop
    @timer_worker.shutdown
    @broker.stop
    @message_worker.shutdown
  end