class Kafkat::ClusterRestart::Session
Constants
- SESSION_PATH
- STATES
- STATE_NOT_RESTARTED
- STATE_PENDING
- STATE_RESTARTED
Attributes
broker_states[R]
Public Class Methods
exists?()
click to toggle source
# File lib/kafkat/command/cluster_restart.rb, line 238 def self.exists? File.file?(File.expand_path(SESSION_PATH)) end
from_brokers(brokers)
click to toggle source
# File lib/kafkat/command/cluster_restart.rb, line 265 def self.from_brokers(brokers) states = brokers.each_with_object({}) { |id, h| h[id] = STATE_NOT_RESTARTED } Session.new('broker_states' => states) end
from_zookeepers(zookeeper)
click to toggle source
# File lib/kafkat/command/cluster_restart.rb, line 260 def self.from_zookeepers(zookeeper) broker_ids = zookeeper.get_broker_ids Session.from_brokers(broker_ids) end
load!(session_file = SESSION_PATH)
click to toggle source
# File lib/kafkat/command/cluster_restart.rb, line 242 def self.load!(session_file = SESSION_PATH) path = File.expand_path(session_file) string = File.read(path) json = JSON.parse(string) self.new(json) rescue Errno::ENOENT raise NotFoundError rescue JSON::JSONError raise ParseError end
new(data = {})
click to toggle source
# File lib/kafkat/command/cluster_restart.rb, line 270 def initialize(data = {}) @broker_states = data['broker_states'] || {} end
reset!(session_file = SESSION_PATH)
click to toggle source
# File lib/kafkat/command/cluster_restart.rb, line 255 def self.reset!(session_file = SESSION_PATH) path = File.expand_path(session_file) File.delete(path) end
Public Instance Methods
all_restarted?()
click to toggle source
# File lib/kafkat/command/cluster_restart.rb, line 319 def all_restarted? @broker_states.values.all? { |state| state == STATE_RESTARTED } end
not_restarted?(broker_id)
click to toggle source
# File lib/kafkat/command/cluster_restart.rb, line 311 def not_restarted?(broker_id) state?(broker_id, STATE_NOT_RESTARTED) end
pending?(broker_id)
click to toggle source
# File lib/kafkat/command/cluster_restart.rb, line 307 def pending?(broker_id) state?(broker_id, STATE_PENDING) end
pending_brokers()
click to toggle source
# File lib/kafkat/command/cluster_restart.rb, line 323 def pending_brokers broker_states.keys.find_all do |broker_id| broker_states[broker_id] == STATE_PENDING end end
restarted?(broker_id)
click to toggle source
# File lib/kafkat/command/cluster_restart.rb, line 315 def restarted?(broker_id) state?(broker_id, STATE_RESTARTED) end
save!(session_file = SESSION_PATH)
click to toggle source
# File lib/kafkat/command/cluster_restart.rb, line 274 def save!(session_file = SESSION_PATH) File.open(File.expand_path(session_file), 'w') do |f| f.puts JSON.pretty_generate(self.to_h) end end
state(broker_id)
click to toggle source
# File lib/kafkat/command/cluster_restart.rb, line 296 def state(broker_id) raise UnknownBrokerError, "Unknown broker: #{broker_id}" unless @broker_states.key?(broker_id) broker_states[broker_id] end
state?(broker_id, state)
click to toggle source
# File lib/kafkat/command/cluster_restart.rb, line 301 def state?(broker_id, state) raise UnknownBrokerError, "Unknown broker: #{broker_id}" unless @broker_states.key?(broker_id) raise UnknownStateError, "Unknown state: #{state}" unless STATES.include?(state) @broker_states[broker_id] == state end
to_h()
click to toggle source
# File lib/kafkat/command/cluster_restart.rb, line 329 def to_h { :broker_states => broker_states, } end
update_states!(state, ids)
click to toggle source
# File lib/kafkat/command/cluster_restart.rb, line 280 def update_states!(state, ids) state = state.to_s if state.is_a?(Symbol) unless STATES.include?(state) raise UnknownStateError, "Unknown State #{state}" end intersection = ids & broker_states.keys unless intersection == ids raise UnknownBrokerError, "Unknown brokers: #{(ids - intersection).join(', ')}" end ids.each { |id| broker_states[id] = state } self end