class Toy::Dynamo::Table
Constants
- COMPARISON_OPERATOR
- COMPARISON_OPERATOR_SCAN_ONLY
- QUERY_SELECT
- RETURNED_CONSUMED_CAPACITY
- TYPE_INDICATOR
Attributes
client[R]
hash_key[R]
range_keys[R]
schema_loaded_from_dynamo[R]
table_schema[R]
Public Class Methods
new(table_schema, client, options={})
click to toggle source
# File lib/toy/dynamo/table.rb, line 53
# Stores the schema definition and the client on the instance and, unless
# told otherwise, loads the schema through load_schema.
#
# table_schema - kept as-is in @table_schema.
# client       - kept as-is in @client.
# options      - Hash; a truthy :novalidate skips the load_schema call.
def initialize(table_schema, client, options={})
  @table_schema, @client = table_schema, client
  skip_validation = options[:novalidate]
  load_schema unless skip_validation
end
Public Instance Methods
attr_with_type(attr_name, value)
click to toggle source
# File lib/toy/dynamo/table.rb, line 111
# Wraps a single attribute as { attr_name => { type_indicator => string } }.
# The type indicator is TYPE_INDICATOR[type_from_value(value)] and the value
# is always converted with to_s.
def attr_with_type(attr_name, value)
  type_key = TYPE_INDICATOR[type_from_value(value)]
  typed_value = { type_key => value.to_s }
  { attr_name => typed_value }
end
batch_get_item(keys, options={})
click to toggle source
# File lib/toy/dynamo/table.rb, line 229
# Builds a batch_get_item request for up to 100 keys and passes it to
# @client.batch_get_item, returning whatever the client returns.
#
# keys    - Array of keys. When the table has a primary range key each
#           element must be a Hash with :hash_value and :range_value;
#           otherwise each element is the bare hash key value.
# options - Hash:
#           :return_consumed_capacity - key into RETURNED_CONSUMED_CAPACITY
#                                       (defaults to :none)
#           :select                   - attribute name(s) to fetch
#           :consistent_read          - sent only when truthy
#           :table_name               - overrides table_name
#
# Note: the defaults are written back into the options Hash that was passed.
#
# Raises ArgumentError when keys is empty or larger than 100, when a key is
# missing its hash/range value, or when a Hash key is supplied for a table
# without a range key.
def batch_get_item(keys, options={})
  options[:return_consumed_capacity] ||= :none
  options[:select] ||= []
  options[:consistent_read] = false unless options[:consistent_read]

  raise ArgumentError, "must include between 1 - 100 keys" if keys.size == 0 || keys.size > 100

  keys_request = keys.map do |k|
    if @primary_range_key
      hash_value = k[:hash_value]
    else
      # Inspect the key itself: hash_value has not been assigned yet at this
      # point, so testing it (as the code used to) could never fail.
      raise ArgumentError, "expected keys to be in the form of ['hash key here'] for table with no range keys" if k.is_a?(Hash)
      hash_value = k
    end
    raise ArgumentError, "every key must include a :hash_value" if hash_value.blank?

    key_request = {
      @hash_key[:attribute_name] => { @hash_key[:attribute_type] => hash_value.to_s }
    }
    if @primary_range_key
      range_value = k[:range_value]
      raise ArgumentError, "every key must include a :range_value" if range_value.blank?
      key_request[@primary_range_key[:attribute_name]] = { @primary_range_key[:attribute_type] => range_value.to_s }
    end
    key_request
  end

  request_items_request = { :keys => keys_request }
  request_items_request[:attributes_to_get] = [options[:select]].flatten unless options[:select].blank?
  request_items_request[:consistent_read] = options[:consistent_read] if options[:consistent_read]

  batch_get_item_request = {
    :request_items => { (options[:table_name] || table_name) => request_items_request },
    :return_consumed_capacity => RETURNED_CONSUMED_CAPACITY[options[:return_consumed_capacity]]
  }
  @client.batch_get_item(batch_get_item_request)
