class Kafkat::Interface::Admin

Attributes

kafka_path[R]
zk_path[R]

Public Class Methods

new(config) click to toggle source
# File lib/kafkat/interface/admin.rb, line 11
# Captures the two locations this interface needs from the given config.
#
# config - any object responding to +kafka_path+ and +zk_path+; the values
#          are stored as-is in @kafka_path and @zk_path.
def initialize(config)
  kafka_location = config.kafka_path
  @kafka_path = kafka_location

  zk_location = config.zk_path
  @zk_path = zk_location
end

Public Instance Methods

elect_leaders!(partitions) click to toggle source
# File lib/kafkat/interface/admin.rb, line 16
# Writes the given partitions to a temporary JSON file and invokes the
# 'kafka-preferred-replica-election' tool on it via run_tool.
#
# partitions - enumerable of objects responding to +topic_name+ and +id+.
#
# Returns whatever run_tool returns. Any error raised by run_tool (or by
# creating/writing the temp file) propagates to the caller.
def elect_leaders!(partitions)
  json_partitions = partitions.map do |p|
    { 'topic' => p.topic_name, 'partition' => p.id }
  end

  # Create the temp file outside the begin/ensure so that a failure to
  # create it surfaces as-is instead of being masked by cleanup on nil.
  file = Tempfile.new('kafkat-partitions.json')
  begin
    file.write(JSON.dump('partitions' => json_partitions))
    # Close before running the tool so the content is flushed to disk.
    file.close

    run_tool(
      'kafka-preferred-replica-election',
      '--path-to-json-file', file.path
    )
  ensure
    # close! both closes (if still open) and unlinks the file.
    file.close!
  end
end
reassign!(assignments) click to toggle source
# File lib/kafkat/interface/admin.rb, line 39
# Writes the given assignments to a temporary JSON file (with 'version' => 1)
# and invokes the 'kafka-reassign-partitions' tool with '--execute' on it
# via run_tool.
#
# assignments - enumerable of objects responding to +topic_name+,
#               +partition_id+ and +replicas+.
#
# Returns whatever run_tool returns. Any error raised by run_tool (or by
# creating/writing the temp file) propagates to the caller.
def reassign!(assignments)
  json_partitions = assignments.map do |a|
    {
      'topic' => a.topic_name,
      'partition' => a.partition_id,
      'replicas' => a.replicas
    }
  end

  json = {
    'partitions' => json_partitions,
    'version' => 1
  }

  # Create the temp file outside the begin/ensure so that a failure to
  # create it surfaces as-is instead of being masked by cleanup on nil.
  file = Tempfile.new('kafkat-partitions.json')
  begin
    file.write(JSON.dump(json))
    # Close before running the tool so the content is flushed to disk.
    file.close

    run_tool(
      'kafka-reassign-partitions',
      '--execute',
      '--reassignment-json-file', file.path
    )
  ensure
    # close! both closes (if still open) and unlinks the file.
    file.close!
  end
end
run_tool(name, *args) click to toggle source
# File lib/kafkat/interface/admin.rb, line 80
# Runs the script "bin/<name>.sh" under kafka_path with the given arguments,
# followed by '--zookeeper <zk_path>', and returns its standard output.
#
# name - base name of the script (without the ".sh" suffix).
# args - arguments passed to the script; each is converted with to_s.
#
# Returns the script's standard output as a String.
# Raises ExecutionFailedError if the script cannot be started or does not
# exit successfully.
def run_tool(name, *args)
  path = File.join(kafka_path, "bin/#{name}.sh")

  # Pass an argv array so no shell is involved: arguments containing spaces
  # or shell metacharacters reach the script verbatim instead of being
  # split or interpreted.
  command = [path, *args.map(&:to_s), '--zookeeper', zk_path.to_s]

  begin
    result = IO.popen(command, &:read)
  rescue SystemCallError
    # A missing or non-executable script previously showed up as a non-zero
    # shell exit status; keep reporting it as the same error type.
    raise ExecutionFailedError
  end

  # success? is nil when the process was killed by a signal, so that case
  # is treated as a failure too.
  raise ExecutionFailedError unless $?.success?
  result
end
shutdown!(broker_id, options={}) click to toggle source
# File lib/kafkat/interface/admin.rb, line 68
# Invokes 'kafka-run-class kafka.admin.ShutdownBroker' for the given broker
# via run_tool.
#
# broker_id - value passed after '--broker'.
# options   - optional Hash:
#             :retries  - if set, passed after '--num.retries'
#             :interval - if set, passed after '--retry.interval.ms'
#
# Returns whatever run_tool returns.
def shutdown!(broker_id, options={})
  args = ['--broker', broker_id]
  args += ['--num.retries', options[:retries]] if options[:retries]
  # Read from `options` (was a typo, `option`, which raised NameError).
  args += ['--retry.interval.ms', options[:interval]] if options[:interval]

  run_tool(
    'kafka-run-class',
    'kafka.admin.ShutdownBroker',
    *args
  )
end