class LambdaWrap::DynamoTable

The DynamoTable class simplifies Creation, Updating, and Destroying Dynamo DB Tables. @since 1.0

Public Class Methods

new(options) click to toggle source

Sets up the DynamoTable for the Dynamo DB Manager. Preloading the configuration in the constructor.

@param [Hash] options The configuration for the DynamoDB Table. @option options [String] :table_name The name of the DynamoDB Table. A “Base Name” can be used here where the

environment name can be appended upon deployment.

@option options [Array<Hash>] :attribute_definitions ([{ attribute_name: 'Id', attribute_type: 'S' }]) An array of

attributes that describe the key schema for the table and indexes. The Hash must have symbols: :attribute_name &
:attribute_type. Please see AWS Documentation for the {http://docs.aws.amazon.com/amazondynamodb/latest/developerguide/DataModel.html
Data Model}.

@option options [Array<Hash>] :key_schema ([{ attribute_name: 'Id', key_type: 'HASH' }]) Specifies the attributes

that make up the primary key for a table or an index. The attributes in key_schema must also be defined in the
AttributeDefinitions array. Please see AWS Documentation for the {http://docs.aws.amazon.com/amazondynamodb/latest/developerguide/DataModel.html
Data Model}.

Each element in the array must be composed of:
* <tt>:attribute_name</tt> - The name of this key attribute.
* <tt>:key_type</tt> - The role that the key attribute will assume:
  * <tt>HASH</tt> - partition key
  * <tt>RANGE</tt> - sort key

The partition key of an item is also known as its hash attribute. The term "hash attribute" derives from
DynamoDB's usage of an internal hash function to evenly distribute data items across partitions, based on their
partition key values.

The sort key of an item is also known as its range attribute. The term "range attribute" derives from the way
DynamoDB stores items with the same partition key physically close together, in sorted order by the sort key
value.

For a simple primary key (partition key), you must provide exactly one element with a <tt>KeyType</tt> of
<tt>HASH</tt>.

For a composite primary key (partition key and sort key), you must provide exactly two elements, in this order:
The first element must have a <tt>KeyType</tt> of <tt>HASH</tt>, and the second element must have a
<tt>KeyType</tt> of <tt>RANGE</tt>.

For more information, see {http://docs.aws.amazon.com/amazondynamodb/latest/developerguide/WorkingWithTables.html#WorkingWithTables.primary.key
Specifying the Primary Key} in the <em>Amazon DynamoDB Developer Guide</em>.

@option options [Integer] :read_capacity_units (1) The maximum number of strongly consistent reads consumed per

second before DynamoDB returns a <tt>ThrottlingException</tt>. Must be at least 1. For current minimum and
maximum provisioned throughput values, see {http://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Limits.html
Limits} in the <em>Amazon DynamoDB Developer Guide</em>.

@option options [Integer] :write_capacity_units (1) The maximum number of writes consumed per second before

DynamoDB returns a <tt>ThrottlingException</tt>. Must be at least 1. For current minimum and maximum
provisioned throughput values, see {http://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Limits.html
Limits} in the <em>Amazon DynamoDB Developer Guide</em>.

@option options [Array<Hash>] :local_secondary_indexes ([]) One or more local secondary indexes (the maximum is

five) to be created on the table. Each index is scoped to a given partition key value. There is a 10 GB size
limit per partition key value; otherwise, the size of a local secondary index is unconstrained.

Each element in the array must be a Hash with these symbols:
* <tt>:index_name</tt> - The name of the local secondary index. Must be unique only for this table.
* <tt>:key_schema</tt> - Specifies the key schema for the local secondary index. The key schema must begin with
  the same partition key as the table.