end
create(options={})
click to toggle source
# File lib/toy/dynamo/table.rb, line 437
# Creates the table from @table_schema, then polls describe once per second
# while the reported status is "CREATING". Afterwards the schema is reloaded
# via load_schema and the last describe result is returned.
#
# options - Hash; :table_name overrides table_name.
#
# Raises RuntimeError when the client already lists a table with that name.
def create(options={})
  # Resolved on every use, so a callable table name is invoked each time.
  resolve_name = lambda { options[:table_name] || table_name }

  existing_tables = @client.list_tables[:table_names]
  raise "Table #{resolve_name.call} already exists" if existing_tables.include?(resolve_name.call)

  creation_request = @table_schema.merge({ :table_name => resolve_name.call })
  @client.create_table(creation_request)

  table_metadata = nil
  loop do
    table_metadata = describe(options)
    break unless table_metadata[:table][:table_status] == "CREATING"
    sleep 1
  end

  load_schema
  table_metadata
end
delete(options={})
click to toggle source
# File lib/toy/dynamo/table.rb, line 458
# Deletes the table and waits (polling describe once per second) while its
# status is "DELETING". A ResourceNotFoundException raised by describe is
# treated as "the table is gone" and logged.
#
# options - Hash; :table_name overrides table_name.
#
# Returns false when the client does not list the table, true otherwise.
def delete(options={})
  return false unless @client.list_tables[:table_names].include?(options[:table_name] || table_name)

  @client.delete_table(:table_name => options[:table_name] || table_name)
  begin
    # Pass options through so the table being polled is the one that was
    # deleted, not the default table.
    while (table_metadata = describe(options)) && table_metadata[:table][:table_status] == "DELETING"
      sleep 1
    end
  rescue AWS::DynamoDB::Errors::ResourceNotFoundException
    Toy::Dynamo::Config.logger.info "Table deleted"
  end
  true
end
delete_item(hash_key_value, options={})
click to toggle source
# File lib/toy/dynamo/table.rb, line 323
# Sends a delete_item request for one item to @client and returns the
# client's result.
#
# hash_key_value - hash key of the item; sent as a string (to_s).
# options        - Hash:
#                  :range_value - required when the table has a primary
#                                 range key
#                  :table_name  - overrides table_name
#
# Raises ArgumentError when a primary range key exists but :range_value is
# blank.
def delete_item(hash_key_value, options={})
  key_request = {}
  key_request[@hash_key[:attribute_name]] = { @hash_key[:attribute_type] => hash_key_value.to_s }

  if @primary_range_key
    raise ArgumentError, "range_key was not provided to the delete_item command" if options[:range_value].blank?
    range_part = { @primary_range_key[:attribute_type] => options[:range_value].to_s }
    key_request[@primary_range_key[:attribute_name]] = range_part
  end

  @client.delete_item(
    :table_name => options[:table_name] || table_name,
    :key => key_request
  )
end
describe(options={})
click to toggle source
# File lib/toy/dynamo/table.rb, line 454
# Returns the result of @client.describe_table for this table.
#
# options - Hash; :table_name overrides table_name.
def describe(options={})
  target_table = options[:table_name] || table_name
  @client.describe_table(:table_name => target_table)
end
get_item(hash_key, options={})
click to toggle source
# File lib/toy/dynamo/table.rb, line 115
# Builds a get_item request for a single hash key and returns the result of
# @client.get_item.
#
# hash_key - hash key value, converted with hash_key_item_param.
# options  - Hash:
#            :consistent_read          - falsy values are normalised to false
#            :return_consumed_capacity - key into RETURNED_CONSUMED_CAPACITY
#                                        (defaults to :none)
#            :select                   - attribute name(s) to fetch; omitted
#                                        from the request when blank
#            :table_name               - overrides table_name
#
# Note: the defaults are written back into the options Hash that was passed.
def get_item(hash_key, options={})
  options[:consistent_read] ||= false
  options[:return_consumed_capacity] ||= :none
  options[:select] ||= []

  request = {
    :table_name => options[:table_name] || table_name,
    :key => hash_key_item_param(hash_key),
    :consistent_read => options[:consistent_read],
    :return_consumed_capacity => RETURNED_CONSUMED_CAPACITY[options[:return_consumed_capacity]]
  }

  wanted = options[:select]
  request[:attributes_to_get] = [wanted].flatten unless wanted.blank?

  @client.get_item(request)
