class Fluent::DynamoDBOutput

Constants

BATCHWRITE_CONTENT_SIZE_LIMIT
BATCHWRITE_ITEM_LIMIT

Public Class Methods

new() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_dynamodb.rb, line 17
def initialize
  super
  require 'aws-sdk'
  require 'msgpack'
  require 'time'
  require 'uuidtools'
end

Public Instance Methods

batch_put_records(records) click to toggle source
# File lib/fluent/plugin/out_dynamodb.rb, line 128
def batch_put_records(records)
  @dynamo_db.batch_write_item(request_items: { @dynamo_db_table => records })
end
configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_dynamodb.rb, line 34
def configure(conf)
  super

  @timef = TimeFormatter.new(@time_format, @localtime)
end
format(tag, time, record) click to toggle source
# File lib/fluent/plugin/out_dynamodb.rb, line 89
def format(tag, time, record)
  if !record.key?(@hash_key.attribute_name)
    record[@hash_key.attribute_name] = UUIDTools::UUID.timestamp_create.to_s
  end
  match_type!(@hash_key, record)

  formatted_time = @timef.format(time)
  if @range_key
    if !record.key?(@range_key.attribute_name)
      record[@range_key.attribute_name] = formatted_time
    end
    match_type!(@range_key, record)
  end
  record['time'] = formatted_time

  record.to_msgpack
end
match_type!(key, record) click to toggle source
# File lib/fluent/plugin/out_dynamodb.rb, line 79
def match_type!(key, record)
  if key.key_type == "NUMBER"
    potential_value = record[key.attribute_name].to_i
    if potential_value == 0
      log.fatal "Failed attempt to cast hash_key to Integer."
    end
    record[key.attribute_name] = potential_value
  end
end
restart_session(options) click to toggle source
# File lib/fluent/plugin/out_dynamodb.rb, line 66
def restart_session(options)
  @dynamo_db = Aws::DynamoDB::Client.new(options)
  @resource = Aws::DynamoDB::Resource.new(client: @dynamo_db)

end
start() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_dynamodb.rb, line 40
def start
  options = {}
  if @aws_key_id && @aws_sec_key
    options[:access_key_id] = @aws_key_id
    options[:secret_access_key] = @aws_sec_key
  end
  options[:region] = @dynamo_db_region if @dynamo_db_region
  options[:endpoint] = @dynamo_db_endpoint
  options[:proxy_uri] = @proxy_uri if @proxy_uri

  detach_multi_process do
    super

    begin
      restart_session(options)
      valid_table(@dynamo_db_table)
    rescue ConfigError => e
      log.fatal "ConfigError: Please check your configuration, then restart fluentd. '#{e}'"
      exit!
    rescue Exception => e
      log.fatal "UnknownError: '#{e}'"
      exit!
    end
  end
end
valid_table(table_name) click to toggle source
# File lib/fluent/plugin/out_dynamodb.rb, line 72
def valid_table(table_name)
  table = @resource.table(table_name)
  @hash_key = table.key_schema.select{|e| e.key_type == "HASH" }.first
  range_key_candidate = table.key_schema.select{|e| e.key_type == "RANGE" }
  @range_key = range_key_candidate.first if range_key_candidate
end
write(chunk) click to toggle source
# File lib/fluent/plugin/out_dynamodb.rb, line 107
def write(chunk)
  batch_size = 0
  batch_records = []
  chunk.msgpack_each {|record|
    batch_records << {
      put_request: {
        item: record
      }
    }
    batch_size += record.to_json.length # FIXME: heuristic
    if batch_records.size >= BATCHWRITE_ITEM_LIMIT || batch_size >= BATCHWRITE_CONTENT_SIZE_LIMIT
      batch_put_records(batch_records)
      batch_records.clear
      batch_size = 0
    end
  }
  unless batch_records.empty?
    batch_put_records(batch_records)
  end
end