* <tt>:projection</tt> - Specifies attributes that are copied (projected) from the table into the index. These
  are in addition to the primary key attributes and index key attributes, which are automatically projected. Each
  attribute specification is composed of:
  * <tt>:projection_type</tt> - One of the following:
    * <tt>KEYS_ONLY</tt> - Only the index and primary keys are projected into the index.
    * <tt>INCLUDE</tt> - Only the specified table attributes are projected into the index. The list of projected
      attributes are in <tt>non_key_attributes</tt>.
    * <tt>ALL</tt> - All of the table attributes are projected into the index.
  * <tt>:non_key_attributes</tt> - A list of one or more non-key attribute names that are projected into the
    secondary index. The total count of attributes provided in NonKeyAttributes, summed across all of the
    secondary indexes, must not exceed 20. If you project the same attribute into two different indexes, this
    counts as two distinct attributes when determining the total.

@option options [Array<Hash>] :global_secondary_indexes ([]) One or more global secondary indexes (the maximum is

five) to be created on the table. Each global secondary index (Hash) in the array includes the following:
* <tt>:index_name</tt> - The name of the global secondary index. Must be unique only for this table.
* <tt>:key_schema</tt> - Specifies the key schema for the global secondary index.
* <tt>:projection</tt> - Specifies attributes that are copied (projected) from the table into the index. These
  are in addition to the primary key attributes and index key attributes, which are automatically projected. Each
  attribute specification is composed of:
  * <tt>:projection_type</tt> - One of the following:
    * <tt>KEYS_ONLY</tt> - Only the index and primary keys are projected into the index.
    * <tt>INCLUDE</tt> - Only the specified table attributes are projected into the index. The list of projected
      attributes are in <tt>NonKeyAttributes</tt>.
    * <tt>ALL</tt> - All of the table attributes are projected into the index.
  * <tt>non_key_attributes</tt> - A list of one or more non-key attribute names that are projected into the
    secondary index. The total count of attributes provided in NonKeyAttributes, summed across all of the
    secondary indexes, must not exceed 20. If you project the same attribute into two different indexes, this
    counts as two distinct attributes when determining the total.
* <tt>:provisioned_throughput</tt> - The provisioned throughput settings for the global secondary index,
  consisting of read and write capacity units.

@option options [Boolean] :append_environment_on_deploy (false) Option to append the name of the environment to

the table name upon deployment and teardown. DynamoDB Tables cannot shard data in a similar manner as how Lambda
aliases and API Gateway Environments work. This option is supposed to help the user with naming tables instead
of managing the environment names on their own.
# File lib/lambda_wrap/dynamo_db_manager.rb, line 99
def initialize(options)
  default_options = { append_environment_on_deploy: false, read_capacity_units: 1, write_capacity_units: 1,
                      local_secondary_indexes: nil, global_secondary_indexes: nil,
                      attribute_definitions: [{ attribute_name: 'Id', attribute_type: 'S' }],
                      key_schema: [{ attribute_name: 'Id', key_type: 'HASH' }] }

  options_with_defaults = options.reverse_merge(default_options)

  @table_name = options_with_defaults[:table_name]
  raise ArgumentError, ':table_name is required.' unless @table_name

  @attribute_definitions = options_with_defaults[:attribute_definitions]
  @key_schema = options_with_defaults[:key_schema]

  # Verify that all of key_schema is defined in attribute_definitions
  defined_in_attribute_definitions_guard(@key_schema)

  @read_capacity_units = options_with_defaults[:read_capacity_units]
  @write_capacity_units = options_with_defaults[:write_capacity_units]
  provisioned_throughput_guard(read_capacity_units: @read_capacity_units,
                               write_capacity_units: @write_capacity_units)

  unless @read_capacity_units >= 1 && @write_capacity_units >= 1 && (@read_capacity_units.is_a? Integer) &&
         (@write_capacity_units.is_a? Integer)
    raise ArgumentExecption, 'Read and Write Capacity must be positive integers.'
  end

  @local_secondary_indexes = options_with_defaults[:local_secondary_indexes]

  if @local_secondary_indexes && @local_secondary_indexes.length > 5
    raise ArgumentError, 'Can only have 5 LocalSecondaryIndexes per table!'
  end
  if @local_secondary_indexes && !@local_secondary_indexes.empty?
    @local_secondary_indexes.each { |lsindex| defined_in_attribute_definitions_guard(lsindex[:key_schema]) }
  end

  @global_secondary_indexes = options_with_defaults[:global_secondary_indexes]

  if @global_secondary_indexes && @global_secondary_indexes.length > 5
    raise ArgumentError, 'Can only have 5 GlobalSecondaryIndexes per table1'
  end
  if @global_secondary_indexes && !@global_secondary_indexes.empty?
    @global_secondary_indexes.each do |gsindex|
      defined_in_attribute_definitions_guard(gsindex[:key_schema])
      provisioned_throughput_guard(gsindex[:provisioned_throughput])
    end
  end

  @append_environment_on_deploy = options_with_defaults[:append_environment_on_deploy]