end
hash_key_condition_param(hash_key, value)
click to toggle source
# File lib/toy/dynamo/table.rb, line 101
# Builds an equality key condition for the given hash key attribute:
#
#   { hash_key => { :attribute_value_list => [{ type => "value" }],
#                   :comparison_operator  => COMPARISON_OPERATOR[:eq] } }
#
# hash_key - attribute name; its type is looked up in
#            @table_schema[:attribute_definitions].
# value    - value to match. It is sent as a string (to_s), matching how the
#            range conditions and item keys elsewhere in this class are built.
def hash_key_condition_param(hash_key, value)
  definition = @table_schema[:attribute_definitions].find { |h| h[:attribute_name] == hash_key }
  hash_key_type = definition[:attribute_type]
  {
    hash_key => {
      :attribute_value_list => [{ hash_key_type => value.to_s }],
      :comparison_operator => COMPARISON_OPERATOR[:eq]
    }
  }
end
hash_key_item_param(value)
click to toggle source
# File lib/toy/dynamo/table.rb, line 95
# Builds the key Hash for the table's hash key:
#
#   { hash_key_name => { hash_key_type => "value" } }
#
# The name comes from the "HASH" entry of @table_schema[:key_schema] and the
# type from the matching entry of @table_schema[:attribute_definitions].
#
# value - hash key value. It is sent as a string (to_s), consistent with
#         delete_item, batch_get_item and the UpdateItem branch of write.
def hash_key_item_param(value)
  hash_key = @table_schema[:key_schema].find { |h| h[:key_type] == "HASH" }[:attribute_name]
  hash_key_type = @table_schema[:attribute_definitions].find { |h| h[:attribute_name] == hash_key }[:attribute_type]
  { hash_key => { hash_key_type => value.to_s } }
end
load_schema()
click to toggle source
# File lib/toy/dynamo/table.rb, line 59
# Describes the table and derives @hash_key, @primary_range_key and
# @range_keys from the returned key schema and secondary indexes. Attribute
# types are taken from @table_schema[:attribute_definitions]; keys without a
# matching definition are skipped.
#
# The derived state is rebuilt from scratch on every call, so calling this
# more than once (e.g. from initialize and again from create) does not
# append duplicate entries to @range_keys. @range_keys stays nil when no
# range key is found.
#
# Returns the describe result (also stored in @schema_loaded_from_dynamo).
def load_schema
  @schema_loaded_from_dynamo = describe
  table_description = @schema_loaded_from_dynamo[:table]
  attribute_definitions = @table_schema[:attribute_definitions]

  # Reset only after describe succeeded, so a failed describe keeps the
  # previously loaded state intact.
  @hash_key = nil
  @primary_range_key = nil
  @range_keys = nil

  table_description[:key_schema].each do |key|
    key_attr = attribute_definitions.find { |h| h[:attribute_name] == key[:attribute_name] }
    next if key_attr.nil?

    key_schema_attr = {
      :attribute_name => key[:attribute_name],
      :attribute_type => key_attr[:attribute_type]
    }
    if key[:key_type] == "HASH"
      @hash_key = key_schema_attr
    else
      (@range_keys ||= []) << key_schema_attr.merge(:primary_range_key => true)
      @primary_range_key = key_schema_attr.merge(:primary_range_key => true)
    end
  end

  secondary_indexes = (table_description[:local_secondary_indexes] || []) +
                      (table_description[:global_secondary_indexes] || [])
  secondary_indexes.each do |index|
    si_range_key = index[:key_schema].find { |h| h[:key_type] == "RANGE" }
    next if si_range_key.nil?

    si_range_attribute = attribute_definitions.find { |h| h[:attribute_name] == si_range_key[:attribute_name] }
    next if si_range_attribute.nil?

    (@range_keys ||= []) << {
      :attribute_name => si_range_key[:attribute_name],
      :attribute_type => si_range_attribute[:attribute_type],
      :index_name => index[:index_name]
    }
  end

  @schema_loaded_from_dynamo
