class DruidDB::Node::Overlord
Constants
- INDEXER_PATH
- RUNNING_TASKS_PATH
- SUPERVISOR_PATH
- TASK_PATH
Attributes
config[R]
zk[R]
Public Class Methods
new(config, zk)
click to toggle source
# File lib/druiddb/node/overlord.rb, line 10 def initialize(config, zk) @config = config @zk = zk end
Public Instance Methods
connection()
click to toggle source
TODO: DRY: copy/paste
# File lib/druiddb/node/overlord.rb, line 16 def connection overlord = zk.registry["#{config.discovery_path}/druid:overlord"].first raise DruidDB::ConnectionError, 'no druid overlords available' if overlord.nil? zk.registry["#{config.discovery_path}/druid:overlord"].rotate! # round-robin load balancing DruidDB::Connection.new(host: overlord[:host], port: overlord[:port]) end
running_tasks(datasource_name = nil)
click to toggle source
# File lib/druiddb/node/overlord.rb, line 23 def running_tasks(datasource_name = nil) response = connection.get(RUNNING_TASKS_PATH) raise ConnectionError, 'Could not retrieve running tasks' unless response.code.to_i == 200 tasks = JSON.parse(response.body).map { |task| task['id'] } tasks.select! { |task| task.include? datasource_name } if datasource_name tasks ? tasks : [] end
shutdown_task(task)
click to toggle source
# File lib/druiddb/node/overlord.rb, line 31 def shutdown_task(task) response = connection.post(TASK_PATH + task + '/shutdown') raise ConnectionError, 'Unable to shutdown task' unless response.code.to_i == 200 bounded_wait_for_shutdown(task) end
shutdown_tasks(datasource_name = nil)
click to toggle source
# File lib/druiddb/node/overlord.rb, line 37 def shutdown_tasks(datasource_name = nil) tasks = running_tasks(datasource_name) tasks.each { |task| shutdown_task(task) } end
submit_supervisor_spec(filepath)
click to toggle source
# File lib/druiddb/node/overlord.rb, line 48 def submit_supervisor_spec(filepath) spec = JSON.parse(File.read(filepath)) response = connection.post(SUPERVISOR_PATH, spec) raise ConnectionError, 'Unable to submit spec' unless response.code.to_i == 200 JSON.parse(response.body) end
supervisor_tasks()
click to toggle source
# File lib/druiddb/node/overlord.rb, line 42 def supervisor_tasks response = connection.get(SUPERVISOR_PATH) raise ConnectionError, 'Could not retrieve supervisors' unless response.code.to_i == 200 JSON.parse(response.body) end
Private Instance Methods
bounded_wait_for_shutdown(task)
click to toggle source
# File lib/druiddb/node/overlord.rb, line 57 def bounded_wait_for_shutdown(task) condition = !(running_tasks.include? task) attempts = 0 max = 10 until condition attempts += 1 sleep 1 condition = !(running_tasks.include? task) break if attempts >= max end raise ClientError, 'Task did not shutdown.' unless condition true end