class DruidDB::Node::Coordinator
Constants
- DATASOURCES_PATH
Attributes
config[R]
zk[R]
Public Class Methods
new(config, zk)
click to toggle source
# File lib/druiddb/node/coordinator.rb, line 7 def initialize(config, zk) @config = config @zk = zk end
Public Instance Methods
connection()
click to toggle source
TODO: DRY; copy/paste from broker
# File lib/druiddb/node/coordinator.rb, line 13 def connection coordinator = zk.registry["#{config.discovery_path}/druid:coordinator"].first raise DruidDB::ConnectionError, 'no druid coordinators available' if coordinator.nil? # round-robin load balancing zk.registry["#{config.discovery_path}/druid:coordinator"].rotate! DruidDB::Connection.new(host: coordinator[:host], port: coordinator[:port]) end
datasource_enabled?(datasource_name)
click to toggle source
TODO: This should either be private or moved to datasource
# File lib/druiddb/node/coordinator.rb, line 41 def datasource_enabled?(datasource_name) list_datasources.include? datasource_name end
datasource_has_segments?(datasource_name)
click to toggle source
TODO: This should either be private or moved to datasource
# File lib/druiddb/node/coordinator.rb, line 46 def datasource_has_segments?(datasource_name) list_segments(datasource_name).any? end
datasource_info(datasource_name)
click to toggle source
# File lib/druiddb/node/coordinator.rb, line 21 def datasource_info(datasource_name) response = connection.get(DATASOURCES_PATH + datasource_name.to_s, full: true) unless response.code.to_i == 200 raise ConnectionError, 'Unable to retrieve datasource information.' end JSON.parse(response.body) end
disable_datasource(datasource_name)
click to toggle source
# File lib/druiddb/node/coordinator.rb, line 29 def disable_datasource(datasource_name) # response = connection.delete(DATASOURCES_PATH + datasource_name.to_s) # raise ConnectionError, 'Unable to disable datasource' unless response.code.to_i == 200 # return true if response.code.to_i == 200 # This is a workaround for https://github.com/druid-io/druid/issues/3154 disable_segments(datasource_name) bounded_wait_for_segments_disable(datasource_name) true end
disable_segment(datasource_name, segment)
click to toggle source
# File lib/druiddb/node/coordinator.rb, line 50 def disable_segment(datasource_name, segment) response = connection.delete(DATASOURCES_PATH + datasource_name + '/segments/' + segment) raise ConnectionError, "Unable to disable #{segment}" unless response.code.to_i == 200 true end
disable_segments(datasource_name)
click to toggle source
TODO: This should either be private or moved to datasource
# File lib/druiddb/node/coordinator.rb, line 57 def disable_segments(datasource_name) segments = list_segments(datasource_name) segments.each { |segment| disable_segment(datasource_name, segment) } end
issue_kill_task(datasource_name, interval)
click to toggle source
# File lib/druiddb/node/coordinator.rb, line 62 def issue_kill_task(datasource_name, interval) response = connection.delete(DATASOURCES_PATH + datasource_name + '/intervals/' + interval) raise ConnectionError, 'Unable to issue kill task.' unless response.code.to_i == 200 true end
list_datasources(url_params = {})
click to toggle source
# File lib/druiddb/node/coordinator.rb, line 68 def list_datasources(url_params = {}) response = connection.get(DATASOURCES_PATH, url_params) JSON.parse(response.body) if response.code.to_i == 200 end
list_segments(datasource_name)
click to toggle source
# File lib/druiddb/node/coordinator.rb, line 73 def list_segments(datasource_name) response = connection.get(DATASOURCES_PATH + datasource_name + '/segments', full: true) case response.code.to_i when 200 JSON.parse(response.body).map { |segment| segment['identifier'] } when 204 [] else raise ConnectionError, "Unable to list segments for #{datasource_name}" end end
Private Instance Methods
bounded_wait_for_disable(datasource_name)
click to toggle source
# File lib/druiddb/node/coordinator.rb, line 87 def bounded_wait_for_disable(datasource_name) condition = datasource_enabled?(datasource_name) attempts = 0 max = 10 while condition attempts += 1 sleep 1 condition = datasource_enabled?(datasource_name) break if attempts >= max end raise ClientError, 'Datasource should be disabled, but is still enabled.' unless condition true end
bounded_wait_for_segments_disable(datasource_name)
click to toggle source
# File lib/druiddb/node/coordinator.rb, line 103 def bounded_wait_for_segments_disable(datasource_name) condition = datasource_has_segments?(datasource_name) attempts = 0 max = 60 while condition attempts += 1 sleep 1 condition = datasource_has_segments?(datasource_name) break if attempts >= max end raise ClientError, 'Segments should be disabled, but are still enabled.' if condition true end