class Fluent::Plugin::CloudwatchLogsInput

Public Class Methods

new() click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_cloudwatch_logs.rb, line 34
# Builds the plugin instance: runs the superclass initializer, then loads the
# AWS CloudWatch Logs SDK.
def initialize
  super

  # Required here (not at file load time), so the SDK is only loaded once an
  # instance of this input is actually constructed.
  require 'aws-sdk-cloudwatchlogs'
end

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_cloudwatch_logs.rb, line 40
# Configures the plugin from +conf+.
#
# conf - configuration object supplied by the caller; it is forwarded to the
#        superclass (through the bare +super+) and to configure_parser.
def configure(conf)
  # Runs before super. NOTE(review): presumably rewrites legacy flat parser
  # parameters into the form the superclass expects -- confirm against the
  # helper's documentation.
  compat_parameters_convert(conf, :parser)
  super
  # Creates @parser only when conf['format'] is set (see configure_parser).
  configure_parser(conf)
end
shutdown() click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_cloudwatch_logs.rb, line 76
# Asks the polling loop to stop, then runs the superclass shutdown.
def shutdown
  # #run re-checks @finished at the top of every loop iteration, so the loop
  # exits after its current pass.
  @finished = true
  super
end
start() click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_cloudwatch_logs.rb, line 46
# Creates the CloudWatch Logs client and launches the background polling
# thread that executes #run.
#
# Client options are taken from @region, @endpoint and @http_proxy when set.
# Credentials come from an assumed STS role when @aws_use_sts is truthy,
# otherwise from the static @aws_key_id / @aws_sec_key pair when both are
# present; with neither, no :credentials option is passed to the client.
def start
  super

  options = {}
  options[:region] = @region if @region
  options[:endpoint] = @endpoint if @endpoint
  options[:http_proxy] = @http_proxy if @http_proxy

  if @aws_use_sts
    # The region is also written to the process-wide Aws.config before the
    # assume-role credentials object is built.
    Aws.config[:region] = options[:region]
    options[:credentials] = Aws::AssumeRoleCredentials.new(
      role_arn: @aws_sts_role_arn,
      role_session_name: @aws_sts_session_name
    )
  elsif @aws_key_id && @aws_sec_key
    options[:credentials] = Aws::Credentials.new(@aws_key_id, @aws_sec_key)
  end

  @logs = Aws::CloudWatchLogs::Client.new(options)

  # Swap the configured symbol for the module that implements `load`.
  # This must happen BEFORE the runner thread is created: #emit calls
  # @json_handler.load, and a thread started earlier could observe the raw
  # symbol and fail with NoMethodError.
  @json_handler = case @json_handler
                  when :yajl then Yajl
                  when :json then JSON
                  end

  @finished = false
  thread_create(:in_cloudwatch_logs_runner, &method(:run))
end

Private Instance Methods

configure_parser(conf) click to toggle source
# File lib/fluent/plugin/in_cloudwatch_logs.rb, line 82
# Creates the message parser when the configuration names a format.
#
# conf - configuration object; only conf['format'] is inspected.
#
# Leaves @parser untouched (and returns nil) when no format is configured.
def configure_parser(conf)
  @parser = parser_create if conf['format']
end
describe_log_streams(log_stream_name_prefix, log_streams = nil, next_token = nil) click to toggle source
# File lib/fluent/plugin/in_cloudwatch_logs.rb, line 173
# Lists every log stream in @log_group_name whose name starts with the given
# prefix, following pagination tokens until the service stops returning one.
#
# log_stream_name_prefix - prefix sent with every request.
# log_streams            - optional array to append results to; when nil the
#                          array from the first response is used (and grown
#                          in place by later pages).
# next_token             - optional pagination token for the first request.
#
# Returns the accumulated array of log stream objects.
def describe_log_streams(log_stream_name_prefix, log_streams = nil, next_token = nil)
  collected = log_streams
  token = next_token

  loop do
    params = { log_group_name: @log_group_name }
    params[:next_token] = token if token
    params[:log_stream_name_prefix] = log_stream_name_prefix

    page = @logs.describe_log_streams(params)

    if collected
      collected.concat(page.log_streams)
    else
      collected = page.log_streams
    end

    token = page.next_token
    break unless token
  end

  collected
end
emit(stream, event) click to toggle source
# File lib/fluent/plugin/in_cloudwatch_logs.rb, line 138
# Converts one CloudWatch log event into a record and hands it to the router
# under @tag.
#
# stream - name of the originating log stream (not used by this method).
# event  - object responding to #message and #timestamp; the timestamp is
#          divided by 1000 before being used as the event time.
#
# With a configured @parser, the parser yields (time, record) pairs and the
# parsed time is kept unless @use_aws_timestamp is set. Without a parser the
# message is loaded through @json_handler; a message that fails to parse is
# wrapped as { "message" => raw_text }.
def emit(stream, event)
  if @parser
    @parser.parse(event.message) do |parsed_time, record|
      event_time = @use_aws_timestamp ? (event.timestamp / 1000).floor : parsed_time
      router.emit(@tag, event_time, record)
    end
  else
    event_time = (event.timestamp / 1000).floor
    record =
      begin
        @json_handler.load(event.message)
      rescue JSON::ParserError, Yajl::ParseError
        # Not JSON: forward the raw text instead of dropping the event.
        { "message" => event.message }
      end
    router.emit(@tag, event_time, record)
  end
