class Druid::DataSource

Attributes

dimensions[R]
metrics[R]
name[R]
uri[R]

Public Class Methods

new(name, uri) click to toggle source
# File lib/druid/data_source.rb, line 9
def initialize(name, uri)
  @name = name.split('/').last
  uri = uri.sample if uri.is_a?(Array)
  if uri.is_a?(String)
    @uri = URI(uri)
  else
    @uri = uri
  end
end

Public Instance Methods

metadata() click to toggle source
# File lib/druid/data_source.rb, line 19
def metadata
  @metadata ||= metadata!
end
metadata!(opts = {}) click to toggle source
# File lib/druid/data_source.rb, line 23
def metadata!(opts = {})
  meta_path = "#{@uri.path}datasources/#{name}"

  if opts[:interval]
    from, to = opts[:interval]
    from = from.respond_to?(:iso8601) ? from.iso8601 : ISO8601::DateTime.new(from).to_s
    to = to.respond_to?(:iso8601) ? to.iso8601 : ISO8601::DateTime.new(to).to_s

    meta_path += "?interval=#{from}/#{to}"
  end

  req = Net::HTTP::Get.new(meta_path)
  response = Net::HTTP.new(uri.host, uri.port).start do |http|
    http.open_timeout = 10 # if druid is down fail fast
    http.read_timeout = nil # we wait until druid is finished
    http.request(req)
  end

  if response.code != '200'
    raise "Request failed: #{response.code}: #{response.body}"
  end

  MultiJson.load(response.body)
end
post(query) click to toggle source
# File lib/druid/data_source.rb, line 56
def post(query)
  query = query.query if query.is_a?(Druid::Query::Builder)
  query = Query.new(MultiJson.load(query)) if query.is_a?(String)
  query.dataSource = name

  req = Net::HTTP::Post.new(uri.path, { 'Content-Type' => 'application/json' })
  query_as_json = query.as_json
  req.body = MultiJson.dump(query_as_json)


  response = ActiveSupport::Notifications.instrument('post.druid', data_source: name, query: query_as_json) do
    Net::HTTP.new(uri.host, uri.port).start do |http|
      http.open_timeout = 10 # if druid is down fail fast
      http.read_timeout = nil # we wait until druid is finished
      http.request(req)
    end
  end

  if response.code != '200'
    # ignore GroupBy cache issues and try again without cached results
    if query.context.useCache != false && response.code == "500" && response.body =~ /Cannot have a null result!/
      query.context.useCache = false
      return self.post(query)
    end

    raise Error.new(response)
  end

  MultiJson.load(response.body)
end