class SimpleKafkaConsumer::Consumer

Attributes

consumer[R]
logger[R]

Public Class Methods

new(kafka_servers, zookeeper_servers, options = {}) click to toggle source
# File lib/simple_kafka_consumer/consumer.rb, line 6
def initialize(kafka_servers, zookeeper_servers, options = {})
  @logger = options.delete(:logger)
  timeout_length = options.delete(:timeout) || 5
  @consumer = Poseidon::ConsumerGroup.new(
    group_name,
    kafka_servers,
    zookeeper_servers,
    topic_name,
    options
  )
  %w(INT TERM).each do |signal|
    Signal.trap(signal) do
      @terminated = true
      @timeout = timeout_length
    end
  end
end

Public Instance Methods

run() click to toggle source
# File lib/simple_kafka_consumer/consumer.rb, line 24
def run
  debug "partitions: #{consumer.partitions}"
  debug "claimed: #{consumer.claimed}"
  consumer.fetch_loop do |partition, bulk|
    Timeout.timeout(@timeout) do
      bulk.each do |message|
        process(parse(message))
      end
    end
    break if @terminated
  end
rescue ZK::Exceptions::OperationTimeOut => e
  log e.message
  retry
end

Protected Instance Methods

consume(message) click to toggle source
# File lib/simple_kafka_consumer/consumer.rb, line 56
def consume(message)
  puts "doing nothing"
end
debug(message) click to toggle source
# File lib/simple_kafka_consumer/consumer.rb, line 51
def debug(message)
  return false unless logger
  logger.debug message
end
instrumenter(message) { || ... } click to toggle source
# File lib/simple_kafka_consumer/consumer.rb, line 66
def instrumenter(message)
  yield
end
log(message) click to toggle source
# File lib/simple_kafka_consumer/consumer.rb, line 46
def log(message)
  return false unless logger
  logger.info message
end
parse(message) click to toggle source
# File lib/simple_kafka_consumer/consumer.rb, line 42
def parse(message)
  message
end
process(message) click to toggle source
# File lib/simple_kafka_consumer/consumer.rb, line 60
def process(message)
  instrumenter(message) do
    consume(message)
  end
end