class Fluent::Plugin::TimestreamOutput
Fluent plugin for Amazon Timestream.
(Source annotation: rubocop:disable Metrics/ClassLength)
Constants
- VALID_TIME_UNIT
Public Instance Methods
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_timestream.rb, line 55
# Applies the parsed configuration: builds the Timestream write client,
# resolves the target database/table and checks the configured time unit.
#
# The database and table are taken from the AWS_TIMESTREAM_DATABASE and
# AWS_TIMESTREAM_TABLE environment variables only when they are nil.
#
# @param conf the configuration object; forwarded to the superclass via super
# @raise [Fluent::ConfigError] when validate_time_unit rejects @time_unit
def configure(conf)
  super

  # Region and endpoint are only passed to the client when they are set;
  # ssl_verify_peer is always passed through as-is.
  location_options = { region: @region, endpoint: @endpoint }.select { |_name, setting| setting }
  client_options = credential_options
                   .merge(location_options)
                   .merge(ssl_verify_peer: @ssl_verify_peer)
  @client = Aws::TimestreamWrite::Client.new(client_options)

  @database = ENV['AWS_TIMESTREAM_DATABASE'] if @database.nil?
  @table = ENV['AWS_TIMESTREAM_TABLE'] if @table.nil?

  validate_time_unit
end
create_timestream_dimension(key, value)
click to toggle source
# File lib/fluent/plugin/out_timestream.rb, line 105
# Builds a single Timestream dimension hash from one record field.
#
# @param key the dimension name, used unchanged
# @param value the field value; it is converted with to_s
# @return [Hash, nil] a VARCHAR dimension hash, or nil when the converted
#   value is an empty string
def create_timestream_dimension(key, value)
  text = value.to_s
  # Timestream does not accept an empty string, so nil is returned to let
  # the caller leave this dimension out.
  text.empty? ? nil : { dimension_value_type: 'VARCHAR', name: key, value: text }
end
create_timestream_dimensions_and_measure(record)
click to toggle source
# File lib/fluent/plugin/out_timestream.rb, line 133
# Splits a record into Timestream dimensions and an optional measure.
#
# The field whose key equals @target_measure[:name] (when @target_measure is
# set) is turned into the measure; every other field is turned into a
# dimension, and fields for which create_timestream_dimension returns nil
# are left out.
#
# @param record the key/value pairs of one event
# @return [Array] a two-element array: the list of dimensions and the
#   measure (an empty hash when no field matched the target measure)
def create_timestream_dimensions_and_measure(record)
  measure = {}
  dimensions = []

  record.each do |field, field_value|
    if @target_measure && field == @target_measure[:name]
      measure = create_timestream_measure(field, field_value)
    else
      dimension = create_timestream_dimension(field, field_value)
      dimensions << dimension unless dimension.nil?
    end
  end

  [dimensions, measure]
end
create_timestream_measure(key, value)
click to toggle source
# File lib/fluent/plugin/out_timestream.rb, line 119
# Builds the measure hash for the field selected as the target measure.
#
# @param key the measure name, used unchanged
# @param value the measure value; it is converted with to_s
# @return [Hash] the measure with :name, :value and the :type taken from
#   @target_measure[:type]
# @raise [EmptyValueError] when the converted value is an empty string
def create_timestream_measure(key, value)
  text = value.to_s
  # Timestream does not accept an empty string; raising here lets the
  # caller drop the entire record instead of just this field.
  raise EmptyValueError, key if text.empty?

  measure_type = @target_measure[:type]
  { name: key, value: text, type: measure_type }
end
create_timestream_record(dimensions, time, measure)
click to toggle source
# File lib/fluent/plugin/out_timestream.rb, line 92
# Assembles one Timestream record hash from its parts.
#
# @param dimensions the list of dimension hashes; must not be empty
# @param time the record time; it is converted with to_s
# @param measure the measure hash (:name, :value, :type); when it is empty a
#   placeholder VARCHAR measure named '-' with value '-' is used instead
# @return [Hash] the record, with time_unit taken from @time_unit
# @raise [NoDimensionsError] when dimensions is empty
def create_timestream_record(dimensions, time, measure)
  raise NoDimensionsError if dimensions.empty?

  effective_measure =
    if measure.empty?
      { name: '-', value: '-', type: 'VARCHAR' }
    else
      measure
    end

  {
    dimensions: dimensions,
    time: time.to_s,
    time_unit: @time_unit,
    measure_name: effective_measure[:name],
    measure_value: effective_measure[:value],
    measure_value_type: effective_measure[:type]
  }
