class DruidDB::Node::Coordinator

Constants

DATASOURCES_PATH

Attributes

config[R]
zk[R]

Public Class Methods

new(config, zk) click to toggle source
# File lib/druiddb/node/coordinator.rb, line 7
# Stores the collaborators this node works with.
#
# @param config [Object] configuration object; #connection reads its
#   `discovery_path`
# @param zk [Object] object whose `registry` is consulted by #connection
#   (presumably a ZooKeeper-backed service registry -- confirm against caller)
def initialize(config, zk)
  @zk = zk
  @config = config
end

Public Instance Methods

connection() click to toggle source

TODO: DRY this up; the logic is copied and pasted from the broker.

# File lib/druiddb/node/coordinator.rb, line 13
# Picks a coordinator from the registry and opens a connection to it.
#
# The entry at the head of the registry list is used for this call, then the
# list is rotated in place so the next call starts from a different entry.
#
# @return [DruidDB::Connection] connection built from the entry's :host and :port
# @raise [DruidDB::ConnectionError] when the registry list has no first entry
def connection
  registry_key = "#{config.discovery_path}/druid:coordinator"

  coordinator = zk.registry[registry_key].first
  raise DruidDB::ConnectionError, 'no druid coordinators available' if coordinator.nil?

  # Round-robin load balancing: move the entry just chosen to the back.
  zk.registry[registry_key].rotate!

  host = coordinator[:host]
  port = coordinator[:port]
  DruidDB::Connection.new(host: host, port: port)
end
datasource_enabled?(datasource_name) click to toggle source

TODO: This should either be private or moved to datasource

# File lib/druiddb/node/coordinator.rb, line 41
# Reports whether the given name appears in the list returned by
# #list_datasources (called with its default parameters).
#
# @param datasource_name [Object] value looked up in the list with `include?`
# @return [Boolean]
def datasource_enabled?(datasource_name)
  enabled_datasources = list_datasources
  enabled_datasources.include?(datasource_name)
end
datasource_has_segments?(datasource_name) click to toggle source

TODO: This should either be private or moved to datasource

# File lib/druiddb/node/coordinator.rb, line 46
# Reports whether #list_segments returns at least one (truthy) entry for the
# datasource.
#
# @param datasource_name [String] passed through to #list_segments
# @return [Boolean]
def datasource_has_segments?(datasource_name)
  segment_ids = list_segments(datasource_name)
  segment_ids.any?
end
datasource_info(datasource_name) click to toggle source
# File lib/druiddb/node/coordinator.rb, line 21
# Fetches the full description of one datasource.
#
# Issues a GET for DATASOURCES_PATH followed by the datasource name, passing
# `full: true`, and parses the response body as JSON.
#
# @param datasource_name [#to_s] name appended to DATASOURCES_PATH
# @return [Object] the parsed JSON body
# @raise [ConnectionError] when the response code is not 200
def datasource_info(datasource_name)
  path = DATASOURCES_PATH + datasource_name.to_s
  response = connection.get(path, full: true)
  raise ConnectionError, 'Unable to retrieve datasource information.' if response.code.to_i != 200

  JSON.parse(response.body)
end
disable_datasource(datasource_name) click to toggle source
# File lib/druiddb/node/coordinator.rb, line 29
# Disables a datasource by disabling each of its segments and then waiting
# until no segments are reported for it.
#
# The direct approach -- a single DELETE on DATASOURCES_PATH followed by the
# datasource name, raising ConnectionError ('Unable to disable datasource')
# unless the response code is 200 -- is deliberately not used. Going segment
# by segment is a workaround for https://github.com/druid-io/druid/issues/3154
#
# @param datasource_name [String] passed to #disable_segments and
#   #bounded_wait_for_segments_disable
# @return [true] errors raised by either helper propagate to the caller
def disable_datasource(datasource_name)
  disable_segments(datasource_name)
  bounded_wait_for_segments_disable(datasource_name)

  true
end
disable_segment(datasource_name, segment) click to toggle source
# File lib/druiddb/node/coordinator.rb, line 50
# Disables a single segment of a datasource.
#
# Issues a DELETE for
# DATASOURCES_PATH + datasource_name + '/segments/' + segment.
#
# The path is built with interpolation rather than String#+ so that a Symbol
# datasource name is accepted here just as #datasource_info accepts one;
# String#+ raises TypeError for non-String operands.
#
# @param datasource_name [#to_s] datasource owning the segment
# @param segment [#to_s] segment identifier
# @return [true]
# @raise [ConnectionError] when the response code is not 200
def disable_segment(datasource_name, segment)
  path = "#{DATASOURCES_PATH}#{datasource_name}/segments/#{segment}"
  response = connection.delete(path)
  raise ConnectionError, "Unable to disable #{segment}" unless response.code.to_i == 200

  true
end
disable_segments(datasource_name) click to toggle source

TODO: This should either be private or moved to datasource

