class Fluent::Plugin::CloudwatchLogsInput

Constants

DEFAULT_STORAGE_TYPE

Public Class Methods

new() click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_cloudwatch_logs.rb, line 65
# Builds the plugin instance.
#
# The superclass initializer runs first; afterwards the AWS CloudWatch Logs
# SDK is loaded and the parser slot is cleared (configure_parser may fill it
# in later).
def initialize
  super

  require 'aws-sdk-cloudwatchlogs'
  @parser = nil
end

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_cloudwatch_logs.rb, line 72
# Applies the plugin configuration.
#
# After the superclass has processed +conf+, this sets up the optional parser,
# converts the configured start/end time strings (interpreted with
# @time_range_format) into integer epoch milliseconds, validates their order
# and creates the storage used for next tokens.
#
# Raises Fluent::ConfigError when both bounds are set and end_time precedes
# start_time.
def configure(conf)
  compat_parameters_convert(conf, :parser)
  super
  configure_parser(conf)

  # Text timestamp -> whole milliseconds since the epoch.
  to_epoch_millis = lambda do |text|
    (Time.strptime(text, @time_range_format).to_f * 1000).floor
  end
  @start_time = to_epoch_millis.call(@start_time) if @start_time
  @end_time = to_epoch_millis.call(@end_time) if @end_time

  both_bounds_given = @start_time && @end_time
  if both_bounds_given && (@end_time < @start_time)
    raise Fluent::ConfigError, "end_time(#{@end_time}) should be greater than start_time(#{@start_time})."
  end

  @next_token_storage = storage_create(usage: 'store_next_tokens', conf: config, default_type: DEFAULT_STORAGE_TYPE)
end
shutdown() click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_cloudwatch_logs.rb, line 142
# Signals the polling loop in #run to stop, then delegates to the superclass.
def shutdown
  # #run checks this flag on every pass of its `until @finished` loop.
  @finished = true
  super
end
start() click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_cloudwatch_logs.rb, line 85
# Starts the plugin: builds the CloudWatch Logs client and launches the
# polling thread.
#
# Credentials are chosen in this order:
# 1. STS assume-role when @aws_use_sts is set,
# 2. web identity when @web_identity_credentials is set,
# 3. ECS container credentials when @aws_ecs_authentication is set,
# 4. a static key pair when both @aws_key_id and @aws_sec_key are set;
#    otherwise no :credentials option is passed to the client.
def start
  super
  options = {}
  options[:region] = @region if @region
  options[:endpoint] = @endpoint if @endpoint
  options[:ssl_verify_peer] = @ssl_verify_peer
  options[:http_proxy] = @http_proxy if @http_proxy

  if @aws_use_sts
    Aws.config[:region] = options[:region]
    credentials_options = {
      role_arn: @aws_sts_role_arn,
      role_session_name: @aws_sts_session_name,
      external_id: @aws_sts_external_id,
      policy: @aws_sts_policy,
      duration_seconds: @aws_sts_duration_seconds
    }
    credentials_options[:sts_endpoint_url] = @aws_sts_endpoint_url if @aws_sts_endpoint_url
    if @region && @aws_sts_endpoint_url
      credentials_options[:client] = Aws::STS::Client.new(:region => @region, endpoint: @aws_sts_endpoint_url)
    elsif @region
      credentials_options[:client] = Aws::STS::Client.new(:region => @region)
    end
    options[:credentials] = Aws::AssumeRoleCredentials.new(credentials_options)
  elsif @web_identity_credentials
    c = @web_identity_credentials
    credentials_options = {}
    credentials_options[:role_arn] = c.role_arn
    credentials_options[:role_session_name] = c.role_session_name
    credentials_options[:web_identity_token_file] = c.web_identity_token_file
    credentials_options[:policy] = c.policy if c.policy
    credentials_options[:duration_seconds] = c.duration_seconds if c.duration_seconds
    if @region
      credentials_options[:client] = Aws::STS::Client.new(:region => @region)
    end
    options[:credentials] = Aws::AssumeRoleWebIdentityCredentials.new(credentials_options)
  elsif @aws_ecs_authentication
    # collect AWS credential from ECS relative uri ENV variable
    aws_container_credentials_relative_uri = ENV["AWS_CONTAINER_CREDENTIALS_RELATIVE_URI"]
    options[:credentials] = Aws::ECSCredentials.new({credential_path: aws_container_credentials_relative_uri}).credentials
  else
    options[:credentials] = Aws::Credentials.new(@aws_key_id, @aws_sec_key) if @aws_key_id && @aws_sec_key
  end

  @logs = Aws::CloudWatchLogs::Client.new(options)

  # Resolve the configured handler symbol to its module BEFORE the runner
  # thread exists: #emit calls @json_handler.load, so the thread must never
  # observe the unresolved symbol.
  @json_handler = case @json_handler
                  when :yajl
                    Yajl
                  when :json
                    JSON
                  end

  @finished = false
  thread_create(:in_cloudwatch_logs_runner, &method(:run))
