class LaunchDarkly::Impl::Integrations::DynamoDB::DynamoDBFeatureStoreCore

Internal implementation of the DynamoDB feature store, intended to be used with CachingStoreWrapper.

Constants

AWS_SDK_ENABLED
ITEM_JSON_ATTRIBUTE
PARTITION_KEY
SORT_KEY
VERSION_ATTRIBUTE

Public Class Methods

new(table_name, opts) click to toggle source
# File lib/ldclient-rb/impl/integrations/dynamodb_impl.rb, line 29
def initialize(table_name, opts)
  if !AWS_SDK_ENABLED
    raise RuntimeError.new("can't use DynamoDB feature store without the aws-sdk or aws-sdk-dynamodb gem")
  end

  @table_name = table_name
  @prefix = opts[:prefix]
  @logger = opts[:logger] || Config.default_logger

  if !opts[:existing_client].nil?
    @client = opts[:existing_client]
  else
    @client = Aws::DynamoDB::Client.new(opts[:dynamodb_opts] || {})
  end

  @logger.info("DynamoDBFeatureStore: using DynamoDB table \"#{table_name}\"")
end

Public Instance Methods

get_all_internal(kind) click to toggle source
# File lib/ldclient-rb/impl/integrations/dynamodb_impl.rb, line 83
def get_all_internal(kind)
  items_out = {}
  req = make_query_for_kind(kind)
  while true
    resp = @client.query(req)
    resp.items.each do |item|
      item_out = unmarshal_item(kind, item)
      items_out[item_out[:key].to_sym] = item_out
    end
    break if resp.last_evaluated_key.nil? || resp.last_evaluated_key.length == 0
    req.exclusive_start_key = resp.last_evaluated_key
  end
  items_out
end
get_internal(kind, key) click to toggle source
# File lib/ldclient-rb/impl/integrations/dynamodb_impl.rb, line 78
def get_internal(kind, key)
  resp = get_item_by_keys(namespace_for_kind(kind), key)
  unmarshal_item(kind, resp.item)
end
init_internal(all_data) click to toggle source
# File lib/ldclient-rb/impl/integrations/dynamodb_impl.rb, line 47
def init_internal(all_data)
  # Start by reading the existing keys; we will later delete any of these that weren't in all_data.
  unused_old_keys = read_existing_keys(all_data.keys)

  requests = []
  num_items = 0

  # Insert or update every provided item
  all_data.each do |kind, items|
    items.values.each do |item|
      requests.push({ put_request: { item: marshal_item(kind, item) } })
      unused_old_keys.delete([ namespace_for_kind(kind), item[:key] ])
      num_items = num_items + 1
    end
  end

  # Now delete any previously existing items whose keys were not in the current data
  unused_old_keys.each do |tuple|
    del_item = make_keys_hash(tuple[0], tuple[1])
    requests.push({ delete_request: { key: del_item } })
  end
    
  # Now set the special key that we check in initialized_internal?
  inited_item = make_keys_hash(inited_key, inited_key)
  requests.push({ put_request: { item: inited_item } })

  DynamoDBUtil.batch_write_requests(@client, @table_name, requests)

  @logger.info { "Initialized table #{@table_name} with #{num_items} items" }
end
initialized_internal?() click to toggle source
# File lib/ldclient-rb/impl/integrations/dynamodb_impl.rb, line 122
def initialized_internal?
  resp = get_item_by_keys(inited_key, inited_key)
  !resp.item.nil? && resp.item.length > 0
end
stop() click to toggle source
# File lib/ldclient-rb/impl/integrations/dynamodb_impl.rb, line 127
def stop
  # AWS client doesn't seem to have a close method