end

Public Instance Methods

delete(client, region = 'AWS_REGION') click to toggle source

Deletes all DynamoDB tables that are prefixed with the @table_name specified in the constructor. This is an attempt to tear down all DynamoTables that were deployed with the environment name appended.

@param client [Aws::DynamoDB::Client] Client to use with SDK. Should be passed in by the API class. @param region [String] AWS Region string. Should be passed in by the API class.

Calls superclass method LambdaWrap::AwsService#delete
# File lib/lambda_wrap/dynamo_db_manager.rb, line 197
def delete(client, region = 'AWS_REGION')
  super
  puts "Deleting all tables with prefix: #{@table_name}."
  table_names = retrieve_prefixed_tables(@table_name)
  table_names.each { |table_name| delete_table(table_name) }
  puts "Deleted #{table_names.length} tables."
  table_names.length
end
deploy(environment_options, client, region = 'AWS_REGION') click to toggle source

Deploys the DynamoDB Table to the target environment. If the @append_environment_on_deploy option is set, the table_name will be appended with a hyphen and the environment name. This will attempt to Create or Update with the parameters specified from the constructor. This may take a LONG time for it will wait for any new indexes to be available.

@param environment_options [LambdaWrap::Environment] Target environment to deploy. @param client [Aws::DynamoDB::Client] Client to use with SDK. Should be passed in by the API class. @param region [String] AWS Region string. Should be passed in by the API class.

Calls superclass method LambdaWrap::AwsService#deploy
# File lib/lambda_wrap/dynamo_db_manager.rb, line 158
def deploy(environment_options, client, region = 'AWS_REGION')
  super

  puts "Deploying Table: #{@table_name} to Environment: #{environment_options.name}"

  full_table_name = @table_name + (@append_environment_on_deploy ? "-#{environment_options.name}" : '')

  table_details = retrieve_table_details(full_table_name)

  if table_details.nil?
    create_table(full_table_name)
  else
    wait_until_table_is_available(full_table_name) if table_details[:table_status] != 'ACTIVE'
    update_table(full_table_name, table_details)
  end

  puts "Dynamo Table #{full_table_name} is now available."
  full_table_name
end
teardown(environment_options, client, region = 'AWS_REGION') click to toggle source

Deletes the DynamoDB table specified by the table_name and the Environment name (if append_environment_on_deploy) was specified. Otherwise just deletes the table.

@param environment_options [LambdaWrap::Environment] Target environment to teardown @param client [Aws::DynamoDB::Client] Client to use with SDK. Should be passed in by the API class. @param region [String] AWS Region string. Should be passed in by the API class.

Calls superclass method LambdaWrap::AwsService#teardown
# File lib/lambda_wrap/dynamo_db_manager.rb, line 184
def teardown(environment_options, client, region = 'AWS_REGION')
  super
  puts "Tearingdown Table: #{@table_name} from Environment: #{environment_options.name}"
  full_table_name = @table_name + (@append_environment_on_deploy ? "-#{environment_options.name}" : '')
  delete_table(full_table_name)
  full_table_name
end
to_s() click to toggle source
Calls superclass method
# File lib/lambda_wrap/dynamo_db_manager.rb, line 206
def to_s
  return @table_name if @table_name && @table_name.is_a?(String)
  super
end

Private Instance Methods

build_global_index_deletes_array(current_global_indexes) click to toggle source
# File lib/lambda_wrap/dynamo_db_manager.rb, line 323
def build_global_index_deletes_array(current_global_indexes)
  return [] if current_global_indexes.empty?
  current_index_names = current_global_indexes.map(&:index_name)
  target_index_names = @global_secondary_indexes.map { |gsindex| gsindex[:index_name] }
  current_index_names - target_index_names