end
state_key_for(log_stream_name, log_group_name = nil) click to toggle source

Not private, so that tests can call it directly.

# File lib/fluent/plugin/in_cloudwatch_logs.rb, line 148
# Builds the key under which the next token of a stream is stored.
#
# Without a stream name the bare @state_file value is returned. Otherwise the
# key is @state_file followed by the (optional) group name and the stream
# name, joined with "_", with every File::SEPARATOR replaced by "-".
def state_key_for(log_stream_name, log_group_name = nil)
  return @state_file unless log_stream_name

  name_parts = [log_group_name, log_stream_name].select { |part| part }
  suffix = name_parts.map { |part| part.gsub(File::SEPARATOR, '-') }.join('_')
  "#{@state_file}_#{suffix}"
end

Private Instance Methods

configure_parser(conf) click to toggle source
# File lib/fluent/plugin/in_cloudwatch_logs.rb, line 159
# Creates @parser when the configuration asks for one.
#
# A top-level 'format' parameter takes precedence and uses parser_create with
# no arguments; otherwise the first <parse> section (if any) is handed to
# parser_create. With neither present @parser is left untouched.
def configure_parser(conf)
  return @parser = parser_create if conf['format']

  parse_section = conf.elements('parse').first
  @parser = parser_create(conf: parse_section) if parse_section
end
describe_log_groups(log_group_name_prefix, log_groups = nil, next_token = nil) click to toggle source
# File lib/fluent/plugin/in_cloudwatch_logs.rb, line 339
# Returns every log group whose name starts with +log_group_name_prefix+.
#
# Pages are fetched one after another until the response carries no
# next_token. Each page request goes through throttling_handler, matching
# describe_log_streams and get_events, so a ThrottlingException is retried
# (when a retry interval is configured) instead of escaping to the caller.
#
# log_groups:: optional array to append the results to.
# next_token:: optional token of the first page to request.
def describe_log_groups(log_group_name_prefix, log_groups = nil, next_token = nil)
  loop do
    request = { log_group_name_prefix: log_group_name_prefix }
    request[:next_token] = next_token if next_token
    response = throttling_handler('describe_log_groups') do
      @logs.describe_log_groups(request)
    end

    if log_groups
      log_groups.concat(response.log_groups)
    else
      log_groups = response.log_groups
    end

    next_token = response.next_token
    break unless next_token
  end
  log_groups
end
describe_log_streams(log_stream_name_prefix, log_streams = nil, next_token = nil, log_group_name=nil) click to toggle source
# File lib/fluent/plugin/in_cloudwatch_logs.rb, line 306
# Returns the log streams of a group, optionally filtered by name prefix.
#
# The group is +log_group_name+ when given, otherwise @log_group_name. Further
# pages are collected by calling this method again with the response's
# next_token; every call runs inside throttling_handler.
#
# log_streams:: optional array to append the results to.
# next_token::  optional token of the page to request.
def describe_log_streams(log_stream_name_prefix, log_streams = nil, next_token = nil, log_group_name = nil)
  throttling_handler('describe_log_streams') do
    target_group = log_group_name.nil? ? @log_group_name : log_group_name
    params = { log_group_name: target_group }
    params[:next_token] = next_token if next_token
    params[:log_stream_name_prefix] = log_stream_name_prefix if log_stream_name_prefix

    page = @logs.describe_log_streams(params)
    log_streams = log_streams ? log_streams.concat(page.log_streams) : page.log_streams

    following_token = page.next_token
    if following_token
      log_streams = describe_log_streams(log_stream_name_prefix, log_streams, following_token, log_group_name)
    end
    log_streams
  end