end
create_timestream_records(chunk)
click to toggle source
rubocop:disable Metrics/MethodLength
# File lib/fluent/plugin/out_timestream.rb, line 147
# Converts every event of a chunk into a Timestream record hash.
#
# Events that cannot be converted are skipped with a warning instead of
# failing the whole chunk. That covers events whose time is empty, and events
# for which the helpers raise EmptyValueError or NoDimensionsError.
#
# @param chunk an enumerable yielding [time, record] pairs
# @return [Array<Hash>] the records built by create_timestream_record, in
#   chunk order
def create_timestream_records(chunk)
  timestream_records = []
  chunk.each do |time, record|
    # When a time key is configured, the record's own field replaces the
    # event time and is removed from the record so that it does not also
    # become a dimension.
    time = record.delete(@time_key) unless @time_key.nil?

    # A missing (nil) or empty time would be sent as an empty time string,
    # which the Timestream WriteRecords API does not accept (minimum length
    # is 1). Skip just this event so it cannot fail the entire write.
    if time.to_s.empty?
      log.warn('ignored record due to (time is empty)')
      log.debug("ignored record details: #{record}")
      next
    end

    dimensions, measure = create_timestream_dimensions_and_measure(record)
    timestream_records.push(create_timestream_record(dimensions, time, measure))
  rescue EmptyValueError, NoDimensionsError => e
    log.warn("ignored record due to (#{e})")
    log.debug("ignored record details: #{record}")
    next
  end
  timestream_records
end
credential_options()
click to toggle source
# File lib/fluent/plugin/out_timestream.rb, line 68
# Returns the credential part of the client options.
#
# @return [Hash] :access_key_id and :secret_access_key when both @aws_key_id
#   and @aws_sec_key are set; otherwise an empty hash
def credential_options
  return {} unless @aws_key_id && @aws_sec_key

  { access_key_id: @aws_key_id, secret_access_key: @aws_sec_key }
end
format(_tag, time, record)
click to toggle source
# File lib/fluent/plugin/out_timestream.rb, line 88
# Serializes one event for buffering by calling to_msgpack on the
# [time, record] pair.
#
# @param _tag the event tag (unused)
# @param time the event time
# @param record the event record
# @return the result of Array#to_msgpack for [time, record]
def format(_tag, time, record)
  event = [time, record]
  event.to_msgpack
end
formatted_to_msgpack_binary()
click to toggle source
# File lib/fluent/plugin/out_timestream.rb, line 84
# Always answers true.
# NOTE(review): presumably this tells Fluentd that #format emits msgpack
# binary so chunks can be iterated as [time, record] pairs — confirm against
# the Fluentd output plugin API.
#
# @return [Boolean] true
def formatted_to_msgpack_binary
  true
end
validate_time_unit()
click to toggle source
# File lib/fluent/plugin/out_timestream.rb, line 79
# Checks that the configured time unit is one of VALID_TIME_UNIT.
#
# @return [nil] when @time_unit is valid
# @raise [Fluent::ConfigError] when @time_unit is not in VALID_TIME_UNIT
def validate_time_unit
  unless VALID_TIME_UNIT.include?(@time_unit)
    raise Fluent::ConfigError, "Invalid time_unit: #{@time_unit}"
  end
end
write(chunk)
click to toggle source
rubocop:enable Metrics/MethodLength
# File lib/fluent/plugin/out_timestream.rb, line 163
# Converts a chunk into Timestream records, logs how many were produced and
# hands them to write_records.
#
# @param chunk the chunk passed on to create_timestream_records
# @return whatever write_records returns
def write(chunk)
  timestream_records = create_timestream_records(chunk).tap do |built|
    log.info("read #{built.length} records from chunk")
  end
  write_records(timestream_records)
end
write_records(records)
click to toggle source
# File lib/fluent/plugin/out_timestream.rb, line 169
# Sends the given records to the configured database and table through
# @client. Nothing is sent when the list is empty.
#
# A RejectedRecordsException is not re-raised: the rejected records reported
# by the exception are logged at error level instead.
#
# @param records [Array<Hash>] the Timestream records to write
# @return nil when records is empty; otherwise the client's response, or the
#   result of log.error when the write was rejected
def write_records(records)
  return if records.empty?

  request = {
    database_name: @database,
    table_name: @table,
    records: records
  }
  @client.write_records(**request)
rescue Aws::TimestreamWrite::Errors::RejectedRecordsException => e
  log.error(e.rejected_records)
end