class Kafkat::Interface::Admin
Attributes
kafka_path[R]
zk_path[R]
Public Class Methods
new(config)
click to toggle source
# File lib/kafkat/interface/admin.rb, line 11 def initialize(config) @kafka_path = config.kafka_path @zk_path = config.zk_path end
Public Instance Methods
elect_leaders!(partitions)
click to toggle source
# File lib/kafkat/interface/admin.rb, line 16 def elect_leaders!(partitions) file = Tempfile.new('kafkat-partitions.json') json_partitions = [] partitions.each do |p| json_partitions << { 'topic' => p.topic_name, 'partition' => p.id } end json = {'partitions' => json_partitions} file.write(JSON.dump(json)) file.close run_tool( 'kafka-preferred-replica-election', '--path-to-json-file', file.path ) ensure file.unlink end
reassign!(assignments)
click to toggle source
# File lib/kafkat/interface/admin.rb, line 39 def reassign!(assignments) file = Tempfile.new('kafkat-partitions.json') json_partitions = [] assignments.each do |a| json_partitions << { 'topic' => a.topic_name, 'partition' => a.partition_id, 'replicas' => a.replicas } end json = { 'partitions' => json_partitions, 'version' => 1 } file.write(JSON.dump(json)) file.close run_tool( 'kafka-reassign-partitions', '--execute', '--reassignment-json-file', file.path ) ensure file.unlink end
run_tool(name, *args)
click to toggle source
# File lib/kafkat/interface/admin.rb, line 80 def run_tool(name, *args) path = File.join(kafka_path, "bin/#{name}.sh") args += ['--zookeeper', "\"#{zk_path}\""] args_string = args.join(' ') result = `#{path} #{args_string}` raise ExecutionFailedError if $?.to_i > 0 result end
shutdown!(broker_id, options={})
click to toggle source
# File lib/kafkat/interface/admin.rb, line 68 def shutdown!(broker_id, options={}) args = ['--broker', broker_id] args += ['--num.retries', options[:retries]] if options[:retries] args += ['--retry.interval.ms', option[:interval]] if options[:interval] run_tool( 'kafka-run-class', 'kafka.admin.ShutdownBroker', *args ) end