end
upsert_internal(kind, new_item) click to toggle source
# File lib/ldclient-rb/impl/integrations/dynamodb_impl.rb, line 98
def upsert_internal(kind, new_item)
  encoded_item = marshal_item(kind, new_item)
  begin
    @client.put_item({
      table_name: @table_name,
      item: encoded_item,
      condition_expression: "attribute_not_exists(#namespace) or attribute_not_exists(#key) or :version > #version",
      expression_attribute_names: {
        "#namespace" => PARTITION_KEY,
        "#key" => SORT_KEY,
        "#version" => VERSION_ATTRIBUTE
      },
      expression_attribute_values: {
        ":version" => new_item[:version]
      }
    })
    new_item
  rescue Aws::DynamoDB::Errors::ConditionalCheckFailedException
    # The item was not updated because there's a newer item in the database.
    # We must now read the item that's in the database and return it, so CachingStoreWrapper can cache it.
    get_internal(kind, new_item[:key])
  end
end

Private Instance Methods

get_item_by_keys(namespace, key) click to toggle source
# File lib/ldclient-rb/impl/integrations/dynamodb_impl.rb, line 165
def get_item_by_keys(namespace, key)
  @client.get_item({
    table_name: @table_name,
    key: make_keys_hash(namespace, key)
  })
end
inited_key() click to toggle source
# File lib/ldclient-rb/impl/integrations/dynamodb_impl.rb, line 141
def inited_key
  prefixed_namespace("$inited")
end
make_keys_hash(namespace, key) click to toggle source
# File lib/ldclient-rb/impl/integrations/dynamodb_impl.rb, line 145
def make_keys_hash(namespace, key)
  {
    PARTITION_KEY => namespace,
    SORT_KEY => key
  }
end
make_query_for_kind(kind) click to toggle source
# File lib/ldclient-rb/impl/integrations/dynamodb_impl.rb, line 152
def make_query_for_kind(kind)
  {
    table_name: @table_name,
    consistent_read: true,
    key_conditions: {
      PARTITION_KEY => {
        comparison_operator: "EQ",
        attribute_value_list: [ namespace_for_kind(kind) ]
      }
    }
  }
end
marshal_item(kind, item) click to toggle source
# File lib/ldclient-rb/impl/integrations/dynamodb_impl.rb, line 196
def marshal_item(kind, item)
  make_keys_hash(namespace_for_kind(kind), item[:key]).merge({
    VERSION_ATTRIBUTE => item[:version],
    ITEM_JSON_ATTRIBUTE => Model.serialize(kind, item)
  })
end
namespace_for_kind(kind) click to toggle source
# File lib/ldclient-rb/impl/integrations/dynamodb_impl.rb, line 137
def namespace_for_kind(kind)
  prefixed_namespace(kind[:namespace])
end
prefixed_namespace(base_str) click to toggle source
# File lib/ldclient-rb/impl/integrations/dynamodb_impl.rb, line 133
def prefixed_namespace(base_str)
  (@prefix.nil? || @prefix == "") ? base_str : "#{@prefix}:#{base_str}"
end
read_existing_keys(kinds) click to toggle source
# File lib/ldclient-rb/impl/integrations/dynamodb_impl.rb, line 172
def read_existing_keys(kinds)
  keys = Set.new
  kinds.each do |kind|
    req = make_query_for_kind(kind).merge({
      projection_expression: "#namespace, #key",
      expression_attribute_names: {
        "#namespace" => PARTITION_KEY,
        "#key" => SORT_KEY
      }
    })
    while true
      resp = @client.query(req)
      resp.items.each do |item|
        namespace = item[PARTITION_KEY]
        key = item[SORT_KEY]
        keys.add([ namespace, key ])
      end
      break if resp.last_evaluated_key.nil? || resp.last_evaluated_key.length == 0
      req.exclusive_start_key = resp.last_evaluated_key
    end
  end
  keys
end
unmarshal_item(kind, item) click to toggle source
# File lib/ldclient-rb/impl/integrations/dynamodb_impl.rb, line 203
def unmarshal_item(kind, item)
  return nil if item.nil? || item.length == 0
  json_attr = item[ITEM_JSON_ATTRIBUTE]
  raise RuntimeError.new("DynamoDB map did not contain expected item string") if json_attr.nil?
  Model.deserialize(kind, json_attr)
end