class LogStash::Outputs::CloudWatchLogs

This output sends log data to the AWS CloudWatch Logs service.

Constants

MAX_BACKOFF_IN_SECOND

Back off for up to 64 seconds after a failure.

MAX_BATCH_COUNT
MAX_BATCH_SIZE
MAX_DISTANCE_BETWEEN_EVENTS
MESSAGE
MIN_BUFFER_DURATION
MIN_DELAY
PER_EVENT_OVERHEAD
TIMESTAMP

Constants

Attributes

buffer[R]

Accessed only by tests.

cwl[RW]
cwl_cfg[RW]
last_flush[RW]
sequence_token[RW]

Public Instance Methods

close() click to toggle source
# File lib/logstash/outputs/cloudwatchlogs.rb, line 143
# Shuts the plugin down: closes the buffer, waits for the publisher thread
# to finish, then drops the CloudWatch Logs client reference.
def close
  @logger.info("Going to clean up resources")
  buffer = @buffer
  worker = @publisher
  # Close the buffer first so the publisher thread's dequeue loop can end,
  # then block until that thread has exited.
  buffer.close
  worker.join
  @cwl = nil
end
flush(events) click to toggle source
# File lib/logstash/outputs/cloudwatchlogs.rb, line 151
# Splits a dequeued batch of buffered events into request-sized groups
# (via prepare_log_events) and sends each group with put_log_events.
# Returns nil for a nil/empty batch; otherwise the array of groups.
def flush(events)
  return if events.nil? || events.empty?

  prepare_log_events(events).each { |batch| put_log_events(batch) }
end
receive(event) click to toggle source
# File lib/logstash/outputs/cloudwatchlogs.rb, line 120
# Accepts one pipeline event: handles the shutdown marker, drops invalid
# events, resolves the destination log group/stream from the event, and
# enqueues the event (directly, or through the codec when @use_codec).
def receive(event)
  return unless output?(event)

  # Check for the shutdown marker before touching the event: it is a marker
  # object rather than a real event, so calling sprintf on it could fail.
  if event == LogStash::SHUTDOWN
    @buffer.close
    @publisher.join
    @logger.info("CloudWatch Logs output plugin shutdown.")
    finished
    return
  end
  return if invalid?(event)

  # Resolve the destination only for events that will actually be buffered,
  # so a skipped event cannot redirect what the publisher sends next.
  # NOTE(review): the publisher thread reads @cwl_cfg at flush time, not at
  # enqueue time, so buffered events go to whichever group/stream the most
  # recent event resolved to. Per-event destinations would require the
  # buffered item to carry its own group/stream.
  @cwl_cfg[:log_group_name] = event.sprintf(@log_group_name)
  @cwl_cfg[:log_stream_name] = event.sprintf(@log_stream_name)

  if @use_codec
    @codec.encode(event)
  else
    @buffer.enq({:timestamp => event.timestamp.time.to_f * 1000,
                 :message => event.get(MESSAGE)})
  end
end
register() click to toggle source
# File lib/logstash/outputs/cloudwatchlogs.rb, line 81
# Builds the CloudWatch Logs client, clamps batching settings to the
# plugin's limits, creates the buffer, and starts the publisher thread that
# hands each dequeued batch to flush.
def register
  require "aws-sdk"
  Aws.eager_autoload!

  @cwl_cfg = {}
  aws_options = aws_options_hash
  # NOTE(review): aws_options_hash may carry credentials (e.g. inline access
  # keys) depending on configuration; mask secret-looking keys so they never
  # reach the info-level log.
  loggable_options = aws_options.each_with_object({}) do |(key, value), masked|
    masked[key] = key.to_s =~ /secret|token|credential|access_key|password/i ? "[REDACTED]" : value
  end
  @logger.info("aws_options_hash #{loggable_options}")
  @cwl = Aws::CloudWatchLogs::Client.new(aws_options)

  if @batch_count > MAX_BATCH_COUNT
    @logger.warn(":batch_count exceeds the max number of log events. Use #{MAX_BATCH_COUNT} instead.")
    @batch_count = MAX_BATCH_COUNT
  end
  if @batch_size > MAX_BATCH_SIZE
    @logger.warn(":batch_size exceeds the max size of log events. Use #{MAX_BATCH_SIZE} instead.")
    @batch_size = MAX_BATCH_SIZE
  end
  if @buffer_duration < MIN_BUFFER_DURATION
    @logger.warn(":buffer_duration is smaller than the min value. Use #{MIN_BUFFER_DURATION} instead.")
    @buffer_duration = MIN_BUFFER_DURATION
  end
  @sequence_token = {}
  @last_flush = Time.now.to_f
  # Pass the (possibly clamped) instance variables so the limits above are
  # guaranteed to be what the buffer uses.
  @buffer = Buffer.new(
    max_batch_count: @batch_count, max_batch_size: @batch_size,
    buffer_duration: @buffer_duration, out_queue_size: @queue_size, logger: @logger,
    size_of_item_proc: Proc.new { |event| event[:message].bytesize + PER_EVENT_OVERHEAD })
  @publisher = Thread.new do
    @buffer.deq { |batch| flush(batch) }
  end

  if @use_codec
    # With a codec, the encoded payload (not the raw message field) is buffered.
    @codec.on_event do |event, payload|
      @buffer.enq({:timestamp => event.timestamp.time.to_f * 1000, :message => payload})
    end
  end
end

Private Instance Methods

