class LogStash::Outputs::CloudWatchLogs
This output lets you send log data to the AWS CloudWatch Logs service.
Constants
- MAX_BACKOFF_IN_SECOND
Back off for up to 64 seconds upon failure.
- MAX_BATCH_COUNT
- MAX_BATCH_SIZE
- MAX_DISTANCE_BETWEEN_EVENTS
- MESSAGE
- MIN_BUFFER_DURATION
- MIN_DELAY
- PER_EVENT_OVERHEAD
- TIMESTAMP
Constants
Attributes
buffer[R]
Only accessed by tests
cwl[RW]
cwl_cfg[RW]
last_flush[RW]
sequence_token[RW]
Public Instance Methods
close()
click to toggle source
# File lib/logstash/outputs/cloudwatchlogs.rb, line 143
# Releases the plugin's resources: closes the buffer, waits for the
# publisher thread to finish, and drops the CloudWatch Logs client reference.
def close
  @logger.info("Going to clean up resources")

  # Close the buffer first, then wait for the publisher thread to exit.
  @buffer.close
  @publisher.join

  @cwl = nil
end
flush(events)
click to toggle source
# File lib/logstash/outputs/cloudwatchlogs.rb, line 151
# Publishes a group of buffered events.
#
# The events are first split into batches by prepare_log_events, and each
# batch is then handed to put_log_events in order. Does nothing when
# +events+ is nil or empty.
def flush(events)
  return if events.nil? || events.empty?

  prepare_log_events(events).each { |batch| put_log_events(batch) }
end
receive(event)
click to toggle source
# File lib/logstash/outputs/cloudwatchlogs.rb, line 120
# Accepts one event from the pipeline and queues it for publishing.
#
# The destination log group/stream names are resolved from the event via
# sprintf and stored in @cwl_cfg. A LogStash::SHUTDOWN event closes the
# buffer, waits for the publisher thread and calls +finished+. Events for
# which invalid? is true are dropped. Otherwise the event is either passed
# to the codec (when @use_codec is set) or enqueued directly as a
# {:timestamp, :message} hash.
def receive(event)
  return unless output?(event)

  @cwl_cfg[:log_group_name] = event.sprintf(@log_group_name)
  @cwl_cfg[:log_stream_name] = event.sprintf(@log_stream_name)

  if event == LogStash::SHUTDOWN
    @buffer.close
    @publisher.join
    @logger.info("CloudWatch Logs output plugin shutdown.")
    finished
    return
  end

  return if invalid?(event)

  # With a codec, the encoded payload is produced by @codec.encode.
  return @codec.encode(event) if @use_codec

  # Timestamp is the event time in seconds scaled by 1000.
  @buffer.enq({:timestamp => event.timestamp.time.to_f*1000,
               :message => event.get(MESSAGE) })
end
register()
click to toggle source
# File lib/logstash/outputs/cloudwatchlogs.rb, line 81
# Sets the plugin up: loads the AWS SDK, creates the CloudWatch Logs
# client, clamps the batching settings to their limits, creates the event
# buffer, starts the publisher thread and, when a codec is in use, wires
# the codec's output into the buffer.
def register
  # Loaded here rather than at file load time.
  require "aws-sdk"
  Aws.eager_autoload!

  @cwl_cfg = {}
  @logger.info("aws_options_hash #{aws_options_hash}")
  @cwl = Aws::CloudWatchLogs::Client.new(aws_options_hash)

  # Clamp user-supplied settings to the limits, warning when adjusted.
  if @batch_count > MAX_BATCH_COUNT
    @logger.warn(":batch_count exceeds the max number of log events. Use #{MAX_BATCH_COUNT} instead.")
    @batch_count = MAX_BATCH_COUNT
  end
  if @batch_size > MAX_BATCH_SIZE
    @logger.warn(":batch_size exceeds the max size of log events. Use #{MAX_BATCH_SIZE} instead.")
    @batch_size = MAX_BATCH_SIZE
  end
  if @buffer_duration < MIN_BUFFER_DURATION
    @logger.warn(":buffer_duration is smaller than the min value. Use #{MIN_BUFFER_DURATION} instead.")
    @buffer_duration = MIN_BUFFER_DURATION
  end

  @sequence_token = {}
  @last_flush = Time.now.to_f

  # Size of a buffered item: message bytes plus the fixed per-event overhead.
  item_size = Proc.new { |item| item[:message].bytesize + PER_EVENT_OVERHEAD }
  @buffer = Buffer.new(
    max_batch_count: batch_count,
    max_batch_size: batch_size,
    buffer_duration: @buffer_duration,
    out_queue_size: @queue_size,
    logger: @logger,
    size_of_item_proc: item_size)

  # Publisher thread: hands every batch the buffer yields to flush.
  @publisher = Thread.new do
    @buffer.deq { |batch| flush(batch) }
  end

  return unless @use_codec

  # Encoded payloads are enqueued with the originating event's timestamp.
  @codec.on_event do |event, payload|
    @buffer.enq({:timestamp => event.timestamp.time.to_f*1000,
                 :message => payload})
  end