end
query(hash_key_value, options={})
click to toggle source
Options:
* consistent_read
* return_consumed_capacity
* order
* select
* range
# File lib/toy/dynamo/table.rb, line 136
# Builds a query request for one hash key value (optionally narrowed by a
# range condition) and returns the result of @client.query.
#
# hash_key_value - value the hash key (or the GSI hash key) must equal.
# options        - Hash:
#   :consistent_read          - falsy values are normalised to false
#   :return_consumed_capacity - key into RETURNED_CONSUMED_CAPACITY
#                               (defaults to :none)
#   :order                    - :asc scans the index forward; anything else
#                               (default :desc) scans backward
#   :select                   - key into QUERY_SELECT, or an Array of
#                               attribute names; defaults to :all, or to
#                               :projected when a GSI is used
#   :range                    - one-element Hash whose key is a String of the
#                               form "attribute.operator"; the value must be
#                               a Range for the "between" operator
#   :global_secondary_index   - name of a GSI declared in @table_schema
#   :limit                    - sent as an Integer when the key is present
#   :exclusive_start_key      - passed through when truthy
#   :table_name               - overrides table_name
#
# Note: the defaults are written back into the options Hash that was passed.
#
# Raises ArgumentError for an unknown GSI, a malformed :range, an unknown or
# missing comparison operator, an unknown range attribute, or a non-Range
# value with "between".
def query(hash_key_value, options={})
  options[:consistent_read] = false unless options[:consistent_read]
  options[:return_consumed_capacity] ||= :none
  options[:order] ||= :desc

  key_conditions = {}
  gsi = nil
  if options[:global_secondary_index]
    # A schema without any GSI falls through to the ArgumentError below
    # instead of failing on nil.
    gsi = (@table_schema[:global_secondary_indexes] || []).find do |index|
      index[:index_name].to_s == options[:global_secondary_index].to_s
    end
    raise ArgumentError, "Could not find Global Secondary Index '#{options[:global_secondary_index]}'" unless gsi
    gsi_hash_key = gsi[:key_schema].find { |h| h[:key_type] == "HASH" }[:attribute_name]
    key_conditions.merge!(hash_key_condition_param(gsi_hash_key, hash_key_value))
  else
    hash_key = @table_schema[:key_schema].find { |h| h[:key_type] == "HASH" }[:attribute_name]
    key_conditions.merge!(hash_key_condition_param(hash_key, hash_key_value))
  end

  # key_conditions is stored by reference; the range condition added below
  # ends up in the request as well.
  query_request = {
    :table_name => options[:table_name] || table_name,
    :key_conditions => key_conditions,
    :consistent_read => options[:consistent_read],
    :return_consumed_capacity => RETURNED_CONSUMED_CAPACITY[options[:return_consumed_capacity]],
    :scan_index_forward => (options[:order] == :asc)
  }

  if options[:range]
    unless options[:range].is_a?(Hash) && options[:range].keys.size == 1 && options[:range].keys.first.is_a?(String)
      raise ArgumentError, "Expected a 1 element Hash for :range (ex {:age.gt => 13})"
    end

    range_key_name, comparison_operator = options[:range].keys.first.split(".")
    # comparison_operator is nil when the key has no ".operator" suffix.
    unless comparison_operator && COMPARISON_OPERATOR.keys.include?(comparison_operator.to_sym)
      raise ArgumentError, "Comparison operator must be one of (#{(COMPARISON_OPERATOR.keys - COMPARISON_OPERATOR_SCAN_ONLY).join(", ")})"
    end

    # TODO: with :global_secondary_index the range attribute is still looked
    # up in @range_keys rather than in the GSI's own key schema.
    range_key = (@range_keys || []).find { |k| k[:attribute_name] == range_key_name }
    raise ArgumentError, ":range key must be a valid Range attribute" unless range_key

    range_value = options[:range].values.first
    raise ArgumentError, ":range key must be a Range if using the operator BETWEEN" if comparison_operator == "between" && !range_value.is_a?(Range)

    # Local/Global Secondary Index
    query_request.merge!(:index_name => range_key[:index_name]) if range_key.has_key?(:index_name)

    range_type = range_key[:attribute_type]
    range_attribute_list =
      if comparison_operator == "between"
        # Bounds are stringified like every other attribute value.
        [{ range_type => range_value.min.to_s }, { range_type => range_value.max.to_s }]
      else
        # TODO - support Binary?
        [{ range_type => range_value.to_s }]
      end

    key_conditions.merge!({
      range_key[:attribute_name] => {
        :attribute_value_list => range_attribute_list,
        :comparison_operator => COMPARISON_OPERATOR[comparison_operator.to_sym]
      }
    })
  end

  if options[:global_secondary_index]
    # Override index_name if using GSI
    options[:select] = :projected if options[:select].blank?
    query_request.merge!(:index_name => gsi[:index_name])
  end

  options[:select] ||= :all # :all, :projected, :count, []
  if options[:select].is_a?(Array)
    attrs_to_select = [options[:select].map(&:to_s)].flatten
    attrs_to_select << @hash_key[:attribute_name]
    attrs_to_select << @primary_range_key[:attribute_name] if @primary_range_key
    query_request.merge!({
      :select => QUERY_SELECT[:specific],
      :attributes_to_get => attrs_to_select.uniq
    })
  else
    query_request.merge!({ :select => QUERY_SELECT[options[:select]] })
  end

  query_request.merge!({ :limit => options[:limit].to_i }) if options.has_key?(:limit)
  query_request.merge!({ :exclusive_start_key => options[:exclusive_start_key] }) if options[:exclusive_start_key]

  @client.query(query_request)