invalid?(event) click to toggle source
# File lib/logstash/outputs/cloudwatchlogs.rb, line 238
# Returns true (and logs a warning) when the event cannot be shipped: it has
# no timestamp, or, when no codec is used, it has no message field.
def invalid?(event)
  missing = event.get(TIMESTAMP).nil?
  # Without a codec the raw message field is what gets sent, so it must exist.
  missing ||= event.get(MESSAGE).nil? unless @use_codec
  @logger.warn("Skipping invalid event #{event.to_hash}") if missing
  missing
end
prepare_log_events(events) click to toggle source
# File lib/logstash/outputs/cloudwatchlogs.rb, line 247
# Sorts buffered events by timestamp and splits them into batches whose
# first-to-last timestamp span is at most MAX_DISTANCE_BETWEEN_EVENTS.
#
# events - array of hashes with at least a :timestamp key.
# Returns an array of batches (arrays of events); [] for nil/empty input.
def prepare_log_events(events)
  return [] if events.nil? || events.empty?

  sorted = events.sort_by { |e| e[:timestamp] }
  sorted.each_with_object([]) do |log_event, batches|
    current = batches.last
    # Compare against the batch's first (earliest) event so the whole batch
    # stays within the allowed span, not just adjacent pairs.
    if current && log_event[:timestamp] - current.first[:timestamp] <= MAX_DISTANCE_BETWEEN_EVENTS
      current << log_event
    else
      batches << [log_event]
    end
  end
end
put_log_events(log_events) click to toggle source
# File lib/logstash/outputs/cloudwatchlogs.rb, line 160
# Sends one batch to CloudWatch Logs, maintaining the per-destination
# sequence token and recovering from token mismatches, missing log
# groups/streams and transient errors (with exponential backoff).
#
# log_events - array of {:timestamp, :message} hashes; nil/empty is a no-op.
def put_log_events(log_events)
  return if log_events.nil? || log_events.empty?

  # Snapshot the destination once: receive() rewrites @cwl_cfg on the
  # pipeline thread while this runs on the publisher thread, so re-reading it
  # could send the request, store the token and log against different streams.
  group = @cwl_cfg[:log_group_name]
  stream = @cwl_cfg[:log_stream_name]
  # NOTE(review): plain concatenation can collide ("ab"+"c" vs "a"+"bc");
  # kept because @sequence_token is exposed (to tests) with this key format.
  token_key = "#{group}#{stream}"

  # Shouldn't send two requests within MIN_DELAY
  delay = MIN_DELAY - (Time.now.to_f - @last_flush)
  sleep(delay) if delay > 0
  backoff = 1
  begin
    @logger.info("Sending #{log_events.size} events to #{group}/#{stream}")
    @last_flush = Time.now.to_f
    if @dry_run
      log_events.each { |event| puts event[:message] }
      return
    end
    response = @cwl.put_log_events(
      :log_group_name => group,
      :log_stream_name => stream,
      :log_events => log_events,
      :sequence_token => @sequence_token[token_key]
    )
    @sequence_token[token_key] = response.next_sequence_token
  rescue Aws::CloudWatchLogs::Errors::InvalidSequenceTokenException => e
    @logger.warn(e)
    if adopt_sequence_token_from_error(e, token_key)
      @logger.info("Will retry with new sequence token #{@sequence_token[token_key]}")
      retry
    else
      @logger.error("Cannot find sequence token from response")
    end
  rescue Aws::CloudWatchLogs::Errors::DataAlreadyAcceptedException => e
    @logger.warn(e)
    if adopt_sequence_token_from_error(e, token_key)
      @logger.info("Data already accepted and no need to resend")
    else
      @logger.error("Cannot find sequence token from response")
    end
  rescue Aws::CloudWatchLogs::Errors::ResourceNotFoundException => e
    @logger.info("Will create log group/stream and retry")
    unless create_log_group_and_stream(group, stream)
      # Creation failed for a reason other than "already exists" (e.g. missing
      # permissions): back off instead of retrying in a tight loop.
      backoff *= 2 if backoff * 2 <= MAX_BACKOFF_IN_SECOND
      sleep backoff
    end
    retry
  rescue Aws::CloudWatchLogs::Errors::InvalidParameterException => e
    # swallow exception
    @logger.error("Skip batch due to #{e}")
  rescue Exception => e
    # NOTE(review): deliberately broad; under JRuby, Java exceptions may not
    # descend from StandardError. Retries are unbounded by design.
    if backoff * 2 <= MAX_BACKOFF_IN_SECOND
      backoff = backoff * 2
    end
    @logger.error("Will retry for #{e} after #{backoff} seconds")
    sleep backoff
    retry
  end
end

# Extracts the sequence token CloudWatch Logs embeds in an error message and
# stores it under token_key (the literal 'null' is stored as nil).
# Returns true when a token was found, false otherwise.
def adopt_sequence_token_from_error(error, token_key)
  match = /sequenceToken(?:\sis)?: ([^\s]+)/.match(error.to_s)
  return false unless match

  token = match[1]
  @sequence_token[token_key] = token == 'null' ? nil : token
  true
end

# Creates the log group and then the log stream, treating "already exists"
# as success. Returns false if either call failed for any other reason
# (that error is logged, not raised).
def create_log_group_and_stream(group, stream)
  created = true
  begin
    @cwl.create_log_group(:log_group_name => group)
  rescue Aws::CloudWatchLogs::Errors::ResourceAlreadyExistsException
    @logger.info("Log group #{group} already exists")
  rescue Exception => e
    @logger.error(e)
    created = false
  end
  begin
    @cwl.create_log_stream(:log_group_name => group, :log_stream_name => stream)
  rescue Aws::CloudWatchLogs::Errors::ResourceAlreadyExistsException
    @logger.info("Log stream #{stream} already exists")
  rescue Exception => e
    @logger.error(e)
    created = false
  end
  created
end