class Fluent::KafkaInput::OffsetManager
Public Class Methods
new(topic_entry, zookeeper, zk_root_node)
click to toggle source
# File lib/fluent/plugin/in_kafka.rb, line 317
#
# Builds the ZooKeeper path under which the next offset for one
# topic/partition is stored, then creates that node via #create_node.
#
# @param topic_entry  object responding to +topic+ and +partition+
# @param zookeeper    client used for all node operations; kept in
#                     @zookeeper (presumably a zookeeper-gem client -- confirm)
# @param zk_root_node path prefix placed before "/<topic>/<partition>/next_offset"
def initialize(topic_entry, zookeeper, zk_root_node)
  @zookeeper = zookeeper
  @zk_path = "#{zk_root_node}/#{topic_entry.topic}/#{topic_entry.partition}/next_offset"
  create_node(@zk_path, topic_entry.topic, topic_entry.partition)
end
Public Instance Methods
create_node(zk_path, topic, partition)
click to toggle source
# File lib/fluent/plugin/in_kafka.rb, line 323
#
# Creates every node along +zk_path+, parent first, by calling
# @zookeeper.create once per cumulative path prefix
# (e.g. "/a", "/a/b", "/a/b/c"), then logs the full path at trace level.
#
# The result of each +create+ call is not inspected here.
#
# @param zk_path   slash-separated node path to create
# @param topic     not used by this method; kept for interface compatibility
# @param partition not used by this method; kept for interface compatibility
def create_node(zk_path, topic, partition)
  path = ""
  # The capture group makes String#split return the "/segment" pieces
  # themselves; the empty strings between them are discarded.
  zk_path.split(/(\/[^\/]+)/).reject(&:empty?).each do |segment|
    path += segment
    @zookeeper.create(:path => path)
  end
  $log.trace "use zk offset node : #{path}"
end
next_offset()
click to toggle source
# File lib/fluent/plugin/in_kafka.rb, line 332
#
# Reads the node at @zk_path and returns its +:data+ entry converted
# with +to_i+. A node whose data is +nil+ therefore reads as 0.
#
# @return [Integer] the stored offset
def next_offset
  @zookeeper.get(:path => @zk_path)[:data].to_i
end
save_offset(offset)
click to toggle source
# File lib/fluent/plugin/in_kafka.rb, line 336
#
# Writes +offset+, converted to a string, as the data of the node at
# @zk_path, then logs the written value at trace level.
#
# @param offset value to persist; stored as +offset.to_s+
def save_offset(offset)
  @zookeeper.set(:path => @zk_path, :data => offset.to_s)
  # Interpolation already calls to_s, so no explicit conversion is needed.
  $log.trace "update zk offset node : #{offset}"
end