class Kafkat::ClusterRestart::Session

Constants

SESSION_PATH
STATES
STATE_NOT_RESTARTED
STATE_PENDING
STATE_RESTARTED

Attributes

broker_states[R]

Public Class Methods

exists?() click to toggle source
# File lib/kafkat/command/cluster_restart.rb, line 238
# Reports whether a session file is present at the default location.
#
# SESSION_PATH is expanded with File.expand_path before the check.
#
# @return [Boolean] true when the expanded path names an existing regular file
def self.exists?
  session_path = File.expand_path(SESSION_PATH)
  File.file?(session_path)
end
from_brokers(brokers) click to toggle source
# File lib/kafkat/command/cluster_restart.rb, line 265
# Builds a Session in which every given broker is marked STATE_NOT_RESTARTED.
#
# @param brokers [Enumerable] broker ids; each becomes a key of the state map
# @return [Session] a new session created from the 'broker_states' map
def self.from_brokers(brokers)
  initial_states = {}
  brokers.each { |broker_id| initial_states[broker_id] = STATE_NOT_RESTARTED }
  Session.new('broker_states' => initial_states)
end
from_zookeepers(zookeeper) click to toggle source
# File lib/kafkat/command/cluster_restart.rb, line 260
# Builds a Session from the broker ids reported by the given zookeeper object.
#
# @param zookeeper [#get_broker_ids] object whose get_broker_ids result is
#   handed to Session.from_brokers
# @return whatever Session.from_brokers returns for those ids
def self.from_zookeepers(zookeeper)
  Session.from_brokers(zookeeper.get_broker_ids)
end
load!(session_file = SESSION_PATH) click to toggle source
# File lib/kafkat/command/cluster_restart.rb, line 242
# Reads a session file and builds a session from its JSON contents.
#
# @param session_file [String] path to the session file (expanded with
#   File.expand_path); defaults to SESSION_PATH
# @return a new instance built from the parsed JSON object
# @raise [NotFoundError] when the file does not exist
# @raise [ParseError] when the contents are not valid JSON, or are valid JSON
#   whose top-level value is not an object
def self.load!(session_file = SESSION_PATH)
  contents = File.read(File.expand_path(session_file))
  data = JSON.parse(contents)

  # Valid JSON such as `[]` or `null` would otherwise reach the constructor and
  # fail there with an unrelated TypeError/NoMethodError.
  raise ParseError unless data.is_a?(Hash)

  new(data)
rescue Errno::ENOENT
  raise NotFoundError
rescue JSON::JSONError
  raise ParseError
end
new(data = {}) click to toggle source
# File lib/kafkat/command/cluster_restart.rb, line 270
# Creates a session from a data hash.
#
# The broker state map is looked up under the string key 'broker_states'
# first (the shape produced by parsing a saved JSON file) and then under the
# symbol key :broker_states (the shape produced by #to_h), so the output of
# #to_h can be fed straight back in.
#
# @param data [Hash] session data; defaults to an empty hash
def initialize(data = {})
  @broker_states = data['broker_states'] || data[:broker_states] || {}
end
reset!(session_file = SESSION_PATH) click to toggle source
# File lib/kafkat/command/cluster_restart.rb, line 255
# Deletes the session file.
#
# @param session_file [String] path to the session file (expanded with
#   File.expand_path); defaults to SESSION_PATH
# @return the result of File.delete for the expanded path
def self.reset!(session_file = SESSION_PATH)
  File.delete(File.expand_path(session_file))
end

Public Instance Methods

all_restarted?() click to toggle source
# File lib/kafkat/command/cluster_restart.rb, line 319
# Tells whether every tracked broker is in STATE_RESTARTED.
#
# @return [Boolean] true when no broker has a state other than
#   STATE_RESTARTED (so also true when no brokers are tracked)
def all_restarted?
  @broker_states.none? { |_broker_id, state| state != STATE_RESTARTED }
end
not_restarted?(broker_id) click to toggle source
# File lib/kafkat/command/cluster_restart.rb, line 311
# Checks, via #state?, whether the broker is in STATE_NOT_RESTARTED.
#
# @param broker_id the broker to look up
# @return the result of state?(broker_id, STATE_NOT_RESTARTED)
def not_restarted?(broker_id)
  expected = STATE_NOT_RESTARTED
  state?(broker_id, expected)
