class LogStash::Inputs::CloudWatch

Pull events from the Amazon Web Services CloudWatch API.

To use this plugin, you must have an AWS account and an IAM policy that grants the permissions shown below.

Typically, you should set up an IAM policy, create a user, and apply the IAM policy to that user.

A sample policy for EC2 metrics is as follows:

source,json

{

"Version": "2012-10-17",
"Statement": [
    {
        "Sid": "Stmt1444715676000",
        "Effect": "Allow",
        "Action": [
            "cloudwatch:GetMetricStatistics",
            "cloudwatch:ListMetrics"
        ],
        "Resource": "*"
    },
    {
        "Sid": "Stmt1444716576170",
        "Effect": "Allow",
        "Action": [
            "ec2:DescribeInstances"
        ],
        "Resource": "*"
    }
]

}

See aws.amazon.com/iam/ for more details on setting up AWS identities.

# Configuration Example

source, ruby

input {

cloudwatch {
  namespace => "AWS/EC2"
  metrics => [ "CPUUtilization" ]
  filters => { "tag:Group" => "API-Production" }
  region => "us-east-1"
}

}

input {

cloudwatch {
  namespace => "AWS/EBS"
  metrics => ["VolumeQueueLength"]
  filters => { "tag:Monitoring" => "Yes" }
  region => "us-east-1"
}

}

input {

cloudwatch {
  namespace => "AWS/RDS"
  metrics => ["CPUUtilization", "CPUCreditUsage"]
  filters => { "EngineName" => "mysql" } # Only supports EngineName, DatabaseClass and DBInstanceIdentifier
  region => "us-east-1"
}

}

Public Instance Methods

aws_service_endpoint(region) click to toggle source
# File lib/logstash/inputs/cloudwatch.rb, line 124
# Wraps the given region in the options hash used as the service endpoint.
#
# @param region [Object] region identifier (for example "us-east-1")
# @return [Hash] a one-entry hash with the region stored under :region
def aws_service_endpoint(region)
  endpoint = {}
  endpoint[:region] = region
  endpoint
end
filters_required?(namespace) click to toggle source
# File lib/logstash/inputs/cloudwatch.rb, line 136
# Tells whether filters are mandatory for the given namespace.
# 'AWS/EC2' is the only namespace that may be used without filters.
#
# @param namespace [String] CloudWatch namespace
# @return [Boolean] false for 'AWS/EC2', true for anything else
def filters_required?(namespace)
  'AWS/EC2' != namespace
end
register() click to toggle source
# File lib/logstash/inputs/cloudwatch.rb, line 128
# Validates the plugin settings and records the registration time.
#
# Checks run in order: the interval must be at least as large as the period,
# the interval must be a whole multiple of the period, and filters must be
# present whenever filters_required? says so for the configured namespace.
#
# @raise [RuntimeError] when any of the checks above fails
def register
  unless @interval >= @period
    raise 'Interval needs to be higher than period'
  end

  unless (@interval % @period).zero?
    raise 'Interval must be divisible by period'
  end

  if @filters.nil? && filters_required?(@namespace)
    raise "Filters must be defined for when using #{@namespace} namespace"
  end

  @last_check = Time.now
end
run(queue) click to toggle source

Runs the poller to get metrics for the provided namespace

@param queue [Array] Logstash queue

# File lib/logstash/inputs/cloudwatch.rb, line 148
# Polling loop: until stop? is true, queries every metric returned by
# metrics_for(@namespace) and pushes the resulting events onto the queue,
# then sleeps for whatever is left of @interval.
#
# @param queue [Array] Logstash queue
# @raise [RuntimeError] when metrics_for(@namespace) yields no metrics
def run(queue)
  until stop?
    started_at = Time.now

    @logger.info('Polling CloudWatch API')

    raise 'No metrics to query' if metrics_for(@namespace).count.zero?

    metrics_for(@namespace).each do |metric|
      @logger.debug "Polling metric #{metric}"
      @logger.debug "Filters: #{aws_filters}" unless @filters.nil?

      # from_filters is used only when filters exist and @combined is set;
      # every other case goes through the per-resource path.
      if !@filters.nil? && @combined
        from_filters(queue, metric)
      else
        from_resources(queue, metric)
      end
    end

    # Sleep only for the part of the interval not already spent polling.
    remaining = @interval - (Time.now - started_at)
    Stud.stoppable_sleep(remaining) { stop? } if remaining > 0
  end
end

Private Instance Methods

aws_filters() click to toggle source

Filters used in querying the AWS SDK for resources

@return [Array]

