class Fluent::Plugin::TimestreamOutput

rubocop: disable Metrics/ClassLength Fluent plugin for Amazon Timestream

Constants

VALID_TIME_UNIT

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_timestream.rb, line 55
def configure(conf)
  super
  options = credential_options
  options[:region] = @region if @region
  options[:endpoint] = @endpoint if @endpoint
  options[:ssl_verify_peer] = @ssl_verify_peer
  @client = Aws::TimestreamWrite::Client.new(options)

  @database = ENV['AWS_TIMESTREAM_DATABASE'] if @database.nil?
  @table = ENV['AWS_TIMESTREAM_TABLE'] if @table.nil?
  validate_time_unit
end
create_timestream_dimension(key, value) click to toggle source
# File lib/fluent/plugin/out_timestream.rb, line 105
def create_timestream_dimension(key, value)
  value = value.to_s

  # Timestream does not accept empty string.
  # Ignore this dimension.
  return nil if value.empty?

  {
    dimension_value_type: 'VARCHAR',
    name: key,
    value: value
  }
end
create_timestream_dimensions_and_measure(record) click to toggle source
# File lib/fluent/plugin/out_timestream.rb, line 133
def create_timestream_dimensions_and_measure(record)
  measure = {}
  dimensions = record.each_with_object([]) do |(k, v), result|
    if @target_measure && k == @target_measure[:name]
      measure = create_timestream_measure(k, v)
      next
    end
    dimension = create_timestream_dimension(k, v)
    result.push(dimension) unless dimension.nil?
  end
  return [dimensions, measure]
end
create_timestream_measure(key, value) click to toggle source
# File lib/fluent/plugin/out_timestream.rb, line 119
def create_timestream_measure(key, value)
  value = value.to_s

  # Timestream does not accept empty string.
  # By raising error, ignore entire record.
  raise EmptyValueError, key if value.empty?

  {
    name: key,
    value: value,
    type: @target_measure[:type]
  }
end
create_timestream_record(dimensions, time, measure) click to toggle source
# File lib/fluent/plugin/out_timestream.rb, line 92
def create_timestream_record(dimensions, time, measure)
  raise NoDimensionsError if dimensions.empty?
  measure = { name: '-', value: '-', type: 'VARCHAR' } if measure.empty?
  {
    dimensions: dimensions,
    time: time.to_s,
    time_unit: @time_unit,
    measure_name: measure[:name],
    measure_value: measure[:value],
    measure_value_type: measure[:type]
  }
end
create_timestream_records(chunk) click to toggle source

rubocop:disable Metrics/MethodLength

# File lib/fluent/plugin/out_timestream.rb, line 147
def create_timestream_records(chunk)
  timestream_records = []
  chunk.each do |time, record|
    time = record.delete(@time_key) unless @time_key.nil?
    dimensions, measure = create_timestream_dimensions_and_measure(record)
    timestream_records.push(create_timestream_record(dimensions, time, measure))
  rescue EmptyValueError, NoDimensionsError => e
    log.warn("ignored record due to (#{e})")
    log.debug("ignored record details: #{record}")
    next
  end

  timestream_records
end
credential_options() click to toggle source
# File lib/fluent/plugin/out_timestream.rb, line 68
def credential_options
  if @aws_key_id && @aws_sec_key
    {
      access_key_id: @aws_key_id,
      secret_access_key: @aws_sec_key
    }
  else
    {}
  end
end
format(_tag, time, record) click to toggle source
# File lib/fluent/plugin/out_timestream.rb, line 88
def format(_tag, time, record)
  [time, record].to_msgpack
end
formatted_to_msgpack_binary() click to toggle source
# File lib/fluent/plugin/out_timestream.rb, line 84
def formatted_to_msgpack_binary
  true
end
validate_time_unit() click to toggle source
# File lib/fluent/plugin/out_timestream.rb, line 79
def validate_time_unit
  return if VALID_TIME_UNIT.include?(@time_unit)
  raise Fluent::ConfigError, "Invalid time_unit: #{@time_unit}"
end
write(chunk) click to toggle source

rubocop:enable Metrics/MethodLength

# File lib/fluent/plugin/out_timestream.rb, line 163
def write(chunk)
  records = create_timestream_records(chunk)
  log.info("read #{records.length} records from chunk")
  write_records(records)
end
write_records(records) click to toggle source
# File lib/fluent/plugin/out_timestream.rb, line 169
def write_records(records)
  return if records.empty?
  @client.write_records(
    database_name: @database,
    table_name: @table,
    records: records
  )
rescue Aws::TimestreamWrite::Errors::RejectedRecordsException => e
  log.error(e.rejected_records)
end