end
Private Instance Methods
invalid?(event)
click to toggle source
# File lib/logstash/outputs/cloudwatchlogs.rb, line 238
# Tells whether an event must be skipped.
#
# An event is invalid when its TIMESTAMP field is nil, or when no codec is
# in use and its MESSAGE field is nil. Invalid events are logged at warn
# level.
#
# Returns true when the event is invalid, false otherwise.
def invalid?(event)
  rejected = event.get(TIMESTAMP).nil? || (!@use_codec && event.get(MESSAGE).nil?)
  @logger.warn("Skipping invalid event #{event.to_hash}") if rejected
  rejected
end
prepare_log_events(events)
click to toggle source
# File lib/logstash/outputs/cloudwatchlogs.rb, line 247
# Sorts events by timestamp and splits them into batches in which no event
# is more than MAX_DISTANCE_BETWEEN_EVENTS later than the first event of
# its batch.
#
# events - Array of hashes carrying a :timestamp entry (may be nil or empty).
#
# Returns an Array of batches (each an Array of events, in ascending
# timestamp order). Returns an empty Array for nil or empty input instead
# of failing on the missing first/last element.
def prepare_log_events(events)
  return [] if events.nil? || events.empty?

  sorted = events.sort { |e1, e2| e1[:timestamp] <=> e2[:timestamp] }

  batches = []
  current = []
  sorted.each do |log_event|
    # Start a new batch once this event is too far from the batch's first
    # event. When all events fit, this never triggers and a single batch
    # results, so no separate span pre-check is needed.
    if !current.empty? &&
       log_event[:timestamp] - current[0][:timestamp] > MAX_DISTANCE_BETWEEN_EVENTS
      batches << current
      current = []
    end
    current << log_event
  end
  batches << current
  batches