end
build_global_index_updates_array(current_global_indexes) click to toggle source

Looks through the list current of Global Secondary Indexes and builds an array if the Provisioned Throughput in the intended Indexes are higher than the current Indexes.

# File lib/lambda_wrap/dynamo_db_manager.rb, line 341
def build_global_index_updates_array(current_global_indexes)
  indexes_to_update = []
  return indexes_to_update if current_global_indexes.empty?
  current_global_indexes.each do |current_index|
    @global_secondary_indexes.each do |target_index|
      # Find the same named index
      next unless target_index[:index_name] == current_index[:index_name]
      # Skip unless a different ProvisionedThroughput is specified
      break unless (target_index[:provisioned_throughput][:read_capacity_units] !=
                    current_index.provisioned_throughput.read_capacity_units) ||
                   (target_index[:provisioned_throughput][:write_capacity_units] !=
                    current_index.provisioned_throughput.write_capacity_units)
      indexes_to_update << { index_name: target_index[:index_name],
                             provisioned_throughput: target_index[:provisioned_throughput] }
    end
  end
  puts indexes_to_update
  indexes_to_update
end
build_new_global_indexes_array(current_global_indexes) click to toggle source
# File lib/lambda_wrap/dynamo_db_manager.rb, line 379
def build_new_global_indexes_array(current_global_indexes)
  return [] if !@global_secondary_indexes || @global_secondary_indexes.empty?

  index_names_to_create = @global_secondary_indexes.map { |gsindex| gsindex[:index_name] } -
                          current_global_indexes.map(&:index_name)

  @global_secondary_indexes.select do |gsindex|
    index_names_to_create.include?(gsindex[:index_name])
  end
end
create_global_indexes(full_table_name, new_global_secondary_indexes) click to toggle source
# File lib/lambda_wrap/dynamo_db_manager.rb, line 390
def create_global_indexes(full_table_name, new_global_secondary_indexes)
  puts "Creating new Global Indexes for Table: #{full_table_name}"
  puts(new_global_secondary_indexes.map { |index| index[:index_name].to_s })
  options = {
    table_name: full_table_name,
    global_secondary_index_updates: new_global_secondary_indexes.map { |index| { create: index } }
  }
  @client.update_table(options)
end
create_table(full_table_name) click to toggle source
# File lib/lambda_wrap/dynamo_db_manager.rb, line 400
def create_table(full_table_name)
  puts "Creating table #{full_table_name}..."
  options = {
    table_name: full_table_name, attribute_definitions: @attribute_definitions,
    key_schema: @key_schema,
    provisioned_throughput: { read_capacity_units: @read_capacity_units,
                              write_capacity_units: @write_capacity_units },
    local_secondary_indexes: @local_secondary_indexes,
    global_secondary_indexes: @global_secondary_indexes
  }
  @client.create_table(options)
  # Wait 60 seconds because "DescribeTable uses an eventually consistent query"
  puts 'Sleeping for 60 seconds...'
  Kernel.sleep(60)

  # Wait for up to 2m.
  wait_until_table_is_available(full_table_name, 5, 24)
end
defined_in_attribute_definitions_guard(key_schema) click to toggle source
# File lib/lambda_wrap/dynamo_db_manager.rb, line 457
def defined_in_attribute_definitions_guard(key_schema)
  if Set.new(key_schema.map { |item| item[:attribute_name] })
        .subset?(Set.new(@attribute_definitions.map { |item| item[:attribute_name] }))
    return true
  end
  raise ArgumentError, 'Not all keys in the key_schema are defined in the attribute_definitions!'
end
delete_global_index(full_table_name, index_to_delete) click to toggle source
# File lib/lambda_wrap/dynamo_db_manager.rb, line 330
def delete_global_index(full_table_name, index_to_delete)
  puts "Deleting Global Secondary Index: #{index_to_delete} from Table: #{full_table_name}"
  options = {
    table_name: full_table_name,
    global_secondary_index_updates: [{ delete: { index_name: index_to_delete } }]
  }
  @client.update_table(options)