end
resize(options={})
click to toggle source
# File lib/toy/dynamo/table.rb, line 471
# Updates the table's provisioned throughput and waits (polling describe
# once per second) while the status is "UPDATING", then logs the new
# throughput.
#
# options - Hash:
#           :read_capacity_units  - defaults to the value in @table_schema
#           :write_capacity_units - defaults to the value in @table_schema
#           :table_name           - overrides table_name
#
# Returns false when the client does not list the table, true otherwise.
def resize(options={})
  return false unless @client.list_tables[:table_names].include?(options[:table_name] || table_name)

  schema_throughput = @table_schema[:provisioned_throughput]
  @client.update_table({
    :provisioned_throughput => {
      :read_capacity_units => (options[:read_capacity_units] || schema_throughput[:read_capacity_units]).to_i,
      :write_capacity_units => (options[:write_capacity_units] || schema_throughput[:write_capacity_units]).to_i
    },
    :table_name => options[:table_name] || table_name
  })

  # Pass options through so the table being polled is the one that was
  # resized, not the default table.
  while (table_metadata = describe(options)) && table_metadata[:table][:table_status] == "UPDATING"
    sleep 1
  end
  Toy::Dynamo::Config.logger.info "Table resized to #{table_metadata[:table][:provisioned_throughput]}"
  true