# File lib/logstash/inputs/cloudwatch.rb, line 297
# Converts @filters into the list of name/value hashes handed to the AWS SDK.
#
# When @combined is set each entry carries the raw value under :value;
# otherwise the value is wrapped in an Array (unless it already is one) and
# stored under :values.
#
# @return [Array<Hash>]
def aws_filters
  @filters.map do |filter_name, filter_value|
    if @combined
      { name: filter_name, value: filter_value }
    else
      wrapped = filter_value.is_a?(Array) ? filter_value : [filter_value]
      { name: filter_name, values: wrapped }
    end
  end
end
cleanup(event) click to toggle source

Cleans up an event to remove unneeded fields and format time

@param event [Hash] Raw event

@return [Hash] Cleaned event

# File lib/logstash/inputs/cloudwatch.rb, line 231
# Strips the :statistics and :dimensions entries from the event, replaces the
# :start_time and :end_time strings with parsed UTC Time objects, and returns
# the result of LogStash::Util.stringify_symbols for the event.
# The hash passed in is modified in place.
#
# @param event [Hash] Raw event
# @return [Hash] Cleaned event
def cleanup(event)
  [:statistics, :dimensions].each { |field| event.delete(field) }

  [:start_time, :end_time].each do |field|
    event[field] = Time.parse(event[field]).utc
  end

  LogStash::Util.stringify_symbols(event)
end
clients() click to toggle source

Dynamic AWS client instantiator for retrieving the proper client for the provided namespace

@return [Hash]

# File lib/logstash/inputs/cloudwatch.rb, line 243
# Lazily built, memoized registry of AWS SDK clients keyed by namespace.
#
# A lookup key may carry the 'AWS/' prefix (e.g. 'AWS/EC2'); the prefix is
# stripped to find the SDK module under Aws, and 'EBS' is served by the EC2
# client. The client is cached under BOTH the service name and the key that
# was actually requested, so repeated lookups such as clients['AWS/EC2'] reuse
# one client instead of constructing a new one on every call.
#
# @return [Hash] namespace => client instance
def clients
  @clients ||= Hash.new do |client_hash, namespace|
    service = namespace[0..3] == 'AWS/' ? namespace[4..-1] : namespace
    service = 'EC2' if service == 'EBS'

    # Hash#fetch with a block does not trigger this default block again, so
    # an already-built client for the service is reused safely.
    client = client_hash.fetch(service) do
      # TODO: Move logger configuration into mixin.
      Aws.const_get(service)::Client.new(aws_options_hash.merge(:logger => @logger))
    end

    client_hash[service] = client
    client_hash[namespace] = client
  end
end
filter_options() click to toggle source
# File lib/logstash/inputs/cloudwatch.rb, line 334
# Builds the options hash carrying the filters for an SDK describe call.
#
# @return [Hash] empty when no filters are configured, otherwise the
#   aws_filters list stored under :filters
def filter_options
  return {} if @filters.nil?

  { :filters => aws_filters }
end
from_filters(queue, metric) click to toggle source

Gets metrics from provided filter options

@param queue [Array] Logstash queue

@param metric [String] Metric name

# File lib/logstash/inputs/cloudwatch.rb, line 206
# Queries CloudWatch once for the metric, using aws_filters as the dimensions,
# and pushes one event per returned datapoint onto the queue.
#
# Each event is built from the datapoint merged with the query options, plus
# one field per filter (filter name as a symbol key, filter :value as value),
# and is passed through cleanup and decorate before being queued.
#
# @param queue [Array] Logstash queue
# @param metric [String] Metric name
def from_filters(queue, metric)
  query = metric_options(@namespace, metric)
  query[:dimensions] = aws_filters
  @logger.debug "Dim: #{query[:dimensions]}"

  response = clients['CloudWatch'].get_metric_statistics(query)
  @logger.debug "DPs: #{response.data}"

  # Field name => value pairs contributed by the filters; the same for every datapoint.
  dimension_fields = {}
  aws_filters.each do |dimension|
    dimension_fields[dimension[:name].to_sym] = dimension[:value]
  end

  response[:datapoints].each do |datapoint|
    payload = datapoint.to_hash.update(query)
    payload.update(dimension_fields)

    event = LogStash::Event.new(cleanup(payload))
    decorate(event)
    queue << event
  end
end
from_resources(queue, metric) click to toggle source

Gets metrics from provided resources.

@param queue [Array] Logstash queue

@param metric [String] Metric name