end
get_events(log_stream_name) click to toggle source
# File lib/fluent/plugin/in_cloudwatch_logs.rb, line 158
# Fetches the next batch of events for one log stream in @log_group_name.
#
# log_stream_name - stream to read from.
#
# The token previously saved for this stream (see next_token) is sent along
# when it is neither nil nor empty. The forward token from the response is
# persisted through store_next_token only when valid_next_token accepts it.
#
# Returns the events carried by the response.
def get_events(log_stream_name)
  saved_token = next_token(log_stream_name)

  params = { log_group_name: @log_group_name, log_stream_name: log_stream_name }
  params[:next_token] = saved_token unless saved_token.nil? || saved_token.empty?

  result = @logs.get_log_events(params)

  forward_token = result.next_forward_token
  store_next_token(forward_token, log_stream_name) if valid_next_token(saved_token, forward_token)

  result.events
end
get_todays_date() click to toggle source
# File lib/fluent/plugin/in_cloudwatch_logs.rb, line 195
# Returns today's date (as reported by Date.today) formatted as "YYYY/MM/DD".
def get_todays_date
  today = Date.today
  today.strftime("%Y/%m/%d")
end
get_yesterdays_date() click to toggle source
# File lib/fluent/plugin/in_cloudwatch_logs.rb, line 199
# Returns the day before Date.today formatted as "YYYY/MM/DD".
def get_yesterdays_date
  yesterday = Date.today - 1
  yesterday.strftime("%Y/%m/%d")
end
next_token(log_stream_name) click to toggle source
# File lib/fluent/plugin/in_cloudwatch_logs.rb, line 93
# Reads the pagination token saved for a log stream.
#
# log_stream_name - stream whose state file (see state_file_for) is consulted.
#
# Returns the file contents with the trailing newline removed, or nil when
# the state file does not exist.
def next_token(log_stream_name)
  path = state_file_for(log_stream_name)
  File.exist?(path) ? File.read(path).chomp : nil
end
run() click to toggle source
# File lib/fluent/plugin/in_cloudwatch_logs.rb, line 104
# Polling loop executed on the background thread created in #start.
#
# Until @finished becomes truthy (set by #shutdown) the loop wakes roughly
# once per second; whenever the scheduled fetch time has passed it pushes the
# schedule forward by @fetch_interval and pulls events:
#
# * prefix mode (@use_log_stream_name_prefix or @use_todays_log_stream):
#   every stream matching the prefix is read; with @use_todays_log_stream the
#   prefix is today's date and streams prefixed with yesterday's date are
#   read as well.
# * single-stream mode: only @log_stream_name is read.
#
# Each fetched event is forwarded through #emit.
def run
  @next_fetch_time = Time.now

  until @finished
    if Time.now > @next_fetch_time
      @next_fetch_time += @fetch_interval

      if @use_log_stream_name_prefix || @use_todays_log_stream
        log_stream_name_prefix = @use_todays_log_stream ? get_todays_date : @log_stream_name
        begin
          log_streams = describe_log_streams(log_stream_name_prefix)
          log_streams.concat(describe_log_streams(get_yesterdays_date)) if @use_todays_log_stream
          log_streams.each do |log_stream|
            stream_name = log_stream.log_stream_name
            get_events(stream_name).each { |event| emit(stream_name, event) }
          end
        rescue Aws::CloudWatchLogs::Errors::ResourceNotFoundException
          log.warn "'#{@log_stream_name}' prefixed log stream(s) are not found"
          # Skip this cycle (including the one-second sleep below); the next
          # attempt waits for the already-advanced @next_fetch_time.
          next
        end
      else
        # Use the instance variable directly: no local named log_stream_name
        # exists in this branch (the one above is block-local), so a bare
        # reference would be an implicit method call.
        get_events(@log_stream_name).each { |event| emit(@log_stream_name, event) }
      end
    end
    sleep 1
  end
end
state_file_for(log_stream_name) click to toggle source
# File lib/fluent/plugin/in_cloudwatch_logs.rb, line 88
# Resolves the path of the state file that holds a stream's pagination token.
#
# log_stream_name - stream name, or nil for the shared state file.
#
# Returns @state_file itself when no stream name is given; otherwise
# "<@state_file>_<name>" with every File::SEPARATOR in the name replaced by
# "-".
def state_file_for(log_stream_name)
  return @state_file unless log_stream_name

  flattened = log_stream_name.gsub(File::SEPARATOR, '-')
  "#{@state_file}_#{flattened}"
end
store_next_token(token, log_stream_name = nil) click to toggle source
# File lib/fluent/plugin/in_cloudwatch_logs.rb, line 98
# Persists a pagination token, overwriting any previously stored value.
#
# token           - value written to the state file.
# log_stream_name - stream the token belongs to; nil selects the shared
#                   state file (see state_file_for).
#
# Returns the number of bytes written.
def store_next_token(token, log_stream_name = nil)
  File.write(state_file_for(log_stream_name), token)
end
valid_next_token(prev_token, next_token) click to toggle source
# File lib/fluent/plugin/in_cloudwatch_logs.rb, line 191
# Decides whether a freshly returned token is worth persisting.
#
# prev_token - token that was sent with the request (may be nil).
# next_token - token returned by the service (may be nil).
#
# Returns next_token itself when it is nil/false; otherwise true when the
# chomped next_token differs from prev_token, false when they match.
def valid_next_token(prev_token, next_token)
  return next_token unless next_token

  prev_token != next_token.chomp
end