end
pending?(broker_id) click to toggle source
# File lib/kafkat/command/cluster_restart.rb, line 307
# Checks, via #state?, whether the broker is in STATE_PENDING.
#
# @param broker_id the broker to look up
# @return the result of state?(broker_id, STATE_PENDING)
def pending?(broker_id)
  expected = STATE_PENDING
  state?(broker_id, expected)
end
pending_brokers() click to toggle source
# File lib/kafkat/command/cluster_restart.rb, line 323
# Lists the brokers whose recorded state is STATE_PENDING.
#
# @return [Array] broker ids, in the order they appear in broker_states
def pending_brokers
  broker_states.select { |_broker_id, current| current == STATE_PENDING }.keys
end
restarted?(broker_id) click to toggle source
# File lib/kafkat/command/cluster_restart.rb, line 315
# Checks, via #state?, whether the broker is in STATE_RESTARTED.
#
# @param broker_id the broker to look up
# @return the result of state?(broker_id, STATE_RESTARTED)
def restarted?(broker_id)
  expected = STATE_RESTARTED
  state?(broker_id, expected)
end
save!(session_file = SESSION_PATH) click to toggle source
# File lib/kafkat/command/cluster_restart.rb, line 274
# Writes the session (as returned by #to_h) to a file as pretty-printed JSON.
#
# @param session_file [String] destination path (expanded with
#   File.expand_path); defaults to SESSION_PATH
# @return the value of the File.open block
def save!(session_file = SESSION_PATH)
  # Serialize before opening: mode 'w' truncates the file as soon as it is
  # opened, so a failure in to_h or JSON generation must happen first or the
  # previously saved session would be wiped.
  payload = JSON.pretty_generate(to_h)

  File.open(File.expand_path(session_file), 'w') do |f|
    f.puts payload
  end
end
state(broker_id) click to toggle source
# File lib/kafkat/command/cluster_restart.rb, line 296
# Returns the recorded state of a broker.
#
# @param broker_id the broker to look up
# @return the state stored for broker_id
# @raise [UnknownBrokerError] when broker_id is not a key of the state map
def state(broker_id)
  @broker_states.fetch(broker_id) do
    raise UnknownBrokerError, "Unknown broker: #{broker_id}"
  end
end
state?(broker_id, state) click to toggle source
# File lib/kafkat/command/cluster_restart.rb, line 301
# Tells whether a broker's recorded state equals the given state.
#
# The broker is validated before the state, so an unknown broker is reported
# even when the state is also unknown.
#
# @param broker_id the broker to look up
# @param state the state to compare against; must be a member of STATES
# @return [Boolean] whether the recorded state equals state
# @raise [UnknownBrokerError] when broker_id is not a key of the state map
# @raise [UnknownStateError] when state is not included in STATES
def state?(broker_id, state)
  current = @broker_states.fetch(broker_id) do
    raise UnknownBrokerError, "Unknown broker: #{broker_id}"
  end
  raise UnknownStateError, "Unknown state: #{state}" unless STATES.include?(state)

  current == state
end
to_h() click to toggle source
# File lib/kafkat/command/cluster_restart.rb, line 329
# Hash representation of the session.
#
# @return [Hash] a single-entry hash mapping :broker_states to broker_states
def to_h
  { broker_states: broker_states }
end
update_states!(state, ids) click to toggle source
# File lib/kafkat/command/cluster_restart.rb, line 280
# Sets the given brokers to a new state.
#
# Nothing is modified unless the state and every id are valid. A broker id
# may appear more than once in ids.
#
# @param state [String, Symbol] the new state; a Symbol is converted with
#   to_s, and the result must be a member of STATES
# @param ids [Array] broker ids to update; each must already be a key of
#   broker_states
# @return [self]
# @raise [UnknownStateError] when state is not included in STATES
# @raise [UnknownBrokerError] when any id is not a known broker
def update_states!(state, ids)
  state = state.to_s if state.is_a?(Symbol)
  raise UnknownStateError, "Unknown State #{state}" unless STATES.include?(state)

  # Array#- keeps only the ids that are not known brokers. Comparing ids with
  # `ids & keys` instead would reject a list that merely repeats a known id,
  # because & de-duplicates its result.
  unknown = ids - broker_states.keys
  raise UnknownBrokerError, "Unknown brokers: #{unknown.join(', ')}" unless unknown.empty?

  ids.each { |id| broker_states[id] = state }
  self
end