class Fluent::Plugin::CloudWatchPutOutput

Attributes

cloudwatch[R]

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_cloudwatch_put.rb, line 91
# Validates the plugin configuration once the superclass has processed it.
#
# Exactly one of 'metric_name' and 'key_as_metric_name' has to be configured;
# setting both or neither is rejected. The method also derives @send_all_key
# (true only when value_key is the single wildcard entry "*") and passes the
# namespace / metric_name / unit templates to placeholder_validate!.
#
# @param conf the configuration object, forwarded unchanged to the superclass
# @raise [Fluent::ConfigError] when both or neither of the two options are set
def configure(conf)
  super

  # Compare truthiness explicitly: the two options are mutually exclusive,
  # so the configuration is invalid whenever both are on or both are off.
  has_metric_name = @metric_name ? true : false
  uses_key_as_name = @key_as_metric_name ? true : false
  if has_metric_name == uses_key_as_name
    raise Fluent::ConfigError, "Either 'metric_name' or 'key_as_metric_name' must be set, but not both"
  end

  # A lone "*" in value_key means every key of a record is sent as a metric.
  @send_all_key = (@value_key.size == 1 && @value_key[0] == "*")

  placeholder_params = "namespace=#{@namespace}/metric_name=#{@metric_name}/unit=#{@unit}"
  placeholder_validate!(:cloudwatch_put, placeholder_params)
end
multi_workers_ready?() click to toggle source
# File lib/fluent/plugin/out_cloudwatch_put.rb, line 107
# Capability predicate for multi-worker operation.
#
# @return [Boolean] always true for this plugin
def multi_workers_ready?
  true
end
start() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_cloudwatch_put.rb, line 111
# Starts the plugin: runs the superclass start-up, then builds the CloudWatch
# client from the credential options plus the optional region and proxy
# settings. When trace logging is active, HTTP wire tracing is switched on
# and the plugin logger is handed to the client.
def start
  super

  client_options = setup_credentials

  # Optional settings are only forwarded when they are configured.
  { region: @region, http_proxy: @proxy_uri }.each do |key, value|
    client_options[key] = value if value
  end

  log.on_trace do
    client_options.update(http_wire_trace: true, logger: log)
  end

  @cloudwatch = Aws::CloudWatch::Client.new(client_options)
end
write(chunk) click to toggle source
# File lib/fluent/plugin/out_cloudwatch_put.rb, line 125
      # Sends the metrics contained in one buffer chunk to CloudWatch.
      #
      # The namespace template is resolved against the chunk metadata, the
      # metric count is logged at debug level, and the full payload is logged
      # as JSON when trace logging is active.
      #
      # NOTE: a statistic-set code path (@use_statistic_sets /
      # build_statistic_metric_data) is disabled in this version; metrics are
      # always built with build_metric_data.
      #
      # @param chunk the buffer chunk to flush
      # @return the result of the put_metric_data call
      def write(chunk)
        data_points = build_metric_data(chunk)
        target_namespace = extract_placeholders(@namespace, chunk.metadata)

        log.debug "Put metric to #{target_namespace}, count=#{data_points.size}"
        log.on_trace { log.trace data_points.to_json }

        request = {
          namespace: target_namespace,
          metric_data: data_points,
        }
        @cloudwatch.put_metric_data(request)
      end

Private Instance Methods

base_metric_data(meta) click to toggle source
# File lib/fluent/plugin/out_cloudwatch_put.rb, line 145
# Builds the fields shared by metric entries: the unit (with placeholders
# resolved against the given metadata) and the configured storage resolution.
#
# @param meta metadata used to resolve placeholders in the unit template
# @return [Hash] hash with :unit and :storage_resolution
def base_metric_data(meta)
  resolved_unit = extract_placeholders(@unit, meta)
  { unit: resolved_unit, storage_resolution: @storage_resolution }
