class LogStashLogger::Device::AwsStream

Constants

DEFAULT_STREAM

Attributes

recoverable_error_codes[RW]
stream_class[RW]
aws_region[RW]
stream[RW]

Public Class Methods

new(opts) click to toggle source
Calls superclass method
# File lib/logstash-logger/device/aws_stream.rb, line 22
# Builds the device from an options collection.
#
# The bare +super+ forwards +opts+ unchanged to the parent initializer.
#
# @param opts [Hash] options read here:
#   :aws_access_key_id, :aws_secret_access_key, :aws_region and :stream.
#   A missing or nil :stream falls back to DEFAULT_STREAM.
def initialize(opts)
  super
  @access_key_id, @secret_access_key =
    opts.values_at(:aws_access_key_id, :aws_secret_access_key)
  @aws_region = opts[:aws_region]
  @stream = opts[:stream] || DEFAULT_STREAM
end

Public Instance Methods

close!() click to toggle source
# File lib/logstash-logger/device/aws_stream.rb, line 87
# Drops the stream client held in @io by resetting it to nil.
# Nothing is invoked on the previous client before the reference is cleared.
def close!
  @io = nil
end
connect() click to toggle source
# File lib/logstash-logger/device/aws_stream.rb, line 46
# Instantiates the class-level stream_class and stores the client in @io.
#
# Explicit credentials are passed only when both the access key id and the
# secret access key are set; the region is passed only when it is set.
# Anything left out is simply absent from the options given to the client.
#
# @return [Object] the newly created client (also assigned to @io)
def connect
  explicit_credentials = !(@access_key_id.nil? || @secret_access_key.nil?)

  client_opts = {}
  if explicit_credentials
    client_opts[:credentials] = Aws::Credentials.new(@access_key_id, @secret_access_key)
  end
  client_opts[:region] = @aws_region unless @aws_region.nil?

  @io = self.class.stream_class.new(client_opts)
end
get_response_records(resp) click to toggle source
# File lib/logstash-logger/device/aws_stream.rb, line 42
# Hook with no implementation here: it always raises.
#
# write_batch iterates the return value with each_with_index and reads
# error_code and error_message from every element.
#
# @param resp [Object] the value returned by put_records
# @raise [NotImplementedError] always
def get_response_records(resp)
  raise NotImplementedError
end
is_successful_response(resp) click to toggle source
# File lib/logstash-logger/device/aws_stream.rb, line 38
# Hook with no implementation here: it always raises.
#
# write_batch treats a falsy return value as "some records failed" and then
# inspects the individual records.
#
# @param resp [Object] the value returned by put_records
# @raise [NotImplementedError] always
def is_successful_response(resp)
  raise NotImplementedError
end
put_records(records) click to toggle source
# File lib/logstash-logger/device/aws_stream.rb, line 34
# Hook with no implementation here: it always raises.
#
# write_batch calls this with the transformed messages and hands the return
# value to is_successful_response and get_response_records.
#
# @param records [Array] the results of transform_message, one per message
# @raise [NotImplementedError] always
def put_records(records)
  raise NotImplementedError
end
transform_message(message) click to toggle source
# File lib/logstash-logger/device/aws_stream.rb, line 30
# Hook with no implementation here: it always raises.
#
# write_batch maps every message through this method; on a retry it reads
# the :data entry of the resulting record.
#
# @param message [Object] a single message from the batch
# @raise [NotImplementedError] always
def transform_message(message)
  raise NotImplementedError
end
with_connection() { || ... } click to toggle source
# File lib/logstash-logger/device/aws_stream.rb, line 53
# Runs the given block, connecting first when connected? reports false.
#
# Any StandardError raised while connecting or inside the block is logged,
# followed by a "giving up" warning, and the device is closed without
# flushing. The error is not re-raised.
#
# @yield the work to perform against the connection
# @return [Object] the block's value, or the result of close after a failure
def with_connection
  begin
    connect unless connected?
    yield
  rescue => error
    log_error(error)
    log_warning("giving up")
    close(flush: false)
  end
end
write_batch(messages, group = nil) click to toggle source
# File lib/logstash-logger/device/aws_stream.rb, line 62
# Transforms a batch of messages, submits them through put_records and
# deals with per-record failures reported in the response.
#
# For an unsuccessful response each returned record is examined:
# * error code listed in the class-level recoverable_error_codes: two
#   warnings are logged and the :data of the record at the same index is
#   passed to write again;
# * any other non-nil, non-empty error code: an error is logged;
# * nil or empty error code: the record is skipped silently.
#
# @param messages [Array] messages to send
# @param group [Object, nil] accepted but not used by this method
# @return [Object, nil] whatever with_connection returns
def write_batch(messages, group = nil)
  records = messages.map { |message| transform_message(message) }

  with_connection do
    response = put_records(records)
    # Nothing to re-queue when the whole batch went through.
    next if is_successful_response(response)

    get_response_records(response).each_with_index do |result, position|
      code = result.error_code

      if self.class.recoverable_error_codes.include?(code)
        log_warning("Failed to post record using #{self.class.stream_class.name} with error: #{code} #{result.error_message}")
        log_warning("Retrying")
        # Put the failed record back into the buffer.
        write(records[position][:data])
      elsif !(code.nil? || code == '')
        log_error("Failed to post record using #{self.class.stream_class.name} with error: #{code} #{result.error_message}")
      end
    end
  end
end
write_one(message) click to toggle source
# File lib/logstash-logger/device/aws_stream.rb, line 83
# Sends a single message by wrapping it in a one-element batch and
# delegating to write_batch.
#
# @param message [Object] the message to send
# @return [Object] whatever write_batch returns
def write_one(message)
  single_item_batch = [message]
  write_batch(single_item_batch)
end