class Fluent::Plugin::ClouderametricsInput

Public Instance Methods

configure(conf) click to toggle source

Called before starting

Calls superclass method
# File lib/fluent/plugin/in_clouderametrics.rb, line 77
def configure(conf)
  super
  log.debug "Configure cloudera metrics"
  @manager_uri = "#{host}:#{port}/api/#{api_version}/#{api_endpoint}"

  if query != nil && !query.empty?
    @manager_uri += "?query=#{query}"
  else
    log.debug "No query param. The Cloudera API may require one."
  end
end
get_cloudera_metrics() click to toggle source
# File lib/fluent/plugin/in_clouderametrics.rb, line 62
def get_cloudera_metrics
  uri = URI(@manager_uri)
  log.debug uri

  req = Net::HTTP::Get.new(uri)
  req.basic_auth @user, @pass

  res = Net::HTTP.start(uri.hostname, uri.port) {|http|
    http.request(req)
  }

  res
end
shutdown() click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_clouderametrics.rb, line 95
def shutdown
  super
  log.debug "Shutdown cloudera metrics watcher thread"
  @watcher.terminate
  @watcher.join
end
start() click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_clouderametrics.rb, line 89
def start
  super
  log.debug "Start cloudera metrics watcher thread"
  @watcher = Thread.new(&method(:watch))
end
watch() click to toggle source
# File lib/fluent/plugin/in_clouderametrics.rb, line 45
def watch
  log.debug "cloudera metrics: watch thread starting"

  until @finished
      res = get_cloudera_metrics
      
      log.debug "#{res.code} - #{res.message}"

      case res
        when Net::HTTPSuccess
          router.emit @tag, Fluent::Engine.now, JSON.load(res.body)
      end

      sleep @timespan
  end
end