# File lib/druiddb/node/coordinator.rb, line 57
# Disables every segment currently listed for the datasource, one request per
# segment via #disable_segment.
#
# @param datasource_name [String] passed to #list_segments and #disable_segment
# @return [Array] the segment identifiers returned by #list_segments
def disable_segments(datasource_name)
  list_segments(datasource_name).each do |segment_id|
    disable_segment(datasource_name, segment_id)
  end
end
issue_kill_task(datasource_name, interval) click to toggle source
# File lib/druiddb/node/coordinator.rb, line 62
# Issues a kill task for one interval of a datasource.
#
# Issues a DELETE for
# DATASOURCES_PATH + datasource_name + '/intervals/' + interval.
#
# The path is built with interpolation rather than String#+ so that a Symbol
# datasource name is accepted here just as #datasource_info accepts one;
# String#+ raises TypeError for non-String operands.
#
# @param datasource_name [#to_s] datasource the interval belongs to
# @param interval [#to_s] interval, inserted into the path as given
# @return [true]
# @raise [ConnectionError] when the response code is not 200
def issue_kill_task(datasource_name, interval)
  path = "#{DATASOURCES_PATH}#{datasource_name}/intervals/#{interval}"
  response = connection.delete(path)
  raise ConnectionError, 'Unable to issue kill task.' unless response.code.to_i == 200

  true
end
list_datasources(url_params = {}) click to toggle source
# File lib/druiddb/node/coordinator.rb, line 68
# Lists datasources by issuing a GET for DATASOURCES_PATH.
#
# @param url_params [Hash] passed to the connection's `get` as its second
#   argument
# @return [Object, nil] the parsed JSON body when the response code is 200,
#   otherwise nil (no error is raised here)
def list_datasources(url_params = {})
  response = connection.get(DATASOURCES_PATH, url_params)
  return unless response.code.to_i == 200

  JSON.parse(response.body)
end
list_segments(datasource_name) click to toggle source
# File lib/druiddb/node/coordinator.rb, line 73
# Lists the segment identifiers of a datasource.
#
# Issues a GET for DATASOURCES_PATH + datasource_name + '/segments' with
# `full: true` and extracts the 'identifier' value of every entry in the
# parsed JSON body.
#
# The path is built with interpolation rather than String#+ so that a Symbol
# datasource name is accepted here just as #datasource_info accepts one;
# String#+ raises TypeError for non-String operands.
#
# @param datasource_name [#to_s] datasource whose segments are listed
# @return [Array] segment identifiers; empty when the response code is 204
# @raise [ConnectionError] for any response code other than 200 or 204
def list_segments(datasource_name)
  path = "#{DATASOURCES_PATH}#{datasource_name}/segments"
  response = connection.get(path, full: true)

  case response.code.to_i
  when 200
    JSON.parse(response.body).map { |segment| segment['identifier'] }
  when 204
    # No content: the datasource has no segments to report.
    []
  else
    raise ConnectionError, "Unable to list segments for #{datasource_name}"
  end
end

Private Instance Methods

bounded_wait_for_disable(datasource_name) click to toggle source
# File lib/druiddb/node/coordinator.rb, line 87
# Polls until the datasource is no longer reported as enabled, giving up
# after a fixed number of one-second waits.
#
# The state is checked once up front and once after each wait, so at most
# `max_attempts` sleeps and `max_attempts + 1` checks are performed.
#
# @param datasource_name [String] passed to #datasource_enabled?
# @return [true] once the datasource is reported as disabled
# @raise [ClientError] when the datasource is still enabled after the last
#   attempt
def bounded_wait_for_disable(datasource_name)
  max_attempts = 10
  attempts = 0
  still_enabled = datasource_enabled?(datasource_name)

  while still_enabled && attempts < max_attempts
    attempts += 1
    sleep 1
    still_enabled = datasource_enabled?(datasource_name)
  end

  # Fail only when the datasource is STILL enabled. The guard used to be
  # inverted (`unless`), which raised on success and returned true on timeout.
  raise ClientError, 'Datasource should be disabled, but is still enabled.' if still_enabled

  true
end
bounded_wait_for_segments_disable(datasource_name) click to toggle source
# File lib/druiddb/node/coordinator.rb, line 103
# Polls until the datasource no longer reports any segments, giving up after
# a fixed number of one-second waits.
#
# The state is checked once up front and once after each wait, so at most
# `max_polls` sleeps and `max_polls + 1` checks are performed.
#
# @param datasource_name [String] passed to #datasource_has_segments?
# @return [true] once no segments are reported
# @raise [ClientError] when segments are still reported after the last poll
def bounded_wait_for_segments_disable(datasource_name)
  max_polls = 60
  polls = 0
  segments_remain = datasource_has_segments?(datasource_name)

  while segments_remain && polls < max_polls
    sleep 1
    polls += 1
    segments_remain = datasource_has_segments?(datasource_name)
  end

  raise ClientError, 'Segments should be disabled, but are still enabled.' if segments_remain

  true
end