end
emit(group, stream, event, metadata) click to toggle source
# File lib/fluent/plugin/in_cloudwatch_logs.rb, line 242
# Emits one CloudWatch log event into the Fluentd router.
#
# With a parser configured, event.message is parsed and every yielded record
# is emitted; the event time is the parser's time unless @use_aws_timestamp is
# set, in which case event.timestamp (milliseconds) is converted to seconds.
# Without a parser the message is decoded with @json_handler and the event
# timestamp is always used.
#
# In both paths the record optionally receives the group name (under
# @log_group_name_key) and a "metadata" entry when +metadata+ is not empty.
# Messages that cannot be parsed are sent to the error stream with the raw
# message attached.
#
# group::    log group name added to the record when @add_log_group_name is set.
# stream::   log stream name (accepted for symmetry; not used here).
# event::    object responding to #message and #timestamp.
# metadata:: hash merged under the "metadata" key; may be empty.
def emit(group, stream, event, metadata)
  if @parser
    @parser.parse(event.message) {|time, record|
      if record.nil?
        # Fluentd parsers yield (nil, nil) when the text does not match.
        # Report it instead of dereferencing the nil record below.
        error = Fluent::Plugin::Parser::ParserError.new("pattern not matched with data '#{event.message}'")
        router.emit_error_event(@tag, (event.timestamp / 1000).floor, { message: event.message }, error)
        next
      end

      if @use_aws_timestamp
        time = (event.timestamp / 1000).floor
      end
      if @add_log_group_name
        record[@log_group_name_key] = group
      end
      unless metadata.empty?
        record.merge!("metadata" => metadata)
      end
      router.emit(@tag, time, record)
    }
  else
    time = (event.timestamp / 1000).floor
    begin
      record = @json_handler.load(event.message)
      if @add_log_group_name
        record[@log_group_name_key] = group
      end
      unless metadata.empty?
        record.merge!("metadata" => metadata)
      end
      router.emit(@tag, time, record)
    rescue JSON::ParserError, Yajl::ParseError => error # Catch parser errors
      log.error "Invalid JSON encountered while parsing event.message"
      router.emit_error_event(@tag, time, { message: event.message }, error)
    end
  end
end
get_events(log_group_name, log_stream_name) click to toggle source
# File lib/fluent/plugin/in_cloudwatch_logs.rb, line 274
# Fetches the next batch of events of one log stream.
#
# The request carries the configured time range (when set) and the stored next
# token (when present and non-empty); start_from_head is requested whenever
# read_from_head? says so. If the returned forward token passes
# valid_next_token it is stored for the next call. The whole operation runs
# inside throttling_handler.
#
# Returns the events of the response.
def get_events(log_group_name, log_stream_name)
  throttling_handler('get_log_events') do
    params = { log_group_name: log_group_name, log_stream_name: log_stream_name }
    params[:start_time] = @start_time if @start_time
    params[:end_time] = @end_time if @end_time

    # Tokens are keyed by group as well only when groups are found by prefix.
    token_scope = @use_log_group_name_prefix ? [log_stream_name, log_group_name] : [log_stream_name]
    saved_token = next_token(*token_scope)

    params[:next_token] = saved_token unless saved_token.nil? || saved_token.empty?
    params[:start_from_head] = true if read_from_head?(saved_token)

    result = @logs.get_log_events(params)
    forward_token = result.next_forward_token
    store_next_token(forward_token, *token_scope) if valid_next_token(saved_token, forward_token)

    result.events
  end
end
get_todays_date() click to toggle source
# File lib/fluent/plugin/in_cloudwatch_logs.rb, line 360
# Returns today's date (as given by Date.today) formatted as "YYYY/MM/DD".
def get_todays_date
  today = Date.today
  today.strftime("%Y/%m/%d")
end
get_yesterdays_date() click to toggle source
# File lib/fluent/plugin/in_cloudwatch_logs.rb, line 364
# Returns the day before Date.today formatted as "YYYY/MM/DD".
def get_yesterdays_date
  yesterday = Date.today.prev_day
  yesterday.strftime("%Y/%m/%d")
end
migrate_state_file_to_storage(log_stream_name) click to toggle source
# File lib/fluent/plugin/in_cloudwatch_logs.rb, line 167
# Moves a token kept in a legacy state file into @next_token_storage.
#
# The file path and the storage key are both derived from
# state_key_for(log_stream_name); the file content (without its trailing
# newline) becomes the stored value and the file is deleted afterwards.
def migrate_state_file_to_storage(log_stream_name)
  legacy_path = state_key_for(log_stream_name)
  storage_key = :"#{legacy_path}"
  saved_token = File.read(legacy_path).chomp

  @next_token_storage.put(storage_key, saved_token)
  File.delete(legacy_path)
end
next_token(log_stream_name, log_group_name = nil) click to toggle source
# File lib/fluent/plugin/in_cloudwatch_logs.rb, line 172
# Looks up the stored next token for a stream (optionally scoped by group).
#
# When the storage is persistent and a legacy state file exists for the
# stream, that file is migrated into the storage before the lookup.
def next_token(log_stream_name, log_group_name = nil)
  if @next_token_storage.persistent
    legacy_file_present = File.exist?(state_key_for(log_stream_name))
    migrate_state_file_to_storage(log_stream_name) if legacy_file_present
  end

  storage_key = :"#{state_key_for(log_stream_name, log_group_name)}"
  @next_token_storage.get(storage_key)
