class Kafkat::Interface::Admin
Attributes
json_files_path[R]
kafka_path[R]
zk_path[R]
Public Class Methods
new(config)
click to toggle source
# File lib/kafkat/interface/admin.rb, line 12
#
# Builds an admin interface from a configuration object.
#
# @param config [#kafka_path, #zk_path, #json_files_path] object supplying
#   the three settings copied into this instance
def initialize(config)
  # Copy each setting once so later calls do not depend on +config+.
  @kafka_path = config.kafka_path
  @zk_path = config.zk_path
  @json_files_path = config.json_files_path
end
Public Instance Methods
elect_leaders!(partitions)
click to toggle source
# File lib/kafkat/interface/admin.rb, line 18
#
# Writes the given partitions to a timestamped JSON file under
# +@json_files_path+ and runs the +kafka-preferred-replica-election+ tool
# against that file.
#
# @param partitions [Enumerable<#topic_name, #id>] partitions to include
# @return [String] whatever +run_tool+ returns
def elect_leaders!(partitions)
  file_path = File.join(@json_files_path, "elect-leaders_#{Time.now.xmlschema}.json")

  # Build the payload before touching the filesystem so a failure here does
  # not leave an empty file behind.
  json_partitions = partitions.map do |p|
    { 'topic' => p.topic_name, 'partition' => p.id }
  end
  json = { 'partitions' => json_partitions }

  # Block form guarantees the handle is closed even if the write raises.
  File.open(file_path, 'w') { |file| file.write(JSON.dump(json)) }

  puts "Using JSON file: #{file_path}"
  run_tool(
    'kafka-preferred-replica-election',
    '--path-to-json-file', file_path
  )
end
reassign!(assignments)
click to toggle source
# File lib/kafkat/interface/admin.rb, line 41
#
# Writes the given assignments to a timestamped JSON file under
# +@json_files_path+ and runs +kafka-reassign-partitions --execute+ against
# that file. Also prints the file path and the +kafkat verify-reassign+
# command that can be used to check progress.
#
# @param assignments [Enumerable<#topic_name, #partition_id, #replicas>]
#   assignments to include
# @return [String] whatever +run_tool+ returns
def reassign!(assignments)
  file_name = "reassign_#{Time.now.xmlschema}.json"
  file_path = File.join(@json_files_path, file_name)

  # Build the payload before touching the filesystem so a failure here does
  # not leave an empty file behind.
  json_partitions = assignments.map do |a|
    {
      'topic' => a.topic_name,
      'partition' => a.partition_id,
      'replicas' => a.replicas
    }
  end
  json = { 'partitions' => json_partitions, 'version' => 1 }

  # Block form guarantees the handle is closed even if the write raises.
  File.open(file_path, 'w') { |file| file.write(JSON.dump(json)) }

  puts "Using JSON file: #{file_path}"
  puts "Run this command to check the status: kafkat verify-reassign #{file_name}"
  run_tool(
    'kafka-reassign-partitions',
    '--execute',
    '--reassignment-json-file', file_path
  )
end
run_tool(name, *args)
click to toggle source
# File lib/kafkat/interface/admin.rb, line 101
#
# Runs a script from +kafka_path/bin+ with the given arguments plus
# +--zookeeper zk_path+, and returns what it wrote to standard output.
#
# The command is executed as an argument vector (no shell), so arguments
# containing spaces or shell metacharacters reach the tool unchanged.
#
# @param name [String] script name without the +.sh+ extension
# @param args [Array<#to_s>] extra command-line arguments
# @return [String] the tool's standard output
# @raise [ExecutionFailedError] if the tool cannot be started or does not
#   exit successfully
def run_tool(name, *args)
  path = File.join(kafka_path, "bin/#{name}.sh")
  # The scripts in the Confluent package do not have .sh extensions.
  path = File.join(kafka_path, "bin/#{name}") unless File.exist?(path)

  command = [path, *args.map(&:to_s), '--zookeeper', zk_path.to_s]

  result =
    begin
      # Array form bypasses the shell; block form closes the pipe and sets $?.
      IO.popen(command, &:read)
    rescue SystemCallError
      # A missing or non-executable script used to surface as a non-zero
      # shell exit status; keep reporting it with the same error class.
      raise ExecutionFailedError
    end

  raise ExecutionFailedError unless $?.success?
  result
end
shutdown!(broker_id, options={})
click to toggle source
# File lib/kafkat/interface/admin.rb, line 89
#
# Runs +kafka.admin.ShutdownBroker+ through the +kafka-run-class+ tool for
# the given broker.
#
# @param broker_id [Object] value passed after +--broker+
# @param options [Hash] optional settings
# @option options [Object] :retries value passed after +--num.retries+
# @option options [Object] :interval value passed after +--retry.interval.ms+
# @return [String] whatever +run_tool+ returns
def shutdown!(broker_id, options={})
  args = ['--broker', broker_id]
  args += ['--num.retries', options[:retries]] if options[:retries]
  # Reads from +options+; the previous +option+ was an undefined name and
  # raised NameError whenever :interval was supplied.
  args += ['--retry.interval.ms', options[:interval]] if options[:interval]

  run_tool(
    'kafka-run-class',
    'kafka.admin.ShutdownBroker',
    *args
  )
end
verify_reassign(file_name)
click to toggle source
# File lib/kafkat/interface/admin.rb, line 72
#
# Runs +kafka-reassign-partitions --verify+ against a reassignment JSON file.
#
# +file_name+ is used as given when it exists; otherwise it is looked up
# under +@json_files_path+.
#
# @param file_name [String] path to, or bare name of, the JSON file
# @return [String] whatever +run_tool+ returns
# @raise [SystemCallError] (e.g. Errno::ENOENT) if the resolved file cannot
#   be opened
def verify_reassign(file_name)
  candidate = File.exist?(file_name) ? file_name : File.join(@json_files_path, file_name)

  # Open the file so a missing or unreadable file still fails here, but use
  # the block form so the handle is closed immediately instead of leaking.
  path = File.open(candidate, &:path)

  puts "Using JSON file: #{path}"
  run_tool(
    'kafka-reassign-partitions',
    '--verify',
    '--reassignment-json-file', path
  )
end