end
build_metric_data(chunk) click to toggle source
# File lib/fluent/plugin/out_cloudwatch_put.rb, line 152
# Converts every event of a chunk into a list of metric datum hashes.
#
# For each record, every key that is listed in value_key (or every key when
# the "*" wildcard is active) becomes one datum. The record may carry two
# optional metadata fields:
#   'timestamp'  - integer epoch seconds used instead of the event time; a
#                  non-integer value is reported and the event time is used
#   'dimensions' - name => value pairs attached to every datum of the record;
#                  a missing field means "no dimensions"
#
# In wildcard mode the 'dimensions' field itself is not emitted as a metric:
# it is metadata, and its value cannot be converted with to_f.
# NOTE(review): the 'timestamp' field is still emitted as a metric in
# wildcard mode, as before - confirm whether that is intended.
#
# @param chunk buffer chunk; must provide #metadata and #msgpack_each
# @return [Array<Hash>] one hash per emitted metric value
def build_metric_data(chunk)
  meta = chunk.metadata

  # Loop-invariant: the templates only depend on the chunk metadata, so
  # resolve them once instead of once per metric value.
  unit = extract_placeholders(@unit, meta)
  fixed_metric_name = @key_as_metric_name ? nil : extract_placeholders(@metric_name, meta)

  metric_data = []
  chunk.msgpack_each do |(timestamp, record)|
    record.each do |k, v|
      next if @send_all_key && k == 'dimensions'
      next unless @send_all_key || @value_key.include?(k)

      timestamp_aws = record['timestamp'] || timestamp
      metric_name = @key_as_metric_name ? k : fixed_metric_name

      # Integer instead of Fixnum: Fixnum was deprecated in Ruby 2.4 and
      # removed in Ruby 3.2, where referencing it raises NameError.
      unless timestamp_aws.is_a?(Integer)
        log.error "Invalid timestamp for metric_name #{metric_name} timestamp #{timestamp_aws} needs to be a number"
        timestamp_aws = timestamp
      end

      # Records without a 'dimensions' field produce an empty dimension list
      # instead of failing on nil.
      dimensions = (record['dimensions'] || {}).map do |dk, dv|
        { name: dk, value: dv }
      end

      metric_data << {
        metric_name: metric_name,
        unit: unit,
        storage_resolution: @storage_resolution,
        dimensions: dimensions,
        value: v.to_f,
        timestamp: Time.at(timestamp_aws)
      }
    end
  end
  metric_data
end
setup_credentials() click to toggle source

Credential configuration adapted from fluent-plugin-s3 (Apache License, Version 2.0).

# File lib/fluent/plugin/out_cloudwatch_put.rb, line 218
# Assembles the credential-related options for the AWS client.
#
# The first configured source wins, checked in this order:
#   1. static access key id + secret key
#   2. assume-role credentials
#   3. instance profile credentials (ECS credentials when the
#      AWS_CONTAINER_CREDENTIALS_RELATIVE_URI environment variable is set)
#   4. shared credentials file
#   5. the deprecated 'aws_iam_retries' parameter
# When none is configured an empty hash is returned and no credentials are
# passed explicitly.
#
# @return [Hash] options to merge into the AWS client configuration
def setup_credentials
  client_options = {}
  provider_options = {}

  # Copies each listed setting from a config section when it has a value.
  copy_present = lambda do |section, fields|
    fields.each do |field|
      setting = section.public_send(field)
      provider_options[field] = setting if setting
    end
  end

  # Container credentials take the place of instance profile credentials
  # when the container credentials environment variable is present.
  metadata_credentials = lambda do
    provider = if ENV["AWS_CONTAINER_CREDENTIALS_RELATIVE_URI"]
                 Aws::ECSCredentials
               else
                 Aws::InstanceProfileCredentials
               end
    provider.new(provider_options)
  end

  if @aws_key_id && @aws_sec_key
    client_options[:access_key_id] = @aws_key_id
    client_options[:secret_access_key] = @aws_sec_key
  elsif @assume_role_credentials
    role = @assume_role_credentials
    provider_options[:role_arn] = role.role_arn
    provider_options[:role_session_name] = role.role_session_name
    copy_present.call(role, %i[policy duration_seconds external_id])
    provider_options[:client] = Aws::STS::Client.new(region: @region) if @region
    client_options[:credentials] = Aws::AssumeRoleCredentials.new(provider_options)
  elsif @instance_profile_credentials
    copy_present.call(
      @instance_profile_credentials,
      %i[retries ip_address port http_open_timeout http_read_timeout]
    )
    client_options[:credentials] = metadata_credentials.call
  elsif @shared_credentials
    copy_present.call(@shared_credentials, %i[path profile_name])
    client_options[:credentials] = Aws::SharedCredentials.new(provider_options)
  elsif @aws_iam_retries
    log.warn("'aws_iam_retries' parameter is deprecated. Use 'instance_profile_credentials' instead")
    provider_options[:retries] = @aws_iam_retries
    client_options[:credentials] = metadata_credentials.call
  end
  # Otherwise: nothing is set here and default credentials are used.

  client_options
end