class Fluent::Plugin::CloudwatchLogsInput
Public Class Methods
new()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_cloudwatch_logs.rb, line 34 def initialize super require 'aws-sdk-cloudwatchlogs' end
Public Instance Methods
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_cloudwatch_logs.rb, line 40 def configure(conf) compat_parameters_convert(conf, :parser) super configure_parser(conf) end
shutdown()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_cloudwatch_logs.rb, line 76 def shutdown @finished = true super end
start()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_cloudwatch_logs.rb, line 46 def start super options = {} options[:region] = @region if @region options[:endpoint] = @endpoint if @endpoint options[:http_proxy] = @http_proxy if @http_proxy if @aws_use_sts Aws.config[:region] = options[:region] options[:credentials] = Aws::AssumeRoleCredentials.new( role_arn: @aws_sts_role_arn, role_session_name: @aws_sts_session_name ) else options[:credentials] = Aws::Credentials.new(@aws_key_id, @aws_sec_key) if @aws_key_id && @aws_sec_key end @logs = Aws::CloudWatchLogs::Client.new(options) @finished = false thread_create(:in_cloudwatch_logs_runner, &method(:run)) @json_handler = case @json_handler when :yajl Yajl when :json JSON end end
Private Instance Methods
configure_parser(conf)
click to toggle source
# File lib/fluent/plugin/in_cloudwatch_logs.rb, line 82 def configure_parser(conf) if conf['format'] @parser = parser_create end end
describe_log_streams(log_stream_name_prefix, log_streams = nil, next_token = nil)
click to toggle source
# File lib/fluent/plugin/in_cloudwatch_logs.rb, line 173 def describe_log_streams(log_stream_name_prefix, log_streams = nil, next_token = nil) request = { log_group_name: @log_group_name } request[:next_token] = next_token if next_token request[:log_stream_name_prefix] = log_stream_name_prefix response = @logs.describe_log_streams(request) if log_streams log_streams.concat(response.log_streams) else log_streams = response.log_streams end if response.next_token log_streams = describe_log_streams(log_stream_name_prefix, log_streams, response.next_token) end log_streams end
emit(stream, event)
click to toggle source
# File lib/fluent/plugin/in_cloudwatch_logs.rb, line 138 def emit(stream, event) if @parser @parser.parse(event.message) {|time,record| if @use_aws_timestamp time = (event.timestamp / 1000).floor end router.emit(@tag, time, record) } else time = (event.timestamp / 1000).floor begin record = @json_handler.load(event.message) rescue JSON::ParserError, Yajl::ParseError # Catch parser errors # log.debug "Non-JSON message encountered" record = { "message" => event.message } end router.emit(@tag, time, record) end end
get_events(log_stream_name)
click to toggle source
# File lib/fluent/plugin/in_cloudwatch_logs.rb, line 158 def get_events(log_stream_name) request = { log_group_name: @log_group_name, log_stream_name: log_stream_name } log_next_token = next_token(log_stream_name) request[:next_token] = log_next_token if !log_next_token.nil? && !log_next_token.empty? response = @logs.get_log_events(request) if valid_next_token(log_next_token, response.next_forward_token) store_next_token(response.next_forward_token, log_stream_name) end response.events end
get_todays_date()
click to toggle source
# File lib/fluent/plugin/in_cloudwatch_logs.rb, line 195 def get_todays_date Date.today.strftime("%Y/%m/%d") end
get_yesterdays_date()
click to toggle source
# File lib/fluent/plugin/in_cloudwatch_logs.rb, line 199 def get_yesterdays_date (Date.today - 1).strftime("%Y/%m/%d") end
next_token(log_stream_name)
click to toggle source
# File lib/fluent/plugin/in_cloudwatch_logs.rb, line 93 def next_token(log_stream_name) return nil unless File.exist?(state_file_for(log_stream_name)) File.read(state_file_for(log_stream_name)).chomp end
run()
click to toggle source
# File lib/fluent/plugin/in_cloudwatch_logs.rb, line 104 def run @next_fetch_time = Time.now until @finished if Time.now > @next_fetch_time @next_fetch_time += @fetch_interval if @use_log_stream_name_prefix || @use_todays_log_stream log_stream_name_prefix = @use_todays_log_stream ? get_todays_date : @log_stream_name begin log_streams = describe_log_streams(log_stream_name_prefix) log_streams.concat(describe_log_streams(get_yesterdays_date)) if @use_todays_log_stream log_streams.each do |log_stream| log_stream_name = log_stream.log_stream_name events = get_events(log_stream_name) events.each do |event| emit(log_stream_name, event) end end rescue Aws::CloudWatchLogs::Errors::ResourceNotFoundException log.warn "'#{@log_stream_name}' prefixed log stream(s) are not found" next end else events = get_events(@log_stream_name) events.each do |event| emit(log_stream_name, event) end end end sleep 1 end end
state_file_for(log_stream_name)
click to toggle source
# File lib/fluent/plugin/in_cloudwatch_logs.rb, line 88 def state_file_for(log_stream_name) return "#{@state_file}_#{log_stream_name.gsub(File::SEPARATOR, '-')}" if log_stream_name return @state_file end
store_next_token(token, log_stream_name = nil)
click to toggle source
# File lib/fluent/plugin/in_cloudwatch_logs.rb, line 98 def store_next_token(token, log_stream_name = nil) File.open(state_file_for(log_stream_name), 'w') do |f| f.write token end end
valid_next_token(prev_token, next_token)
click to toggle source
# File lib/fluent/plugin/in_cloudwatch_logs.rb, line 191 def valid_next_token(prev_token, next_token) next_token && prev_token != next_token.chomp end