class DruidDB::Node::Overlord

Constants

INDEXER_PATH
RUNNING_TASKS_PATH
SUPERVISOR_PATH
TASK_PATH

Attributes

config[R]
zk[R]

Public Class Methods

new(config, zk) click to toggle source
# File lib/druiddb/node/overlord.rb, line 10
# Builds an overlord node wrapper.
#
# config - stored in @config (exposed through the +config+ reader).
# zk     - stored in @zk (exposed through the +zk+ reader).
def initialize(config, zk)
  @config, @zk = config, zk
end

Public Instance Methods

connection() click to toggle source

TODO (DRY): this method was copied and pasted; extract the shared logic into a common helper.

# File lib/druiddb/node/overlord.rb, line 16
# Returns a connection to one of the overlords listed in the +zk+ registry.
#
# The entry at the front of the registered list is used, and the list is then
# rotated in place so successive calls cycle through the registered overlords.
#
# Returns a DruidDB::Connection built from the chosen entry's :host and :port.
# Raises DruidDB::ConnectionError when the registry has no entry for the
# overlord path, or when the registered list is empty.
def connection
  overlords = zk.registry["#{config.discovery_path}/druid:overlord"]
  # A missing registry key is treated the same as an empty list, so callers
  # always see ConnectionError rather than a NoMethodError on nil.
  overlord = overlords && overlords.first
  raise DruidDB::ConnectionError, 'no druid overlords available' if overlord.nil?

  overlords.rotate! # round-robin load balancing
  DruidDB::Connection.new(host: overlord[:host], port: overlord[:port])
end
running_tasks(datasource_name = nil) click to toggle source
# File lib/druiddb/node/overlord.rb, line 23
# Lists the ids of the tasks returned by a GET on RUNNING_TASKS_PATH.
#
# datasource_name - optional filter; when given, only ids for which
#                   +include?(datasource_name)+ is true are kept (a substring
#                   match when ids are Strings).
#
# Returns an Array of the 'id' values from the parsed JSON response.
# Raises ConnectionError unless the response code is 200.
def running_tasks(datasource_name = nil)
  response = connection.get(RUNNING_TASKS_PATH)
  raise ConnectionError, 'Could not retrieve running tasks' if response.code.to_i != 200

  task_ids = JSON.parse(response.body).map { |entry| entry['id'] }
  return task_ids unless datasource_name

  task_ids.select { |id| id.include?(datasource_name) }
end
shutdown_task(task) click to toggle source
# File lib/druiddb/node/overlord.rb, line 31
# Requests shutdown of a single task and then waits for it to stop running.
#
# task - task id; concatenated between TASK_PATH and '/shutdown' to form the
#        path that is POSTed to.
#
# Returns whatever bounded_wait_for_shutdown returns for the task.
# Raises ConnectionError unless the shutdown request's response code is 200.
def shutdown_task(task)
  shutdown_path = TASK_PATH + task + '/shutdown'
  reply = connection.post(shutdown_path)
  raise ConnectionError, 'Unable to shutdown task' if reply.code.to_i != 200

  bounded_wait_for_shutdown(task)
end
shutdown_tasks(datasource_name = nil) click to toggle source
# File lib/druiddb/node/overlord.rb, line 37
# Shuts down every task returned by running_tasks, one after another.
#
# datasource_name - optional filter, passed straight through to running_tasks.
#
# Returns the Array of task ids that running_tasks produced.
def shutdown_tasks(datasource_name = nil)
  running_tasks(datasource_name).each { |task_id| shutdown_task(task_id) }
end
submit_supervisor_spec(filepath) click to toggle source
# File lib/druiddb/node/overlord.rb, line 48
# Reads a JSON supervisor spec from disk and POSTs it to SUPERVISOR_PATH.
#
# filepath - path of a file whose contents are parsed as JSON before sending.
#
# Returns the parsed JSON body of the response.
# Raises ConnectionError unless the response code is 200.
def submit_supervisor_spec(filepath)
  payload = JSON.parse(File.read(filepath))
  reply = connection.post(SUPERVISOR_PATH, payload)
  raise ConnectionError, 'Unable to submit spec' if reply.code.to_i != 200

  JSON.parse(reply.body)
end
supervisor_tasks() click to toggle source
# File lib/druiddb/node/overlord.rb, line 42
# Fetches SUPERVISOR_PATH and returns the decoded response.
#
# Returns the parsed JSON body of the response.
# Raises ConnectionError unless the response code is 200.
def supervisor_tasks
  reply = connection.get(SUPERVISOR_PATH)
  raise ConnectionError, 'Could not retrieve supervisors' if reply.code.to_i != 200

  JSON.parse(reply.body)
end

Private Instance Methods

bounded_wait_for_shutdown(task) click to toggle source
# File lib/druiddb/node/overlord.rb, line 57
# Polls running_tasks until the given task is no longer listed, giving up
# after a bounded number of retries.
#
# task         - task id to look for in the running_tasks list.
# max_attempts - maximum number of sleep-then-recheck cycles (default: 10).
# interval     - value passed to +sleep+ between checks (default: 1).
#
# With the defaults this performs one immediate check followed by at most 10
# rechecks, one per interval.
#
# Returns true once the task is absent from running_tasks.
# Raises ClientError if the task is still listed after max_attempts rechecks.
def bounded_wait_for_shutdown(task, max_attempts: 10, interval: 1)
  attempts = 0

  while running_tasks.include?(task)
    # Every allowed recheck has been used and the task is still running.
    raise ClientError, 'Task did not shutdown.' if attempts >= max_attempts

    attempts += 1
    sleep interval
  end

  true
end