class LogStash::Inputs::Jmx

This input plugin permits to retrieve metrics from remote Java applications using JMX. Every `polling_frequency`, it scans a folder containing json configuration files describing JVMs to monitor with metrics to retrieve. Then a pool of threads will retrieve metrics and create events.

## The configuration:

In Logstash configuration, you must set the polling frequency, the number of thread used to poll metrics and a directory absolute path containing json files with the configuration per jvm of metrics to retrieve. Logstash input configuration example:

source,ruby

jmx {

//Required
path => "/apps/logstash_conf/jmxconf"
//Optional, default 60s
polling_frequency => 15
type => "jmx"
//Optional, default 4
nb_thread => 4

}

Json JMX configuration example:

source,js

{

//Required, JMX listening host/ip
"host" : "192.168.1.2",
//Required, JMX listening port
"port" : 1335,
//Optional, the username to connect to JMX
"username" : "user",
//Optional, the password to connect to JMX
"password": "pass",
//Optional, use this alias as a prefix in the metric name. If not set use <host>_<port>
"alias" : "test.homeserver.elasticsearch",
//Required, list of JMX metrics to retrieve
"queries" : [
{
  //Required, the object name of Mbean to request
  "object_name" : "java.lang:type=Memory",
  //Optional, use this alias in the metrics value instead of the object_name
  "object_alias" : "Memory"
}, {
  "object_name" : "java.lang:type=Runtime",
  //Optional, set of attributes to retrieve. If not set retrieve
  //all metrics available on the configured object_name.
  "attributes" : [ "Uptime", "StartTime" ],
  "object_alias" : "Runtime"
}, {
  //object_name can be configured with * to retrieve all matching Mbeans
  "object_name" : "java.lang:type=GarbageCollector,name=*",
  "attributes" : [ "CollectionCount", "CollectionTime" ],
  //object_alias can be based on specific value from the object_name thanks to ${<varname>}.
  //In this case ${type} will be replaced by GarbageCollector...
  "object_alias" : "${type}.${name}"
}, {
  "object_name" : "java.nio:type=BufferPool,name=*",
  "object_alias" : "${type}.${name}"
} ]

}

Here are examples of generated events. When returned metrics value type is number/boolean it is stored in `metric_value_number` event field otherwise it is stored in `metric_value_string` event field.

source,ruby

{

"@version" => "1",
"@timestamp" => "2014-02-18T20:57:27.688Z",
"host" => "192.168.1.2",
"path" => "/apps/logstash_conf/jmxconf",
"type" => "jmx",
"metric_path" => "test.homeserver.elasticsearch.GarbageCollector.ParNew.CollectionCount",
"metric_value_number" => 2212

}

source,ruby

{

"@version" => "1",
"@timestamp" => "2014-02-18T20:58:06.376Z",
"host" => "localhost",
"path" => "/apps/logstash_conf/jmxconf",
"type" => "jmx",
"metric_path" => "test.homeserver.elasticsearch.BufferPool.mapped.ObjectName",
"metric_value_string" => "java.nio:type=BufferPool,name=mapped"

}

Constants

BAD_TYPE_CONFIG_PARAMETER
BAD_TYPE_QUERY
BAD_TYPE_QUERY_PARAMETER
MISSING_CONFIG_PARAMETER

Error messages

MISSING_QUERY_PARAMETER

Attributes

queue_conf[RW]
regexp_group_alias_object[RW]

Class Var

Public Instance Methods

register() click to toggle source
# File lib/logstash/inputs/jmx.rb, line 313
def register
  require 'thread'
  require 'jmx4r'

  @logger.info("Create queue dispatching JMX requests to threads")
  @queue_conf = Queue.new

  @logger.info("Compile regexp for group alias object replacement")
  @regexp_group_alias_object = Regexp.new('(?:\${(.*?)})+')
