class PulsarSdk::Consumer::MessageTracker

Public Class Methods

new(redelivery_delay) click to toggle source
# File lib/pulsar_sdk/consumer/message_tracker.rb, line 8
def initialize(redelivery_delay)
  @redelivery_delay = redelivery_delay
  @received_message = ReceivedQueue.new
  @acknowledge_message = NackQueue.new {|parent, child| child[:ack_at] <=> parent[:ack_at] }
  @consumers = {}

  @tracker = track
end

Public Instance Methods

add_consumer(consumer) click to toggle source
# File lib/pulsar_sdk/consumer/message_tracker.rb, line 17
def add_consumer(consumer)
  @consumers[consumer.consumer_id] = consumer
end
close() click to toggle source
# File lib/pulsar_sdk/consumer/message_tracker.rb, line 47
def close
  @received_message.close
end
receive(*args) click to toggle source
# File lib/pulsar_sdk/consumer/message_tracker.rb, line 21
def receive(*args)
  @received_message.add(*args)
end
shift(timeout) click to toggle source
# File lib/pulsar_sdk/consumer/message_tracker.rb, line 25
def shift(timeout)
  cmd, meta_and_payload = @received_message.pop(timeout)

  return if cmd.nil?

  message = PulsarSdk::Protocol::Structure.new(meta_and_payload).decode

  consumer_id = cmd.message&.consumer_id
  real_consumer = @consumers[consumer_id]

  message.assign_attributes(
    message_id: cmd.message&.message_id,
    consumer_id: consumer_id,
    topic: real_consumer&.topic,
    ack_handler: ack_handler
  )

  real_consumer&.increase_fetched

  [cmd, message]
end

Private Instance Methods

ack_handler() click to toggle source
# File lib/pulsar_sdk/consumer/message_tracker.rb, line 71
def ack_handler
  Proc.new do |cmd|
    current_clock = Process.clock_gettime(Process::CLOCK_MONOTONIC)
    ack_at = cmd.typeof_ack? ? (current_clock - 1) : (current_clock + @redelivery_delay.to_i)
    @acknowledge_message.insert(cmd: cmd, ack_at: ack_at)
  end
end
execute_async(cmd) click to toggle source
# File lib/pulsar_sdk/consumer/message_tracker.rb, line 79
def execute_async(cmd)
  consumer = @consumers[cmd.get_consumer_id]

  consumer.execute_async(cmd)
end
track() click to toggle source
# File lib/pulsar_sdk/consumer/message_tracker.rb, line 52
def track
  Thread.new do
    loop do
      while item = @acknowledge_message.top
        break if item[:ack_at] > Process.clock_gettime(Process::CLOCK_MONOTONIC)
        begin
          PulsarSdk.logger.debug('acknowledge message'){"#{Process.clock_gettime(Process::CLOCK_MONOTONIC)}: #{item[:cmd].type} --> #{item[:ack_at]}"}
          execute_async(item[:cmd])
          @acknowledge_message.shift
        rescue => exp
          PulsarSdk.logger.error('Error occur when acknowledge message'){exp}
          PulsarSdk.logger.error('Error occur when acknowledge message'){item}
        end
      end
      sleep(1)
    end
  end
end