end
scan(options={})
click to toggle source
Performs a table scan. See docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_Scan.html
# File lib/toy/dynamo/table.rb, line 366
# Builds a scan request and returns the result of @client.scan.
#
# options - Hash:
#   :return_consumed_capacity - key into RETURNED_CONSUMED_CAPACITY
#                               (defaults to :none)
#   :select                   - key into QUERY_SELECT (defaults to :all), or
#                               an Array of attribute names
#   :scan_filter              - Hash of "attribute.operator" => value, e.g.
#                               { "name.begins_with" => "a" }. "between"
#                               needs a Range value and "in" an Array.
#   :limit                    - sent as an Integer when the key is present
#   :exclusive_start_key      - passed through when truthy
#   :segment, :total_segments - sent as Integers when present
#   :table_name               - overrides table_name
#
# Note: the defaults are written back into the options Hash that was passed.
#
# Raises ArgumentError for an unknown or missing comparison operator, or for
# a value of the wrong shape with "between" / "in".
def scan(options={})
  options[:return_consumed_capacity] ||= :none
  options[:select] ||= :all # :all, :projected, :count, []

  scan_request = {
    :table_name => options[:table_name] || table_name,
    :return_consumed_capacity => RETURNED_CONSUMED_CAPACITY[options[:return_consumed_capacity]]
  }
  scan_request.merge!({ :limit => options[:limit].to_i }) if options.has_key?(:limit)
  scan_request.merge!({ :exclusive_start_key => options[:exclusive_start_key] }) if options[:exclusive_start_key]

  if options[:select].is_a?(Array)
    attrs_to_select = [options[:select].map(&:to_s)].flatten
    attrs_to_select << @hash_key[:attribute_name]
    attrs_to_select << @primary_range_key[:attribute_name] if @primary_range_key
    scan_request.merge!({
      :select => QUERY_SELECT[:specific],
      :attributes_to_get => attrs_to_select.uniq
    })
  else
    scan_request.merge!({ :select => QUERY_SELECT[options[:select]] })
  end

  if options[:scan_filter].present?
    scan_filter = {}
    options[:scan_filter].each_pair do |k, v|
      # Hard to validate attribute types here, so infer by type sent and
      # assume the user knows their own attrs.
      key_name, comparison_operator = k.to_s.split(".")
      # comparison_operator is nil when the key has no ".operator" suffix.
      unless comparison_operator && COMPARISON_OPERATOR.keys.include?(comparison_operator.to_sym)
        raise ArgumentError, "Comparison operator must be one of (#{COMPARISON_OPERATOR.keys.join(", ")})"
      end
      raise ArgumentError, "scan_filter value must be a Range if using the operator BETWEEN" if comparison_operator == "between" && !v.is_a?(Range)
      raise ArgumentError, "scan_filter value must be a Array if using the operator IN" if comparison_operator == "in" && !v.is_a?(Array)

      attribute_value_list =
        case comparison_operator
        when "in"
          v.map { |in_v| attr_with_type(key_name, in_v).values.last }
        when "between"
          # The bounds come from the filter value itself.
          [attr_with_type(key_name, v.min).values.last, attr_with_type(key_name, v.max).values.last]
        else
          [attr_with_type(key_name, v).values.last]
        end

      scan_filter.merge!({
        key_name => {
          :comparison_operator => COMPARISON_OPERATOR[comparison_operator.to_sym],
          :attribute_value_list => attribute_value_list
        }
      })
    end
    scan_request.merge!(:scan_filter => scan_filter)
  end

  scan_request.merge!({ :segment => options[:segment].to_i }) if options[:segment].present?
  scan_request.merge!({ :total_segments => options[:total_segments].to_i }) if options[:total_segments].present?

  @client.scan(scan_request)
end
table_name()
click to toggle source
Call proc or return string
# File lib/toy/dynamo/table.rb, line 429
# Returns the table name from @table_schema[:table_name]. When the stored
# value responds to :call it is invoked and its result is returned;
# otherwise the value itself is returned.
def table_name
  configured = @table_schema[:table_name]
  configured.respond_to?(:call) ? configured.call : configured