end
delete_table(full_table_name) click to toggle source
# File lib/lambda_wrap/dynamo_db_manager.rb, line 419
def delete_table(full_table_name)
  puts "Trying to delete Table: #{full_table_name}"
  table_details = retrieve_table_details(full_table_name)
  if table_details.nil?
    puts 'Table did not exist. Nothing to delete.'
  else
    # Wait up to 30m
    wait_until_table_available(full_table_name, 5, 360) if table_details.table_status != 'ACTIVE'
    options = {
      table_name: full_table_name
    }
    @client.delete_table(options)
  end
end
provisioned_throughput_guard(provisioned_throughput) click to toggle source
# File lib/lambda_wrap/dynamo_db_manager.rb, line 465
def provisioned_throughput_guard(provisioned_throughput)
  if provisioned_throughput[:read_capacity_units] >= 1 && provisioned_throughput[:write_capacity_units] >= 1 &&
     provisioned_throughput[:read_capacity_units].is_a?(Integer) &&
     provisioned_throughput[:write_capacity_units].is_a?(Integer)
    return true
  end
  raise ArgumentError, 'Read and Write Capacity for all ProvisionedThroughput must be positive integers.'
end
retrieve_all_table_names() click to toggle source
# File lib/lambda_wrap/dynamo_db_manager.rb, line 438
def retrieve_all_table_names
  tables = []
  response = nil
  loop do
    options = {
      limit: 100
    }
    unless !response || response.last_evaluated_table_name.nil? || response.last_evaluated_table_name.empty?
      options[:exclusive_start_table_name] = response.last_evaluated_table_name
    end
    response = @client.list_tables(options)
    tables.concat(response.table_names)
    if response.table_names.empty? || response.last_evaluated_table_name.nil? ||
       response.last_evaluated_table_name.empty?
      return tables
    end
  end
