class Fluent::KafkaInput::OffsetManager

Public Class Methods

new(topic_entry, zookeeper, zk_root_node)
# File lib/fluent/plugin/in_kafka.rb, line 360
# Remembers the ZooKeeper handle, derives the offset node path as
# "<zk_root_node>/<topic>/<partition>/next_offset", and then asks
# create_node to create that path.
#
# @param topic_entry  object responding to #topic and #partition
# @param zookeeper    client handle, stored in @zookeeper
# @param zk_root_node prefix under which the offset node path is built
def initialize(topic_entry, zookeeper, zk_root_node)
  @zookeeper = zookeeper

  topic = topic_entry.topic
  partition = topic_entry.partition

  @zk_path = "#{zk_root_node}/#{topic}/#{partition}/next_offset"
  create_node(@zk_path, topic, partition)
end

Public Instance Methods

create_node(zk_path, topic, partition)
# File lib/fluent/plugin/in_kafka.rb, line 366
# Issues one @zookeeper.create call per cumulative prefix of zk_path
# (e.g. "/a", "/a/b", "/a/b/c" for "/a/b/c"), then logs the full path at
# trace level. The result of each create call is ignored.
#
# @param zk_path   [String] path to split into "/segment" pieces
# @param topic     accepted but not used by this method
# @param partition accepted but not used by this method
# @return whatever $log.trace returns
def create_node(zk_path, topic, partition)
  # Splitting on a capturing group keeps the "/segment" pieces themselves;
  # the empty strings left between them are discarded.
  segments = zk_path.split(/(\/[^\/]+)/).reject(&:empty?)

  full_path = segments.inject("") do |prefix, segment|
    current = prefix + segment
    @zookeeper.create(:path => current)
    current
  end

  $log.trace "use zk offset node : #{full_path}"
end
next_offset()
# File lib/fluent/plugin/in_kafka.rb, line 375
# Reads the node at @zk_path through @zookeeper and returns its :data
# field converted with #to_i.
#
# @return the stored value after #to_i (0 when :data is nil)
def next_offset
  response = @zookeeper.get(:path => @zk_path)
  stored = response[:data]
  stored.to_i
end
save_offset(offset)
# File lib/fluent/plugin/in_kafka.rb, line 379
# Writes the string form of offset to the node at @zk_path through
# @zookeeper, then logs the written value at trace level.
#
# @param offset value to persist; converted with #to_s before writing
# @return whatever $log.trace returns
def save_offset(offset)
  serialized = offset.to_s
  @zookeeper.set(:path => @zk_path, :data => serialized)
  $log.trace "update zk offset node : #{serialized}"
end