end
type_from_value(value)
click to toggle source
# File lib/toy/dynamo/table.rb, line 344
# Infers the type symbol for a value:
#
#   AWS::DynamoDB::Binary      -> :b
#   responds to to_str         -> :s
#   Numeric                    -> :n
#   responds to each           -> :"<member type>s" (e.g. :ss, :ns, :bs);
#                                 an empty collection yields :ss
#
# Raises ArgumentError for nested collections, for collections with mixed
# member types, and for any other value.
def type_from_value(value)
  case
  when value.kind_of?(AWS::DynamoDB::Binary) then :b
  when value.respond_to?(:to_str) then :s
  when value.kind_of?(Numeric) then :n
  when value.respond_to?(:each)
    indicator = nil
    value.each do |v|
      # Recurse into this method; a two-letter result means the member is
      # itself a collection.
      member_indicator = type_from_value(v)
      raise ArgumentError, "nested collections" if member_indicator.to_s.size > 1
      raise ArgumentError, "mixed types" if indicator && member_indicator != indicator
      indicator = member_indicator
    end
    indicator ||= :s
    :"#{indicator}s"
  else
    raise ArgumentError, "unsupported attribute type #{value.class}"
  end
end
write(hash_key_value, attributes, options={})
click to toggle source
# File lib/toy/dynamo/table.rb, line 265
# Writes one item, either with update_item (options[:update_item] truthy) or
# with put_item, and returns the client's result.
#
# hash_key_value - hash key of the item.
# attributes     - Hash of attribute name => value. With update_item, blank
#                  values become "DELETE" actions and the rest "PUT"
#                  actions; with put_item, blank values are left out.
# options        - Hash:
#                  :update_item              - choose update_item over put_item
#                  :return_consumed_capacity - key into
#                                              RETURNED_CONSUMED_CAPACITY
#                                              (defaults to :none)
#                  :table_name               - overrides table_name
#
# Note: the defaults are written back into the options Hash that was passed.
#
# Raises ArgumentError when update_item is used on a table with a primary
# range key and attributes holds no value for it.
def write(hash_key_value, attributes, options={})
  options[:return_consumed_capacity] ||= :none
  options[:update_item] = false unless options[:update_item]

  if options[:update_item]
    # UpdateItem
    key_request = {
      @hash_key[:attribute_name] => { @hash_key[:attribute_type] => hash_key_value.to_s }
    }
    key_attribute_names = [@hash_key[:attribute_name]]

    if @primary_range_key
      range_key_value = attributes[@primary_range_key[:attribute_name]]
      raise ArgumentError, "range_key was not provided to the write command" if range_key_value.blank?
      key_request.merge!({
        @primary_range_key[:attribute_name] => { @primary_range_key[:attribute_type] => range_key_value.to_s }
      })
      key_attribute_names << @primary_range_key[:attribute_name]
    end

    attrs_to_update = {}
    attributes.each_pair do |k, v|
      # Key attributes are already sent in :key and must not appear among
      # the attribute updates (hash key as well as range key).
      next if key_attribute_names.include?(k)

      if v.blank?
        attrs_to_update.merge!({ k => { :action => "DELETE" } })
      else
        attrs_to_update.merge!({
          k => { :value => attr_with_type(k, v).values.last, :action => "PUT" }
        })
      end
    end

    update_item_request = {
      :table_name => options[:table_name] || table_name,
      :key => key_request,
      :attribute_updates => attrs_to_update,
      :return_consumed_capacity => RETURNED_CONSUMED_CAPACITY[options[:return_consumed_capacity]]
    }
    @client.update_item(update_item_request)
  else
    # PutItem
    items = {}
    attributes.each_pair do |k, v|
      next if v.blank? # If empty string or nil, skip...
      items.merge!(attr_with_type(k, v))
    end
    # Merged last so the explicit hash key value wins over any same-named
    # entry in attributes.
    items.merge!(hash_key_item_param(hash_key_value))

    put_item_request = {
      :table_name => options[:table_name] || table_name,
      :item => items,
      :return_consumed_capacity => RETURNED_CONSUMED_CAPACITY[options[:return_consumed_capacity]]
    }
    @client.put_item(put_item_request)
  end
end