class YAKC::Reader

Attributes

config[R]
message_handler[R]
terminated[R]

Public Class Methods

new( message_handler: ) click to toggle source
# File lib/yakc/reader.rb, line 6
def initialize( message_handler: )
  @message_handler  = message_handler
  @config           = YAKC.configuration     

  raise KeyError, "YAKC::Reader initialized without a message handler. Please specify one so that your receives messages don't end up on the floor. For more info, go to: https://github.com/gaorlov/yakc#message-handler" unless message_handler
  
  %w(INT TERM).each do |signal|
    Signal.trap(signal) do
      @terminated = true
    end
  end
end

Public Instance Methods

read() click to toggle source
# File lib/yakc/reader.rb, line 19
def read
  logger.info "YAKC: Starting reading"
  loop do
    consumers.map do |consumer|
      consumer.fetch do |partition, bulk|
        bulk.each do |message|
          message_handler.handle consumer.topic, message
        end
      end
      return if terminated
    end
    return if terminated
  end
rescue => e
  logger.error e
  retry
end

Private Instance Methods

consumers() click to toggle source
# File lib/yakc/reader.rb, line 39
def consumers
  @consumers ||= topics.map do |topic|
    Poseidon::ConsumerGroup.new(
    "#{app}-#{topic}-consumer-#{suffix}", 
    brokers,
    zookeepers, 
    topic,
    {})
  end
end