class LogStashLogger::Device::AwsStream
Constants
- DEFAULT_STREAM
Attributes
recoverable_error_codes[RW]
stream_class[RW]
aws_region[RW]
stream[RW]
Public Class Methods
new(opts)
click to toggle source
Calls superclass method
# File lib/logstash-logger/device/aws_stream.rb, line 22 def initialize(opts) super @access_key_id = opts[:aws_access_key_id] @secret_access_key = opts[:aws_secret_access_key] @aws_region = opts[:aws_region] @stream = opts[:stream] || DEFAULT_STREAM end
Public Instance Methods
close!()
click to toggle source
# File lib/logstash-logger/device/aws_stream.rb, line 87 def close! @io = nil end
connect()
click to toggle source
# File lib/logstash-logger/device/aws_stream.rb, line 46 def connect client_opts = {} client_opts[:credentials] = Aws::Credentials.new(@access_key_id, @secret_access_key) unless @access_key_id == nil || @secret_access_key == nil client_opts[:region] = @aws_region unless @aws_region == nil @io = self.class.stream_class.new(client_opts) end
get_response_records(resp)
click to toggle source
# File lib/logstash-logger/device/aws_stream.rb, line 42 def get_response_records(resp) fail NotImplementedError end
is_successful_response(resp)
click to toggle source
# File lib/logstash-logger/device/aws_stream.rb, line 38 def is_successful_response(resp) fail NotImplementedError end
put_records(records)
click to toggle source
# File lib/logstash-logger/device/aws_stream.rb, line 34 def put_records(records) fail NotImplementedError end
transform_message(message)
click to toggle source
# File lib/logstash-logger/device/aws_stream.rb, line 30 def transform_message(message) fail NotImplementedError end
with_connection() { || ... }
click to toggle source
# File lib/logstash-logger/device/aws_stream.rb, line 53 def with_connection connect unless connected? yield rescue => e log_error(e) log_warning("giving up") close(flush: false) end
write_batch(messages, group = nil)
click to toggle source
# File lib/logstash-logger/device/aws_stream.rb, line 62 def write_batch(messages, group = nil) records = messages.map{ |m| transform_message(m) } with_connection do resp = put_records(records) # Put any failed records back into the buffer if !is_successful_response(resp) get_response_records(resp).each_with_index do |record, index| if self.class.recoverable_error_codes.include?(record.error_code) log_warning("Failed to post record using #{self.class.stream_class.name} with error: #{record.error_code} #{record.error_message}") log_warning("Retrying") write(records[index][:data]) elsif !record.error_code.nil? && record.error_code != '' log_error("Failed to post record using #{self.class.stream_class.name} with error: #{record.error_code} #{record.error_message}") end end end end end
write_one(message)
click to toggle source
# File lib/logstash-logger/device/aws_stream.rb, line 83 def write_one(message) write_batch([message]) end