class CloudWatchLogger::Client::AWS_SDK::DeliveryThread

Public Class Methods

new(credentials, log_group_name, log_stream_name, opts = {}) click to toggle source
Calls superclass method
# File lib/cloudwatchlogger/client/aws_sdk/threaded.rb, line 38
# Builds the delivery thread and starts its consumer loop.
#
# The block handed to +super+ becomes the thread body: it connects the client
# on first use, then pops messages off the internal queue and sends each one
# with +put_log_events+ until the exit sentinel is popped. An +at_exit+ hook
# pushes that sentinel and joins the thread so queued messages are flushed
# before the process terminates.
#
# @param credentials [Hash] stored for connect!, which reads :access_key_id and
#   :secret_access_key from it
# @param log_group_name [String] presumably the CloudWatch log group name
# @param log_stream_name [String] presumably the CloudWatch log stream name
# @param opts [Hash] options; :open_timeout and :read_timeout default to 120
#   when missing or nil. The caller's hash is never modified.
def initialize(credentials, log_group_name, log_stream_name, opts = {})
  # Work on a copy: filling in defaults must not leak into the caller's hash,
  # and must not fail when the caller passes a frozen hash.
  opts = opts.dup
  opts[:open_timeout] ||= 120
  opts[:read_timeout] ||= 120
  @credentials = credentials
  @log_group_name = log_group_name
  @log_stream_name = log_stream_name
  @opts = opts

  @queue = Queue.new
  @exiting = false

  # Upper bound on sequence-token retries for a single message, so a token
  # that cannot be recovered from the error message does not loop forever.
  max_token_retries = 5

  super do
    loop do
      connect!(opts) if @client.nil?

      # Blocks until a message (or the exit sentinel) is available.
      message_object = @queue.pop
      break if message_object == :__delivery_thread_exit_signal__

      token_retries = 0
      begin
        event = {
          log_group_name: @log_group_name,
          log_stream_name: @log_stream_name,
          log_events: [log_event(message_object)]
        }
        event[:sequence_token] = @sequence_token if @sequence_token
        response = @client.put_log_events(event)
        unless response.rejected_log_events_info.nil?
          raise CloudWatchLogger::LogEventRejected
        end
        @sequence_token = response.next_sequence_token
      rescue Aws::CloudWatchLogs::Errors::InvalidSequenceTokenException => err
        token_retries += 1
        # Give up after the bound; the error then ends the thread like any
        # other unrescued delivery error would.
        raise if token_retries > max_token_retries

        # The last word of the error message is taken as the expected token.
        @sequence_token = err.message.split(' ').last
        retry
      end
    end
  end

  at_exit do
    exit!
    join
  end
end

Public Instance Methods

connect!(opts = {}) click to toggle source
# File lib/cloudwatchlogger/client/aws_sdk/threaded.rb, line 92
# Creates the CloudWatch Logs client and tries to create the log stream.
#
# HTTP timeouts are taken from +opts+; the logger, region and static
# credentials are taken from the instance state and only passed to the client
# when present. If the log group is missing it is created and the stream
# creation is attempted again. "Already exists" and "access denied" errors
# raised by the stream creation are ignored.
#
# @param opts [Hash] may carry :open_timeout and :read_timeout
# @return [Object, nil] the result of create_log_stream, or nil when one of
#   the ignored errors was raised
def connect!(opts = {})
  client_options = {
    http_open_timeout: opts[:open_timeout],
    http_read_timeout: opts[:read_timeout]
  }
  %i[logger region].each do |key|
    client_options[key] = @opts[key] if @opts[key]
  end
  if @credentials.key?(:access_key_id)
    client_options[:access_key_id] = @credentials[:access_key_id]
    client_options[:secret_access_key] = @credentials[:secret_access_key]
  end

  @client = Aws::CloudWatchLogs::Client.new(client_options)

  begin
    @client.create_log_stream(log_group_name: @log_group_name, log_stream_name: @log_stream_name)
  rescue Aws::CloudWatchLogs::Errors::ResourceNotFoundException
    # The group does not exist yet: create it, then try the stream again.
    @client.create_log_group(log_group_name: @log_group_name)
    retry
  rescue Aws::CloudWatchLogs::Errors::ResourceAlreadyExistsException,
         Aws::CloudWatchLogs::Errors::AccessDeniedException
    # Deliberately ignored.
  end
end
deliver(message) click to toggle source

Pushes a message onto the internal queue

# File lib/cloudwatchlogger/client/aws_sdk/threaded.rb, line 88
# Enqueues +message+ on the internal queue.
#
# @param message [Object] the item to enqueue
# @return [Queue] the internal queue
def deliver(message)
  @queue << message
end
exit!() click to toggle source

Signals the delivery thread to exit by pushing a sentinel value onto the queue

# File lib/cloudwatchlogger/client/aws_sdk/threaded.rb, line 82
# Marks this object as exiting and enqueues the exit sentinel symbol.
#
# @return [Queue] the internal queue
def exit!
  @exiting = true
  @queue.enq(:__delivery_thread_exit_signal__)
end
log_event(message_object) click to toggle source
# File lib/cloudwatchlogger/client/aws_sdk/threaded.rb, line 114
# Converts a queued item into a hash with :timestamp and :message keys.
#
# A Hash that has both :epoch_time and :message keys supplies the two values
# itself. Anything else becomes the message as-is, stamped with the current
# time as integer milliseconds since the epoch.
#
# @param message_object [Object] the queued item
# @return [Hash] a hash with :timestamp and :message
def log_event(message_object)
  carries_own_fields = message_object.is_a?(Hash) &&
                       message_object.key?(:epoch_time) &&
                       message_object.key?(:message)
  if carries_own_fields
    return { timestamp: message_object[:epoch_time], message: message_object[:message] }
  end

  now_ms = (Time.now.utc.to_f.round(3) * 1000).to_i
  { timestamp: now_ms, message: message_object }
end