end
run(queue) click to toggle source
# File lib/logstash/inputs/jmx.rb, line 325
def run(queue)
  begin
    threads = []
    @logger.info("Initialize #{@nb_thread} threads for JMX metrics collection")
    @nb_thread.times do
      threads << Thread.new { thread_jmx(@queue_conf,queue) }
    end

    while !stop?
      @logger.info("Loading configuration files in path", :path => @path)
      Dir.foreach(@path) do |item|
        next if item == '.' or item == '..'
        begin
          file_conf = File.join(@path, item)
          @logger.debug? && @logger.debug("Loading configuration from file", :file => file_conf)
          config_string = File.read(file_conf)
          conf_hash = LogStash::Json.load(config_string)
          validation_errors = validate_configuration(conf_hash)
          if validation_errors.empty?
            @logger.debug? && @logger.debug("Add configuration to the queue", :config => conf_hash)
            @queue_conf << conf_hash
          else
            @logger.warn("Issue with configuration file", :file => file_conf,
            :validation_errors => validation_errors)
          end
        rescue Exception => ex
          @logger.warn("Issue loading configuration from file", :file => file_conf,
            :exception => ex.message, :backtrace => ex.backtrace)
        end
      end

      @logger.debug('Wait until the queue conf is empty')
      delta=0
      until @queue_conf.empty?
        @logger.debug("There are still #{@queue_conf.size} messages in the queue conf. Sleep 1s.")
        delta=delta+1
        sleep(1)
      end

      wait_time=@polling_frequency-delta
      if wait_time>0
        @logger.debug("Wait #{wait_time}s (#{@polling_frequency}-#{delta}(seconds wait until queue conf empty)) before to launch again a new jmx metrics collection")
        Stud.stoppable_sleep(wait_time) { stop? }
      else
        @logger.warn("The time taken to retrieve metrics is more important than the retrieve_interval time set.
                     \nYou must adapt nb_thread, retrieve_interval to the number of jvm/metrics you want to retrieve.")
      end
    end
  rescue Exception => ex
    @logger.error(ex.message)
    @logger.error(ex.backtrace.join("\n"))
  ensure
    @nb_thread.times do |i|
      @logger.debug? && @logger.debug("Signaling termination to jmx thread #{i + 1}")
      @queue_conf << :END
    end
    threads.each {|t| t.join }
  end
end
validate_configuration(conf_hash) click to toggle source

Verify that all required parameter are present in the conf_hash

# File lib/logstash/inputs/jmx.rb, line 117
def validate_configuration(conf_hash)
  validation_errors = []
  #Check required parameters in configuration
  ["host", "port","queries"].each do |param|
    validation_errors << MISSING_CONFIG_PARAMETER % param unless conf_hash.has_key?(param)
  end

  #Validate parameters type in configuration
  {"host" => String, "port" => Fixnum, "alias" => String }.each do |param, expected_type|
    if conf_hash.has_key?(param) && !conf_hash[param].instance_of?(expected_type)
      validation_errors << BAD_TYPE_CONFIG_PARAMETER % { :param => param, :expected => expected_type, :actual => conf_hash[param].class }
    end
  end

  if conf_hash.has_key?("queries")
    if !conf_hash["queries"].respond_to?(:each)
      validation_errors << BAD_TYPE_CONFIG_PARAMETER % { :param => 'queries', :expected => Enumerable, :actual => conf_hash['queries'].class }
    else
      conf_hash['queries'].each_with_index do |query,index|
        unless query.respond_to?(:[]) && query.respond_to?(:has_key?)
          validation_errors << BAD_TYPE_QUERY % {:index => index, :expected => Hash, :actual => query.class}
          next
        end
        #Check required parameters in each query
        ["object_name"].each do |param|
          validation_errors << MISSING_QUERY_PARAMETER % [param,index] unless query.has_key?(param)
        end
        #Validate parameters type in each query
        {"object_name" => String, "object_alias" => String }.each do |param, expected_type|
          if query.has_key?(param) && !query[param].instance_of?(expected_type)
            validation_errors << BAD_TYPE_QUERY_PARAMETER % { :param => param, :index => index, :expected => expected_type, :actual => query[param].class }
          end
        end

        if query.has_key?("attributes") && !query["attributes"].respond_to?(:each)
          validation_errors << BAD_TYPE_QUERY_PARAMETER % { :param => 'attributes', :index => index, :expected => Enumerable, :actual => query['attributes'].class }
        end
      end
    end
  end
  return validation_errors
end

Private Instance Methods

replace_alias_object(r_alias_object,object_name) click to toggle source
# File lib/logstash/inputs/jmx.rb, line 161
def replace_alias_object(r_alias_object,object_name)
  @logger.debug("Replace ${.*} variables from #{r_alias_object} using #{object_name}")
  group_alias = @regexp_group_alias_object.match(r_alias_object)
  if group_alias
    r_alias_object = r_alias_object.gsub('${'+group_alias[1]+'}',object_name.split(group_alias[1]+'=')[1].split(',')[0])
    r_alias_object = replace_alias_object(r_alias_object,object_name)
  end
  r_alias_object
end
send_event_to_queue(queue,host,metric_path,metric_value) click to toggle source
# File lib/logstash/inputs/jmx.rb, line 172
def send_event_to_queue(queue,host,metric_path,metric_value)
  @logger.debug('Send event to queue to be processed by filters/outputs')
  event = LogStash::Event.new
  event.set('host', host)
  event.set('path', @path)
  event.set('type', @type)
  number_type = [Fixnum, Bignum, Float]
  boolean_type = [TrueClass, FalseClass]
  metric_path_substituted = metric_path.gsub(' ','_').gsub('"','')
  if number_type.include?(metric_value.class)
    @logger.debug("The value #{metric_value} is of type number: #{metric_value.class}")
    event.set('metric_path', metric_path_substituted)
    event.set('metric_value_number', metric_value)
  elsif boolean_type.include?(metric_value.class)
    @logger.debug("The value #{metric_value} is of type boolean: #{metric_value.class}")
    event.set('metric_path', metric_path_substituted+'_bool')
    event.set('metric_value_number', metric_value ? 1 : 0)
  else
    @logger.debug("The value #{metric_value} is not of type number: #{metric_value.class}")
    event.set('metric_path', metric_path_substituted)
    event.set('metric_value_string', metric_value.to_s)
  end
  decorate(event)
  queue << event
end
thread_jmx(queue_conf,queue) click to toggle source

Thread function to retrieve metrics from JMX

# File lib/logstash/inputs/jmx.rb, line 200
def thread_jmx(queue_conf,queue)
  while true
    begin
      @logger.debug('Wait config to retrieve from queue conf')
      thread_hash_conf = queue_conf.pop

      if thread_hash_conf == :END
        # the :END symbol is a signal to terminate the thread
        @logger.debug? && @logger.debug("Received jmx thread termination signal")
        return
      else
        @logger.debug? && @logger.debug("Retrieve config #{thread_hash_conf} from queue conf")
      end

      @logger.debug('Check if jmx connection need a user/password')
      if thread_hash_conf.has_key?('username') and thread_hash_conf.has_key?('password')
        @logger.debug("Connect to #{thread_hash_conf['host']}:#{thread_hash_conf['port']} with user #{thread_hash_conf['username']}")
        jmx_connection = JMX::MBean.connection :host => thread_hash_conf['host'],
                                               :port => thread_hash_conf['port'],
                                               :url => thread_hash_conf['url'],
                                               :username => thread_hash_conf['username'],
                                               :password => thread_hash_conf['password']
      else
        @logger.debug("Connect to #{thread_hash_conf['host']}:#{thread_hash_conf['port']}:#{thread_hash_conf['url']}")
        jmx_connection = JMX::MBean.connection :host => thread_hash_conf['host'],
                                               :port => thread_hash_conf['port'],
                                               :url => thread_hash_conf['url']
      end

      if jmx_connection.nil?
        @logger.warn("Invalid nil jmx connection, ignoring", :host => thread_hash_conf['host'], :port => thread_hash_conf['port'],  :url => thread_hash_conf['url'])
        next
      end

      if thread_hash_conf.has_key?('alias')
        @logger.debug("Set base_metric_path to alias: #{thread_hash_conf['alias']}")
        base_metric_path = thread_hash_conf['alias']
      else
        @logger.debug("Set base_metric_path to host_port: #{thread_hash_conf['host']}_#{thread_hash_conf['port']}")
        base_metric_path = "#{thread_hash_conf['host']}_#{thread_hash_conf['port']}"
      end


      @logger.debug("Treat queries #{thread_hash_conf['queries']}")
      thread_hash_conf['queries'].each do |query|
        @logger.debug("Find all objects name #{query['object_name']}")
        jmx_object_name_s = JMX::MBean.find_all_by_name(query['object_name'], :connection => jmx_connection)

        if jmx_object_name_s.length > 0
          jmx_object_name_s.each do |jmx_object_name|
            if query.has_key?('object_alias')
              object_name = replace_alias_object(query['object_alias'],jmx_object_name.object_name.to_s)
              @logger.debug("Set object_name to object_alias: #{object_name}")
            else
              object_name = jmx_object_name.object_name.to_s
              @logger.debug("Set object_name to jmx object_name: #{object_name}")
            end

            if query.has_key?('attributes')
              @logger.debug("Retrieves attributes #{query['attributes']} to #{jmx_object_name.object_name}")
              query['attributes'].each do |attribute|
                begin
                  jmx_attribute_value = jmx_object_name.send(attribute.snake_case)
                  if jmx_attribute_value.instance_of? Java::JavaxManagementOpenmbean::CompositeDataSupport
                    @logger.debug('The jmx value is a composite_data one')
                    jmx_attribute_value.each do |jmx_attribute_value_composite|
                      @logger.debug("Get jmx value #{jmx_attribute_value[jmx_attribute_value_composite]} for attribute #{attribute}.#{jmx_attribute_value_composite} to #{jmx_object_name.object_name}")
                      send_event_to_queue(queue, thread_hash_conf['host'], "#{base_metric_path}.#{object_name}.#{attribute}.#{jmx_attribute_value_composite}", jmx_attribute_value[jmx_attribute_value_composite])
                    end
                  else
                    @logger.debug("Get jmx value #{jmx_attribute_value} for attribute #{attribute} to #{jmx_object_name.object_name}")
                    send_event_to_queue(queue, thread_hash_conf['host'], "#{base_metric_path}.#{object_name}.#{attribute}", jmx_attribute_value)
                  end
                rescue Exception => ex
                  @logger.warn("Failed retrieving metrics for attribute #{attribute} on object #{jmx_object_name.object_name}")
                  @logger.warn(ex.message)
                end
              end
            else
              @logger.debug("No attribute to retrieve define on #{jmx_object_name.object_name}, will retrieve all")
              jmx_object_name.attributes.each_key do |attribute|
                begin
                  jmx_attribute_value = jmx_object_name.send(attribute)
                  if jmx_attribute_value.instance_of? Java::JavaxManagementOpenmbean::CompositeDataSupport
                    @logger.debug('The jmx value is a composite_data one')
                    jmx_attribute_value.each do |jmx_attribute_value_composite|
                      @logger.debug("Get jmx value #{jmx_attribute_value[jmx_attribute_value_composite]} for attribute #{jmx_object_name.attributes[attribute]}.#{jmx_attribute_value_composite} to #{jmx_object_name.object_name}")
                      send_event_to_queue(queue, thread_hash_conf['host'], "#{base_metric_path}.#{object_name}.#{jmx_object_name.attributes[attribute]}.#{jmx_attribute_value_composite}", jmx_attribute_value[jmx_attribute_value_composite])
                    end
                  else
                    @logger.debug("Get jmx value #{jmx_attribute_value} for attribute #{jmx_object_name.attributes[attribute]} to #{jmx_object_name.object_name}")
                    send_event_to_queue(queue, thread_hash_conf['host'], "#{base_metric_path}.#{object_name}.#{jmx_object_name.attributes[attribute]}", jmx_attribute_value)
                  end
                rescue Exception => ex
                  @logger.warn("Failed retrieving metrics for attribute #{attribute} on object #{jmx_object_name.object_name}")
                  @logger.warn(ex.message)
                end
              end
            end
          end
        else
          @logger.warn("No jmx object found for #{query['object_name']}")
        end
      end
      jmx_connection.close
    rescue Exception => ex
      @logger.error(ex.message)
      @logger.error(ex.backtrace.join("\n"))
    end
  end
end