class SimpleKafkaConsumer::Consumer
Attributes
consumer[R]
logger[R]
Public Class Methods
new(kafka_servers, zookeeper_servers, options = {})
click to toggle source
# File lib/simple_kafka_consumer/consumer.rb, line 6
# Creates the Poseidon consumer group and installs INT/TERM signal handlers.
#
# @param kafka_servers     passed through to Poseidon::ConsumerGroup.new
# @param zookeeper_servers passed through to Poseidon::ConsumerGroup.new
# @param options [Hash] consumer options. Two keys are consumed here and not
#   forwarded:
#   :logger  - stored in @logger (may be absent, leaving it nil)
#   :timeout - value assigned to @timeout once INT/TERM is received (default 5)
#   Every remaining key is forwarded to Poseidon::ConsumerGroup.new.
#
# The group and topic names come from #group_name and #topic_name, which are
# not defined in this part of the file (presumably supplied by subclasses --
# verify).
def initialize(kafka_servers, zookeeper_servers, options = {})
  # Work on a copy: the deletes below must not strip keys from (or fail on a
  # frozen) hash owned by the caller.
  options = options.dup
  @logger = options.delete(:logger)
  timeout_length = options.delete(:timeout) || 5

  @consumer = Poseidon::ConsumerGroup.new(
    group_name,
    kafka_servers,
    zookeeper_servers,
    topic_name,
    options
  )

  %w[INT TERM].each do |signal|
    Signal.trap(signal) do
      # Only flag termination here; @timeout stays unset until a signal
      # arrives, so it is the handler that activates the timeout value.
      @terminated = true
      @timeout = timeout_length
    end
  end
end
Public Instance Methods
run()
click to toggle source
# File lib/simple_kafka_consumer/consumer.rb, line 24
# Main consume loop: logs the consumer's partitions and claims at debug level,
# then hands every fetched message to #process (after #parse).
#
# Each fetched batch is processed inside Timeout.timeout(@timeout). @timeout
# is only assigned by the signal handlers set up in #initialize, so until a
# signal arrives the value is nil (which Timeout treats as "no limit" --
# see the Timeout docs). The loop exits after the current batch once
# @terminated is set.
#
# A ZK::Exceptions::OperationTimeOut is logged and the whole method body is
# retried; there is no upper bound on the number of retries.
def run
  debug("partitions: #{consumer.partitions}")
  debug("claimed: #{consumer.claimed}")

  consumer.fetch_loop do |_partition, messages|
    Timeout.timeout(@timeout) do
      messages.each { |raw| process(parse(raw)) }
    end

    break if @terminated
  end
rescue ZK::Exceptions::OperationTimeOut => error
  log(error.message)
  retry
end
Protected Instance Methods
consume(message)
click to toggle source
# File lib/simple_kafka_consumer/consumer.rb, line 56
# Default message handler, invoked by #process. This implementation ignores
# the message and only prints a fixed notice to standard output.
#
# @param message the parsed message (unused here)
# @return [nil] the result of Kernel#puts
def consume(message)
  puts('doing nothing')
end
debug(message)
click to toggle source
# File lib/simple_kafka_consumer/consumer.rb, line 51
# Writes +message+ at debug level when a logger is configured.
#
# @param message the text handed to logger.debug
# @return [false] when no logger is set; otherwise whatever logger.debug returns
def debug(message)
  if logger
    logger.debug(message)
  else
    false
  end
end
instrumenter(message) { || ... }
click to toggle source
# File lib/simple_kafka_consumer/consumer.rb, line 66
# Wrapper around message handling; #process runs #consume inside this block.
# This implementation adds nothing: it simply yields and does not use
# +message+.
#
# @param message the message about to be consumed (unused here)
# @yield runs the wrapped work
# @return the value of the block
def instrumenter(message)
  yield
end
log(message)
click to toggle source
# File lib/simple_kafka_consumer/consumer.rb, line 46
# Writes +message+ at info level when a logger is configured.
#
# @param message the text handed to logger.info
# @return [false] when no logger is set; otherwise whatever logger.info returns
def log(message)
  if logger
    logger.info(message)
  else
    false
  end
end
parse(message)
click to toggle source
# File lib/simple_kafka_consumer/consumer.rb, line 42
# Converts a fetched message into the object passed to #process. This
# implementation performs no conversion and returns its argument as is.
#
# @param message the message as yielded by the fetch loop in #run
# @return the same object that was passed in
def parse(message)
  message
end
process(message)
click to toggle source
# File lib/simple_kafka_consumer/consumer.rb, line 60
# Handles one parsed message by running #consume inside #instrumenter.
#
# @param message the parsed message; passed to both #instrumenter and #consume
# @return whatever #instrumenter returns
def process(message)
  instrumenter(message) { consume(message) }
end