end
put_log_events(log_events)
click to toggle source
# File lib/logstash/outputs/cloudwatchlogs.rb, line 160
# Sends one batch of log events to the configured log group/stream.
#
# log_events - Array of {:timestamp, :message} hashes; nil/empty is a no-op.
#
# Behaviour, as implemented below:
# * Waits so that two requests are at least MIN_DELAY apart.
# * In dry-run mode, prints each message with puts and returns.
# * On InvalidSequenceTokenException, takes the token from the error
#   message and retries; if no token can be parsed the batch is dropped.
# * On DataAlreadyAcceptedException, records the token and does not resend.
# * On ResourceNotFoundException, tries to create the log group and stream
#   and retries. If the stream could not be created, it backs off first so
#   a persistent creation failure does not turn into a tight retry loop.
# * On InvalidParameterException, the batch is skipped.
# * On any other error, retries after a doubling backoff capped at
#   MAX_BACKOFF_IN_SECOND.
def put_log_events(log_events)
  return if log_events.nil? || log_events.empty?

  # Shouldn't send two requests within MIN_DELAY
  delay = MIN_DELAY - (Time.now.to_f - @last_flush)
  sleep(delay) if delay > 0

  backoff = 1
  begin
    @logger.info("Sending #{log_events.size} events to #{@cwl_cfg[:log_group_name]}/#{@cwl_cfg[:log_stream_name]}")
    @last_flush = Time.now.to_f
    if @dry_run
      log_events.each { |event| puts event[:message] }
      return
    end
    response = @cwl.put_log_events(
      :log_group_name => @cwl_cfg[:log_group_name],
      :log_stream_name => @cwl_cfg[:log_stream_name],
      :log_events => log_events,
      :sequence_token => @sequence_token[sequence_token_key]
    )
    @sequence_token[sequence_token_key] = response.next_sequence_token
  rescue Aws::CloudWatchLogs::Errors::InvalidSequenceTokenException => e
    @logger.warn(e)
    if update_sequence_token_from_error(e)
      @logger.info("Will retry with new sequence token #{@sequence_token[sequence_token_key]}")
      retry
    else
      @logger.error("Cannot find sequence token from response")
    end
  rescue Aws::CloudWatchLogs::Errors::DataAlreadyAcceptedException => e
    @logger.warn(e)
    if update_sequence_token_from_error(e)
      @logger.info("Data already accepted and no need to resend")
    else
      @logger.error("Cannot find sequence token from response")
    end
  rescue Aws::CloudWatchLogs::Errors::ResourceNotFoundException => e
    @logger.info("Will create log group/stream and retry")
    unless create_log_group_and_stream
      # Creation failed for a reason other than "already exists"; retrying
      # immediately would just fail the same way, so wait first.
      backoff = next_backoff(backoff)
      @logger.error("Could not create log stream, will retry after #{backoff} seconds")
      sleep backoff
    end
    retry
  rescue Aws::CloudWatchLogs::Errors::InvalidParameterException => e
    # swallow exception
    @logger.error("Skip batch due to #{e}")
  rescue Exception => e
    # NOTE(review): rescuing Exception (not StandardError) is kept from the
    # original so that no error ends the retry loop; confirm this is intended.
    backoff = next_backoff(backoff)
    @logger.error("Will retry for #{e} after #{backoff} seconds")
    sleep backoff
    retry
  end
end

# Key under which the sequence token for the current group/stream is stored.
def sequence_token_key
  @cwl_cfg[:log_group_name] + @cwl_cfg[:log_stream_name]
end

# Extracts the sequence token from an error's message and stores it (nil when
# the message reports 'null'). Returns true if a token was found, else false.
def update_sequence_token_from_error(error)
  match = /sequenceToken(?:\sis)?: ([^\s]+)/.match(error.to_s)
  return false unless match

  @sequence_token[sequence_token_key] = (match[1] == 'null' ? nil : match[1])
  true
end

# Creates the log group and then the log stream, treating "already exists" as
# success. Returns true when the stream exists or was created, false when
# stream creation raised any other error.
def create_log_group_and_stream
  begin
    @cwl.create_log_group(:log_group_name => @cwl_cfg[:log_group_name])
  rescue Aws::CloudWatchLogs::Errors::ResourceAlreadyExistsException => e
    @logger.info("Log group #{@cwl_cfg[:log_group_name]} already exists")
  rescue Exception => e
    # A group-creation failure alone is not fatal: stream creation is still
    # attempted and decides the outcome.
    @logger.error(e)
  end

  begin
    @cwl.create_log_stream(:log_group_name => @cwl_cfg[:log_group_name],
                           :log_stream_name => @cwl_cfg[:log_stream_name])
    true
  rescue Aws::CloudWatchLogs::Errors::ResourceAlreadyExistsException => e
    @logger.info("Log stream #{@cwl_cfg[:log_stream_name]} already exists")
    true
  rescue Exception => e
    @logger.error(e)
    false
  end
end

# Doubles the backoff unless that would exceed MAX_BACKOFF_IN_SECOND.
def next_backoff(backoff)
  backoff * 2 <= MAX_BACKOFF_IN_SECOND ? backoff * 2 : backoff
end