end
retrieve_prefixed_tables(prefix) click to toggle source
# File lib/lambda_wrap/dynamo_db_manager.rb, line 434
def retrieve_prefixed_tables(prefix)
  retrieve_all_table_names.select { |name| name =~ /#{Regexp.quote(prefix)}[a-zA-Z0-9_\-.]*/ }
end
retrieve_table_details(full_table_name) click to toggle source
# File lib/lambda_wrap/dynamo_db_manager.rb, line 213
def retrieve_table_details(full_table_name)
  table_details = nil
  begin
    options = {
      table_name: full_table_name
    }
    table_details = @client.describe_table(options).table
  rescue Aws::DynamoDB::Errors::ResourceNotFoundException
    puts "Table #{full_table_name} does not exist."
  end
  table_details
end
update_global_indexes(full_table_name, global_secondary_index_updates) click to toggle source
# File lib/lambda_wrap/dynamo_db_manager.rb, line 361
    def update_global_indexes(full_table_name, global_secondary_index_updates)
      puts "Updating Global Indexes for Table: #{full_table_name}"
      puts(
        global_secondary_index_updates.map do |index|
          "#{index[:index_name]} -\
\tRead: #{index[:provisioned_throughput][:read_capacity_units]},\
\tWrite: #{index[:provisioned_throughput][:write_capacity_units]}"
        end
      )

      options = {
        table_name: full_table_name,
        global_secondary_index_updates: global_secondary_index_updates.map { |index| { update: index } }
      }

      @client.update_table(options)
    end
update_provisioned_throughput(full_table_name, old_read, old_write) click to toggle source
# File lib/lambda_wrap/dynamo_db_manager.rb, line 311
def update_provisioned_throughput(full_table_name, old_read, old_write)
  puts "Updating Provisioned Throughtput for #{full_table_name}"
  puts "Setting Read Capacity Units From: #{old_read} To: #{@read_capacity_units}"
  puts "Setting Write Capacty Units From: #{old_write} To: #{@write_capacity_units}"
  options = {
    table_name: full_table_name,
    provisioned_throughput: { read_capacity_units: @read_capacity_units,
                              write_capacity_units: @write_capacity_units }
  }
  @client.update_table(options)
end
update_table(full_table_name, table_details) click to toggle source

Updates the Dynamo Table. You can only perform one of the following update operations at once:

  • Modify the provisioned throughput settings of the table.

  • Enable or disable Streams on the table.

  • Remove a global secondary index from the table.

  • Create a new global secondary index on the table. Once the index begins backfilling,

    you can use UpdateTable to perform other operations.
# File lib/lambda_wrap/dynamo_db_manager.rb, line 266
def update_table(full_table_name, table_details)
  # Determine if Provisioned Throughput needs to be updated.
  if  @read_capacity_units != table_details.provisioned_throughput.read_capacity_units &&
      @write_capacity_units != table_details.provisioned_throughput.write_capacity_units

    update_provisioned_throughput(
      full_table_name, table_details.provisioned_throughput.read_capacity_units,
      table_details.provisioned_throughput.write_capacity_units
    )

    # Wait up to 30 minutes.
    wait_until_table_is_available(full_table_name, 5, 360)
  end

  # Determine if there are any Global Secondary Indexes to be deleted.
  global_secondary_indexes_to_delete = build_global_index_deletes_array(table_details.global_secondary_indexes)
  unless global_secondary_indexes_to_delete.empty?
    # Loop through each index to delete, and send the update one at a time (restriction on the API).
    until global_secondary_indexes_to_delete.empty?
      delete_global_index(full_table_name, global_secondary_indexes_to_delete.pop)

      # Wait up to 2 hours.
      wait_until_table_is_available(full_table_name, 10, 720)
    end
  end

  # Determine if there are updates to the Provisioned Throughput of the Global Secondary Indexes
  global_secondary_index_updates = build_global_index_updates_array(table_details.global_secondary_indexes)
  unless global_secondary_index_updates.empty?
    update_global_indexes(full_table_name, global_secondary_index_updates)

    # Wait up to 4 hours.
    wait_until_table_is_available(full_table_name, 10, 1_440)
  end

  # Determine if there are new Global Secondary Indexes to be created.
  new_global_secondary_indexes = build_new_global_indexes_array(table_details.global_secondary_indexes)
  return if new_global_secondary_indexes.empty?

  create_global_indexes(full_table_name, new_global_secondary_indexes)

  # Wait up to 4 hours.
  wait_until_table_is_available(full_table_name, 10, 1_440)
end
wait_until_table_is_available(full_table_name, delay = 5, max_attempts = 5) click to toggle source

Waits for the table to be available

# File lib/lambda_wrap/dynamo_db_manager.rb, line 228
def wait_until_table_is_available(full_table_name, delay = 5, max_attempts = 5)
  puts "Waiting for Table #{full_table_name} to be available."
  puts "Waiting with a #{delay} second delay between attempts, for a maximum of #{max_attempts} attempts."
  max_time = Time.at(delay * max_attempts).utc.strftime('%H:%M:%S')
  puts "Max waiting time will be: #{max_time} (approximate)."
  # wait until the table has updated to being fully available
  # waiting for ~2min at most; an error will be thrown afterwards

  started_waiting_at = Time.now
  max_attempts.times do |attempt|
    puts "Attempt #{attempt + 1}/#{max_attempts}, \
    #{Time.at(Time.now - started_waiting_at).utc.strftime('%H:%M:%S')}/#{max_time}"

    details = retrieve_table_details(full_table_name)

    if details.table_status != 'ACTIVE'
      puts "Table: #{full_table_name} is not yet available. Status: #{details.table_status}. Retrying..."
    else
      updating_indexes = details.global_secondary_indexes.reject do |global_index|
        global_index.index_status == 'ACTIVE'
      end
      return true if updating_indexes.empty?
      puts 'Table is available, but the global indexes are not:'
      puts(updating_indexes.map { |global_index| "#{global_index.index_name}, #{global_index.index_status}" })
    end
    Kernel.sleep(delay.seconds)
  end

  raise Exception, "Table #{full_table_name} did not become available after #{max_attempts} attempts. " \
    'Try again later or inspect the AWS console.'
end