class PulsarSdk::Consumer::MessageTracker
Public Class Methods
new(redelivery_delay)
click to toggle source
# File lib/pulsar_sdk/consumer/message_tracker.rb, line 8 def initialize(redelivery_delay) @redelivery_delay = redelivery_delay @received_message = ReceivedQueue.new @acknowledge_message = NackQueue.new {|parent, child| child[:ack_at] <=> parent[:ack_at] } @consumers = {} @tracker = track end
Public Instance Methods
add_consumer(consumer)
click to toggle source
# File lib/pulsar_sdk/consumer/message_tracker.rb, line 17 def add_consumer(consumer) @consumers[consumer.consumer_id] = consumer end
close()
click to toggle source
# File lib/pulsar_sdk/consumer/message_tracker.rb, line 47 def close @received_message.close end
receive(*args)
click to toggle source
# File lib/pulsar_sdk/consumer/message_tracker.rb, line 21 def receive(*args) @received_message.add(*args) end
shift(timeout)
click to toggle source
# File lib/pulsar_sdk/consumer/message_tracker.rb, line 25 def shift(timeout) cmd, meta_and_payload = @received_message.pop(timeout) return if cmd.nil? message = PulsarSdk::Protocol::Structure.new(meta_and_payload).decode consumer_id = cmd.message&.consumer_id real_consumer = @consumers[consumer_id] message.assign_attributes( message_id: cmd.message&.message_id, consumer_id: consumer_id, topic: real_consumer&.topic, ack_handler: ack_handler ) real_consumer&.increase_fetched [cmd, message] end
Private Instance Methods
ack_handler()
click to toggle source
# File lib/pulsar_sdk/consumer/message_tracker.rb, line 71 def ack_handler Proc.new do |cmd| current_clock = Process.clock_gettime(Process::CLOCK_MONOTONIC) ack_at = cmd.typeof_ack? ? (current_clock - 1) : (current_clock + @redelivery_delay.to_i) @acknowledge_message.insert(cmd: cmd, ack_at: ack_at) end end
execute_async(cmd)
click to toggle source
# File lib/pulsar_sdk/consumer/message_tracker.rb, line 79 def execute_async(cmd) consumer = @consumers[cmd.get_consumer_id] consumer.execute_async(cmd) end
track()
click to toggle source
# File lib/pulsar_sdk/consumer/message_tracker.rb, line 52 def track Thread.new do loop do while item = @acknowledge_message.top break if item[:ack_at] > Process.clock_gettime(Process::CLOCK_MONOTONIC) begin PulsarSdk.logger.debug('acknowledge message'){"#{Process.clock_gettime(Process::CLOCK_MONOTONIC)}: #{item[:cmd].type} --> #{item[:ack_at]}"} execute_async(item[:cmd]) @acknowledge_message.shift rescue => exp PulsarSdk.logger.error('Error occur when acknowledge message'){exp} PulsarSdk.logger.error('Error occur when acknowledge message'){item} end end sleep(1) end end end