end
read_from_head?(next_token) click to toggle source
# File lib/fluent/plugin/in_cloudwatch_logs.rb, line 302
# Tells whether the request should set start_from_head.
#
# Truthy when a non-empty next token is available, or when a start or end
# time is configured.
def read_from_head?(next_token)
  token_present = !(next_token.nil? || next_token.empty?)
  token_present || @start_time || @end_time
end
run() click to toggle source
# File lib/fluent/plugin/in_cloudwatch_logs.rb, line 183
# Polling loop executed by the runner thread until @finished is set.
#
# Once per @fetch_interval it determines the log groups to read (all groups
# matching the @log_group_name prefix, or just @log_group_name), then for each
# group either reads every stream matching the configured prefix / date
# prefixes, or reads the single stream @log_stream_name, and passes each
# event to #emit. When @include_metadata is set, the metadata handed to #emit
# names the stream and the group the events were actually read from.
def run
  @next_fetch_time = Time.now

  until @finished
    if Time.now > @next_fetch_time
      @next_fetch_time += @fetch_interval

      if @use_log_group_name_prefix
        log_group_names = describe_log_groups(@log_group_name).map{|log_group|
          log_group.log_group_name
        }
      else
        log_group_names = [@log_group_name]
      end
      log_group_names.each do |log_group_name|
        if @use_log_stream_name_prefix || @use_todays_log_stream
          log_stream_name_prefix = @use_todays_log_stream ? get_todays_date : @log_stream_name
          begin
            log_streams = describe_log_streams(log_stream_name_prefix, nil, nil, log_group_name)
            # Also pick up streams created yesterday when reading date-named streams.
            log_streams.concat(describe_log_streams(get_yesterdays_date, nil, nil, log_group_name)) if @use_todays_log_stream
            log_streams.each do |log_stream|
              log_stream_name = log_stream.log_stream_name
              events = get_events(log_group_name, log_stream_name)
              metadata = if @include_metadata
                           {
                             "log_stream_name" => log_stream_name,
                             "log_group_name" => log_group_name
                           }
                         else
                           {}
                         end
              events.each do |event|
                emit(log_group_name, log_stream_name, event, metadata)
              end
            end
          rescue Aws::CloudWatchLogs::Errors::ResourceNotFoundException
            log.warn "'#{@log_stream_name}' prefixed log stream(s) are not found"
            next
          end
        else
          events = get_events(log_group_name, @log_stream_name)
          # Report the group actually read: with use_log_group_name_prefix,
          # @log_group_name is only the prefix, not the group's name.
          metadata = if @include_metadata
                       {
                         "log_stream_name" => @log_stream_name,
                         "log_group_name" => log_group_name
                       }
                     else
                       {}
                     end
          events.each do |event|
            emit(log_group_name, @log_stream_name, event, metadata)
          end
        end
      end
    end
    sleep 1
  end
end
store_next_token(token, log_stream_name = nil, log_group_name = nil) click to toggle source
# File lib/fluent/plugin/in_cloudwatch_logs.rb, line 179
# Saves +token+ in @next_token_storage under the key derived from the given
# stream (and optional group) name via state_key_for.
def store_next_token(token, log_stream_name = nil, log_group_name = nil)
  storage_key = :"#{state_key_for(log_stream_name, log_group_name)}"
  @next_token_storage.put(storage_key, token)
end
throttling_handler(method_name) { || ... } click to toggle source
# File lib/fluent/plugin/in_cloudwatch_logs.rb, line 326
# Runs the given block, retrying it while CloudWatch Logs throttles the call.
#
# On Aws::CloudWatchLogs::Errors::ThrottlingException the handler logs a
# warning, sleeps for throttling_retry_seconds and runs the block again. When
# no retry interval is configured the exception is re-raised unchanged.
#
# The retry uses `retry` rather than a nested call so that a long throttling
# period does not deepen the call stack with every attempt.
#
# method_name:: name of the API call, used only in the warning message.
# Returns the value of the block.
def throttling_handler(method_name)
  yield
rescue Aws::CloudWatchLogs::Errors::ThrottlingException => err
  raise err unless throttling_retry_seconds

  log.warn "ThrottlingException #{method_name}. Waiting #{throttling_retry_seconds} seconds to retry."
  sleep throttling_retry_seconds
  retry
end
valid_next_token(prev_token, next_token) click to toggle source
# File lib/fluent/plugin/in_cloudwatch_logs.rb, line 356
# Decides whether a newly returned token is worth storing.
#
# Falsy when +next_token+ is missing; otherwise true only if the token (with
# its trailing newline removed) differs from +prev_token+.
def valid_next_token(prev_token, next_token)
  return next_token unless next_token

  candidate = next_token.chomp
  prev_token != candidate
end