class LogStash::Inputs::CloudWatch
Pull events from the Amazon Web Services CloudWatch
API.
To use this plugin, you must have an AWS account, and the following policy.
Typically, you should setup an IAM policy, create a user and apply the IAM policy to the user.
A sample policy for EC2 metrics is as follows:
- source,json
-
{
"Version": "2012-10-17", "Statement": [ { "Sid": "Stmt1444715676000", "Effect": "Allow", "Action": [ "cloudwatch:GetMetricStatistics", "cloudwatch:ListMetrics" ], "Resource": "*" }, { "Sid": "Stmt1444716576170", "Effect": "Allow", "Action": [ "ec2:DescribeInstances" ], "Resource": "*" } ]
}
See aws.amazon.com/iam/ for more details on setting up AWS identities.
# Configuration Example
- source, ruby
-
input {
cloudwatch { namespace => "AWS/EC2" metrics => [ "CPUUtilization" ] filters => { "tag:Group" => "API-Production" } region => "us-east-1" }
}
input {
cloudwatch { namespace => "AWS/EBS" metrics => ["VolumeQueueLength"] filters => { "tag:Monitoring" => "Yes" } region => "us-east-1" }
}
input {
cloudwatch { namespace => "AWS/RDS" metrics => ["CPUUtilization", "CPUCreditUsage"] filters => { "EngineName" => "mysql" } # Only supports EngineName, DatabaseClass and DBInstanceIdentifier region => "us-east-1" }
}
Public Instance Methods
# File lib/logstash/inputs/cloudwatch.rb, line 124 def aws_service_endpoint(region) { region: region } end
# File lib/logstash/inputs/cloudwatch.rb, line 136 def filters_required?(namespace) case namespace when 'AWS/EC2' false else true end end
# File lib/logstash/inputs/cloudwatch.rb, line 128 def register raise 'Interval needs to be higher than period' unless @interval >= @period raise 'Interval must be divisible by period' unless @interval % @period == 0 raise "Filters must be defined for when using #{@namespace} namespace" if @filters.nil? && filters_required?(@namespace) @last_check = Time.now end
Runs the poller to get metrics for the provided namespace
@param queue [Array] Logstash queue
# File lib/logstash/inputs/cloudwatch.rb, line 148 def run(queue) while !stop? start = Time.now @logger.info('Polling CloudWatch API') raise 'No metrics to query' unless metrics_for(@namespace).count > 0 # For every metric metrics_for(@namespace).each do |metric| @logger.debug "Polling metric #{metric}" if @filters.nil? from_resources(queue, metric) else @logger.debug "Filters: #{aws_filters}" @combined ? from_filters(queue, metric) : from_resources(queue, metric) end end sleep_for = @interval - (Time.now - start) Stud.stoppable_sleep(sleep_for) { stop? } if sleep_for > 0 end # loop end
Private Instance Methods
Filters used in querying the AWS SDK for resources
@return [Array]
# File lib/logstash/inputs/cloudwatch.rb, line 297 def aws_filters @filters.collect do |key, value| if @combined { name: key, value: value } else value = [value] unless value.is_a? Array { name: key, values: value } end end end
Cleans up an event to remove unneeded fields and format time
@param event [Hash] Raw event
@return [Hash] Cleaned event
# File lib/logstash/inputs/cloudwatch.rb, line 231 def cleanup(event) event.delete :statistics event.delete :dimensions event[:start_time] = Time.parse(event[:start_time]).utc event[:end_time] = Time.parse(event[:end_time]).utc LogStash::Util.stringify_symbols(event) end
Dynamic AWS client instantiator for retrieving the proper client for the provided namespace
@return [Hash]
# File lib/logstash/inputs/cloudwatch.rb, line 243 def clients @clients ||= Hash.new do |client_hash, namespace| namespace = namespace[4..-1] if namespace[0..3] == 'AWS/' namespace = 'EC2' if namespace == 'EBS' cls = Aws.const_get(namespace) # TODO: Move logger configuration into mixin. client_hash[namespace] = cls::Client.new(aws_options_hash.merge(:logger => @logger)) end end
# File lib/logstash/inputs/cloudwatch.rb, line 334 def filter_options @filters.nil? ? {} : { :filters => aws_filters } end
Gets metrics from provided filter options
@param queue [Array] Logstash queue @param metric [String] Metric name
# File lib/logstash/inputs/cloudwatch.rb, line 206 def from_filters(queue, metric) options = metric_options(@namespace, metric) options[:dimensions] = aws_filters @logger.debug "Dim: #{options[:dimensions]}" datapoints = clients['CloudWatch'].get_metric_statistics(options) @logger.debug "DPs: #{datapoints.data}" datapoints[:datapoints].each do |datapoint| event_hash = datapoint.to_hash.update(options) aws_filters.each do |dimension| event_hash[dimension[:name].to_sym] = dimension[:value] end event = LogStash::Event.new(cleanup(event_hash)) decorate(event) queue << event end end
Gets metrics from provided resources.
@param queue [Array] Logstash queue @param metric [String] Metric name
# File lib/logstash/inputs/cloudwatch.rb, line 177 def from_resources(queue, metric) # For every dimension in the metric resources.each_pair do |dimension, dim_resources| # For every resource in the dimension dim_resources = *dim_resources dim_resources.each do |resource| @logger.debug "Polling resource #{dimension}: #{resource}" options = metric_options(@namespace, metric) options[:dimensions] = [ { name: dimension, value: resource } ] datapoints = clients['CloudWatch'].get_metric_statistics(options) @logger.debug "DPs: #{datapoints.data}" # For every event in the resource datapoints[:datapoints].each do |datapoint| event_hash = datapoint.to_hash.update(options) event_hash[dimension.to_sym] = resource event = LogStash::Event.new(cleanup(event_hash)) decorate(event) queue << event end end end end
Gets options for querying against Cloudwatch for a given metric and namespace
@param namespace [String] Namespace to query in @param metric [String] Metric to query for
@return [Hash]
# File lib/logstash/inputs/cloudwatch.rb, line 283 def metric_options(namespace, metric) { namespace: namespace, metric_name: metric, start_time: (Time.now - @interval).iso8601, end_time: Time.now.iso8601, period: @period, statistics: @statistics } end
Gets available metrics for a given namespace
@return [Hash]
# File lib/logstash/inputs/cloudwatch.rb, line 266 def metrics_available @metrics_available ||= Hash.new do |metrics_hash, namespace| metrics_hash[namespace] = [] clients['CloudWatch'].list_metrics({ namespace: namespace })[:metrics].each do |metrics| metrics_hash[namespace].push metrics[:metric_name] end metrics_hash[namespace] end end
Gets metrics for a provided namespace based on the union of available and found metrics
@param namespace [String] Namespace to retrieve metrics for
@return [Hash]
# File lib/logstash/inputs/cloudwatch.rb, line 259 def metrics_for(namespace) metrics_available[namespace] & @metrics end
Gets resources based on the provided namespace
@see docs.aws.amazon.com/AmazonCloudWatch/latest/DeveloperGuide/CW_Support_For_AWS.html
@return [Array]
# File lib/logstash/inputs/cloudwatch.rb, line 313 def resources case @namespace when 'AWS/EC2' instances = clients[@namespace].describe_instances(filter_options)[:reservations].collect do |r| r[:instances].collect{ |i| i[:instance_id] } end.flatten { 'InstanceId' => instances } when 'AWS/EBS' volumes = clients[@namespace].describe_volumes(filters: aws_filters)[:volumes].collect do |a| a[:attachments].collect{ |v| v[:volume_id] } end.flatten @logger.debug "AWS/EBS Volumes: #{volumes}" { 'VolumeId' => volumes } else @filters end end