# File lib/logstash/inputs/cloudwatch.rb, line 177
# Queries CloudWatch separately for every resource returned by `resources`
# and pushes one event per datapoint onto the queue.
#
# `resources` maps a dimension name to one resource or a list of resources;
# each resource is queried with a single-entry :dimensions list, and the
# resulting events carry the resource under the dimension name (as a symbol).
#
# @param queue [Array] Logstash queue
# @param metric [String] Metric name
def from_resources(queue, metric)
  resources.each_pair do |dimension, members|
    # Splat so that a single value and a list are handled the same way.
    [*members].each do |resource|
      @logger.debug "Polling resource #{dimension}: #{resource}"

      query = metric_options(@namespace, metric)
      query[:dimensions] = [{ name: dimension, value: resource }]

      response = clients['CloudWatch'].get_metric_statistics(query)
      @logger.debug "DPs: #{response.data}"

      response[:datapoints].each do |datapoint|
        payload = datapoint.to_hash.update(query)
        payload[dimension.to_sym] = resource

        event = LogStash::Event.new(cleanup(payload))
        decorate(event)
        queue << event
      end
    end
  end
end
metric_options(namespace, metric) click to toggle source

Gets options for querying against Cloudwatch for a given metric and namespace

@param namespace [String] Namespace to query in

@param metric [String] Metric to query for

@return [Hash]

# File lib/logstash/inputs/cloudwatch.rb, line 283
# Builds the query options for a metric in a namespace.
#
# The time window ends at the current time and starts @interval seconds
# earlier. The clock is read once so that both ends of the window come from
# the same instant and the window is exactly @interval long; reading
# Time.now separately for each end could stretch the window when the clock
# ticks between the two reads.
#
# @param namespace [String] Namespace to query in
# @param metric [String] Metric to query for
# @return [Hash] options with :namespace, :metric_name, :start_time,
#   :end_time (both ISO 8601 strings), :period and :statistics
def metric_options(namespace, metric)
  window_end = Time.now

  {
    namespace: namespace,
    metric_name: metric,
    start_time: (window_end - @interval).iso8601,
    end_time: window_end.iso8601,
    period: @period,
    statistics: @statistics
  }
end
metrics_available() click to toggle source

Gets available metrics for a given namespace

@return [Hash]

# File lib/logstash/inputs/cloudwatch.rb, line 266
# Lazily built, memoized map of namespace => metric names reported by the
# CloudWatch list_metrics call for that namespace.
#
# The list is stored only after the API call has returned, so a failed call
# does not leave an empty list cached for the namespace; the next lookup
# simply tries again.
#
# NOTE(review): only the :metrics of the single response object are read; if
# the API paginates its results, later pages are not fetched here — confirm.
#
# @return [Hash] namespace => Array of metric names
def metrics_available
  @metrics_available ||= Hash.new do |metrics_hash, namespace|
    listing = clients['CloudWatch'].list_metrics({ namespace: namespace })
    metrics_hash[namespace] = listing[:metrics].map { |entry| entry[:metric_name] }
  end
end
metrics_for(namespace) click to toggle source

Gets metrics for a provided namespace based on the intersection of the available metrics and the configured metrics

@param namespace [String] Namespace to retrieve metrics for

@return [Array]

# File lib/logstash/inputs/cloudwatch.rb, line 259
# Returns the metric names that are both available in the namespace and
# listed in @metrics (array intersection).
#
# @param namespace [String] Namespace to retrieve metrics for
# @return [Array] metric names present in both lists
def metrics_for(namespace)
  available = metrics_available[namespace]
  available & @metrics
end
resources() click to toggle source

Gets resources based on the provided namespace

@see docs.aws.amazon.com/AmazonCloudWatch/latest/DeveloperGuide/CW_Support_For_AWS.html

@return [Hash]

# File lib/logstash/inputs/cloudwatch.rb, line 313
# Resolves the resources to poll for the configured namespace.
#
# * 'AWS/EC2' - instance ids collected from every reservation returned by
#   describe_instances (called with filter_options), under 'InstanceId'.
# * 'AWS/EBS' - volume ids collected from the attachments of every volume
#   returned by describe_volumes (called with aws_filters), under 'VolumeId'.
# * any other namespace - @filters is returned unchanged.
#
# @return [Hash] dimension name => resource identifiers
def resources
  if @namespace == 'AWS/EC2'
    reservations = clients[@namespace].describe_instances(filter_options)[:reservations]
    instance_ids = reservations.map do |reservation|
      reservation[:instances].map { |instance| instance[:instance_id] }
    end.flatten

    { 'InstanceId' => instance_ids }
  elsif @namespace == 'AWS/EBS'
    volumes = clients[@namespace].describe_volumes(filters: aws_filters)[:volumes]
    volume_ids = volumes.map do |volume|
      volume[:attachments].map { |attachment| attachment[:volume_id] }
    end.flatten

    @logger.debug "AWS/EBS Volumes: #{volume_ids}"

    { 'VolumeId' => volume_ids }
  else
    @filters
  end
end