class Cassandra
Create a new Cassandra client instance. Accepts a keyspace name, and optional host and port.
client = Cassandra.new('twitter', '127.0.0.1:9160')
If the server requires authentication, you must authenticate before making calls.
client.login!('username','password')
You can then make calls to the server via the client instance.
client.insert(:UserRelationships, "5", {"user_timeline" => {SimpleUUID::UUID.new => "1"}})
client.get(:UserRelationships, "5", "user_timeline")
For read methods, valid option parameters are:
- :count - How many results to return. Defaults to 100.
- :start - Column name token at which to start iterating, inclusive. Defaults to nil, which means the first column in the collation order.
- :finish - Column name token at which to stop iterating, inclusive. Defaults to nil, which means no boundary.
- :reversed - Swap the direction of the collation order.
- :consistency - The consistency level of the request. Defaults to Cassandra::Consistency::ONE (one node must respond). Other valid options are Cassandra::Consistency::ZERO, Cassandra::Consistency::QUORUM, and Cassandra::Consistency::ALL.
Note that some read options have no relevance in some contexts.
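As a rough illustration of how per-call options can sit on top of a defaults hash, here is a minimal sketch in plain Ruby. It does not use the gem: SKETCH_READ_DEFAULTS and resolve_read_options are hypothetical names, the symbol :one stands in for Cassandra::Consistency::ONE, and the unknown-key check is an assumption about what validation might look like, not the gem's actual behavior.

```ruby
# Hypothetical stand-in for the documented read defaults. The symbol :one
# stands in for Cassandra::Consistency::ONE.
SKETCH_READ_DEFAULTS = {
  :count => 100,
  :start => nil,
  :finish => nil,
  :reversed => false,
  :consistency => :one
}.freeze

# Merge caller-supplied options over the defaults. Rejecting unknown keys is
# an assumption made for this sketch.
def resolve_read_options(options = {})
  unknown = options.keys - SKETCH_READ_DEFAULTS.keys
  raise ArgumentError, "Invalid options: #{unknown.inspect}" unless unknown.empty?
  SKETCH_READ_DEFAULTS.merge(options)
end

opts = resolve_read_options(:count => 10, :reversed => true)
puts opts.inspect
```

Options that are not passed keep their default values, so a call that only sets :count still carries a consistency level.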
For write methods, valid option parameters are:
- :timestamp - The transaction timestamp. Defaults to the current time in milliseconds. This is used for conflict resolution by the server; you normally never need to change it.
- :consistency - See above.
For the initial client instantiation, you may also pass in :thrift_client_class with a ThriftClient subclass attached. On connection, that class will be used instead of the default ThriftClient class, allowing you to add additional behavior to the connection (e.g. query logging).
OrderedHash is namespaced to prevent conflicts with other implementations.
Constants
- READ_DEFAULTS
- THRIFT_DEFAULTS
- WRITE_DEFAULTS
Attributes
Public Class Methods
# File lib/cassandra/0.6/cassandra.rb
def self.DEFAULT_TRANSPORT_WRAPPER
  Thrift::BufferedTransport
end
# File lib/cassandra/0.6.rb
def self.VERSION
  "0.6"
end
Create a new Cassandra instance and open the connection.
# File lib/cassandra/cassandra.rb
def initialize(keyspace, servers = "127.0.0.1:9160", thrift_client_options = {})
  @is_super = {}
  @column_name_class = {}
  @sub_column_name_class = {}
  @column_name_maker = {}
  @sub_column_name_maker = {}
  @auto_discover_nodes = true
  thrift_client_options[:transport_wrapper] ||= Cassandra.DEFAULT_TRANSPORT_WRAPPER
  @thrift_client_options = THRIFT_DEFAULTS.merge(thrift_client_options)
  @thrift_client_class = @thrift_client_options[:thrift_client_class]
  @keyspace = keyspace
  @servers = Array(servers)
end
Public Instance Methods
Add a value to the counter in cf:key:super column:column
# File lib/cassandra/0.8/cassandra.rb
def add(column_family, key, value, *columns_and_options)
  column_family, column, sub_column, options = extract_and_validate_params(column_family, key, columns_and_options, WRITE_DEFAULTS)

  mutation_map = if is_super(column_family)
    {
      key => {
        column_family => [_super_counter_mutation(column_family, column, sub_column, value)]
      }
    }
  else
    {
      key => {
        column_family => [_standard_counter_mutation(column_family, column, value)]
      }
    }
  end

  @batch ? @batch << [mutation_map, options[:consistency]] : _mutate(mutation_map, options[:consistency])
end
Creates a new column family from the passed in Cassandra::ColumnFamily instance, and returns the schema id.
# File lib/cassandra/cassandra.rb
def add_column_family(cf_def)
  return false if Cassandra.VERSION.to_f < 0.7

  begin
    res = client.system_add_column_family(cf_def)
  rescue CassandraThrift::TimedOutException => te
    puts "Timed out: #{te.inspect}"
  end
  @schema = nil
  res
end
Add keyspace using the passed in keyspace definition.
Returns the new schema id.
# File lib/cassandra/cassandra.rb
def add_keyspace(ks_def)
  return false if Cassandra.VERSION.to_f < 0.7

  begin
    res = client.system_add_keyspace(ks_def)
  rescue CassandraThrift::TimedOutException => toe
    puts "Timed out: #{toe.inspect}"
  rescue Thrift::TransportException => te
    puts "Timed out: #{te.inspect}"
  end
  @keyspaces = nil
  res
end
Open a batch operation and yield self. Inserts and deletes will be queued until the block closes, and then sent atomically to the server. Supports the :consistency option, which overrides the consistency set in the individual commands.
# File lib/cassandra/0.6/cassandra.rb
def batch(options = {})
  _, _, _, options =
    extract_and_validate_params(schema.keys.first, "", [options], WRITE_DEFAULTS)

  @batch = []
  yield(self)
  compacted_map,seen_clevels = compact_mutations!
  clevel = if options[:consistency] != nil # Override any clevel from individual mutations if
             options[:consistency]
           elsif seen_clevels.length > 1 # Cannot choose which CLevel to use if there are several ones
             raise "Multiple consistency levels used in the batch, and no override...cannot pick one"
           else # if no consistency override has been provided but all the clevels in the batch are the same: use that one
             seen_clevels.first
           end

  _mutate(compacted_map,clevel)
ensure
  @batch = nil
end
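The consistency-selection branch of the batch method can be isolated as a small pure function. The sketch below is plain Ruby and does not talk to a server: pick_batch_consistency is a hypothetical helper name, and the symbols stand in for Cassandra::Consistency constants.

```ruby
# Hypothetical helper mirroring the consistency-selection branch of #batch.
# Plain symbols stand in for Cassandra::Consistency constants.
def pick_batch_consistency(override, seen_levels)
  if !override.nil?
    # An explicit batch-level option wins over the queued mutations.
    override
  elsif seen_levels.length > 1
    # The queued mutations disagree and nothing tells us which level to use.
    raise "Multiple consistency levels used in the batch, and no override...cannot pick one"
  else
    # All queued mutations share one level (or none were queued).
    seen_levels.first
  end
end

puts pick_batch_consistency(:quorum, [:one, :all]).inspect
puts pick_batch_consistency(nil, [:one]).inspect
```

Mixing levels inside one batch is only safe under this logic when an override is supplied; otherwise the call raises once the block has finished.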
Remove all rows in the column family you request.
- column_family
- options
  - consistency
  - timestamp
# File lib/cassandra/0.6/cassandra.rb
def clear_column_family!(column_family, options = {})
  each_key(column_family) do |key|
    remove(column_family, key, options)
  end
end
Remove all rows in the keyspace. Supports options :consistency and :timestamp. FIXME May not currently delete all records without multiple calls. Waiting for ranged remove support in Cassandra.
# File lib/cassandra/0.6/cassandra.rb
def clear_keyspace!(options = {})
  schema.keys.each { |column_family| clear_column_family!(column_family, options) }
end
Returns the string name specified for the cluster.
Please note that this only works on version 0.7.0 and higher.
# File lib/cassandra/cassandra.rb
def cluster_name
  return false if Cassandra.VERSION.to_f < 0.7

  @cluster_name ||= client.describe_cluster_name()
end
Return a hash of column_family definitions indexed by their names.
# File lib/cassandra/cassandra.rb
def column_families
  return false if Cassandra.VERSION.to_f < 0.7

  schema.cf_defs.inject(Hash.new){|memo, cf_def| memo[cf_def.name] = cf_def; memo;}
end
Count the columns for the provided parameters.
- column_family - The column_family to count columns in.
- key - The row key to count columns for.
- columns - Either a single super_column or a list of columns.
- sub_columns - The list of sub_columns to select.
- options - Valid options are:
  - :start - The column name to start from.
  - :stop - The column name to stop at.
  - :count - The maximum count of columns to return. (By default cassandra will count up to 100 columns.)
  - :consistency - Uses the default read consistency if none specified.
# File lib/cassandra/cassandra.rb
def count_columns(column_family, key, *columns_and_options)
  column_family, super_column, _, options =
    extract_and_validate_params(column_family, key, columns_and_options, READ_DEFAULTS)
  _count_columns(column_family, key, super_column, options[:start], options[:stop], options[:count], options[:consistency])
end
Count all rows in the column_family you request.
This method just calls Cassandra#get_range_keys and returns the number of records returned.
See Cassandra#get_range for options.
# File lib/cassandra/cassandra.rb
def count_range(column_family, options = {})
  get_range_keys(column_family, options).length
end
Create secondary index.
- keyspace
- column_family
- column_name
- validation_class
# File lib/cassandra/cassandra.rb
def create_index(keyspace, column_family, column_name, validation_class)
  return false if Cassandra.VERSION.to_f < 0.7

  cf_def = client.describe_keyspace(keyspace).cf_defs.find{|x| x.name == column_family}
  if !cf_def.nil? and !cf_def.column_metadata.find{|x| x.name == column_name}
    c_def = CassandraThrift::ColumnDef.new do |cd|
      cd.name = column_name
      cd.validation_class = "org.apache.cassandra.db.marshal."+validation_class
      cd.index_type = CassandraThrift::IndexType::KEYS
    end
    cf_def.column_metadata.push(c_def)
    update_column_family(cf_def)
  end
end
This method takes an array of CassandraThrift::IndexExpression objects and creates a CassandraThrift::IndexClause for use in Cassandra#get_indexed_slices.
- index_expressions - Array of CassandraThrift::IndexExpressions.
- start - The starting row key.
- count - The count of items to be returned.
# File lib/cassandra/cassandra.rb
def create_index_clause(index_expressions, start = "", count = 100)
  return false if Cassandra.VERSION.to_f < 0.7

  CassandraThrift::IndexClause.new(
    :start_key => start,
    :expressions => index_expressions,
    :count => count)
end
This method is mostly used internally by Cassandra#get_indexed_slices to create a CassandraThrift::IndexExpression for the given options.
- column_name - Column to be compared.
- value - Value to compare against.
- comparison - Type of comparison to do.
# File lib/cassandra/cassandra.rb
def create_index_expression(column_name, value, comparison)
  return false if Cassandra.VERSION.to_f < 0.7

  CassandraThrift::IndexExpression.new(
    :column_name => column_name,
    :value => value,
    :op => (case comparison
              when nil, "EQ", "eq", "=="
                CassandraThrift::IndexOperator::EQ
              when "GTE", "gte", ">="
                CassandraThrift::IndexOperator::GTE
              when "GT", "gt", ">"
                CassandraThrift::IndexOperator::GT
              when "LTE", "lte", "<="
                CassandraThrift::IndexOperator::LTE
              when "LT", "lt", "<"
                CassandraThrift::IndexOperator::LT
            end ))
end
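The comparison argument accepted by create_index_expression maps a handful of strings (or nil) onto index operators. The following sketch reproduces that mapping as a lookup table in plain Ruby. SKETCH_OPERATORS and sketch_operator_for are hypothetical names, and the symbols stand in for the CassandraThrift::IndexOperator constants.

```ruby
# Hypothetical lookup table. The symbols stand in for the
# CassandraThrift::IndexOperator constants used by create_index_expression.
SKETCH_OPERATORS = {
  nil   => :EQ,  "EQ"  => :EQ,  "eq"  => :EQ,  "==" => :EQ,
  "GTE" => :GTE, "gte" => :GTE, ">="  => :GTE,
  "GT"  => :GT,  "gt"  => :GT,  ">"   => :GT,
  "LTE" => :LTE, "lte" => :LTE, "<="  => :LTE,
  "LT"  => :LT,  "lt"  => :LT,  "<"   => :LT
}.freeze

# Return the operator for a comparison string, or nil when the string is not
# one of the recognised spellings (the case statement has no else branch).
def sketch_operator_for(comparison)
  SKETCH_OPERATORS[comparison]
end

puts sketch_operator_for(">=").inspect
puts sketch_operator_for(nil).inspect
puts sketch_operator_for("!=").inspect
```

Only exact spellings match, so a value such as "Eq" or "!=" yields no operator rather than an error.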
The initial default consistency is set to ONE, but you can use this method to override the normal default with your specified value. Use this if you do not want to specify a read consistency for each query.
# File lib/cassandra/cassandra.rb
def default_read_consistency=(value)
  READ_DEFAULTS[:consistency] = value
end
The initial default consistency is set to ONE, but you can use this method to override the normal default with your specified value. Use this if you do not want to specify a write consistency for each insert statement.
# File lib/cassandra/cassandra.rb
def default_write_consistency=(value)
  WRITE_DEFAULTS[:consistency] = value
end
This is primarily helpful when the cassandra cluster is communicating internally on a different ip address than what you are using to connect. A prime example of this would be when using EC2 to host a cluster. Typically, the cluster would be communicating over the local ip addresses issued by Amazon, but any clients connecting from outside EC2 would need to use the public ip.
# File lib/cassandra/cassandra.rb
def disable_node_auto_discovery!
  @auto_discover_nodes = false
end
Disconnect the current client connection.
# File lib/cassandra/cassandra.rb
def disconnect!
  if @client
    @client.disconnect!
    @client = nil
  end
end
Delete the specified column family. Return the new schema id.
- column_family - The column_family name to drop.
# File lib/cassandra/cassandra.rb
def drop_column_family(column_family)
  return false if Cassandra.VERSION.to_f < 0.7

  begin
    res = client.system_drop_column_family(column_family)
  rescue CassandraThrift::TimedOutException => te
    puts "Timed out: #{te.inspect}"
  end
  @schema = nil
  res
end
Delete secondary index.
- keyspace
- column_family
- column_name
# File lib/cassandra/cassandra.rb
def drop_index(keyspace, column_family, column_name)
  return false if Cassandra.VERSION.to_f < 0.7

  cf_def = client.describe_keyspace(keyspace).cf_defs.find{|x| x.name == column_family}
  if !cf_def.nil? and cf_def.column_metadata.find{|x| x.name == column_name}
    cf_def.column_metadata.delete_if{|x| x.name == column_name}
    update_column_family(cf_def)
  end
end
Deletes keyspace using the passed in keyspace name.
Returns the new schema id.
# File lib/cassandra/cassandra.rb
def drop_keyspace(keyspace=@keyspace)
  return false if Cassandra.VERSION.to_f < 0.7

  begin
    res = client.system_drop_keyspace(keyspace)
  rescue CassandraThrift::TimedOutException => toe
    puts "Timed out: #{toe.inspect}"
  rescue Thrift::TransportException => te
    puts "Timed out: #{te.inspect}"
  end
  keyspace = "system" if keyspace.eql?(@keyspace)
  @keyspaces = nil
  res
end
Iterate through each row in the given column family.
This method just calls Cassandra#get_range_batch and yields the key and columns.
See Cassandra#get_range for options.
# File lib/cassandra/cassandra.rb
def each(column_family, options = {})
  get_range_batch(column_family, options) do |key, columns|
    yield key, columns
  end
end
Iterate through each key within the given parameters. This function can be used to iterate over each key in the given column family.
This method just calls Cassandra#get_range_batch and yields each row key.
See Cassandra#get_range for options.
# File lib/cassandra/cassandra.rb
def each_key(column_family, options = {})
  get_range_batch(column_family, options) do |key, columns|
    yield key
  end
end
Return true if the column_family:key::[sub_column] path you request exists.
If passed in only a row key it will query for any columns (limiting to 1) for that row key. If a column is passed in it will query for that specific column/super column.
This method will return true or false.
- column_family - The column_family to check.
- key - The row key to check.
- columns - Either a single super_column or a list of columns.
- sub_columns - The list of sub_columns to select.
- options - Valid options are:
  - :consistency - Uses the default read consistency if none specified.
# File lib/cassandra/cassandra.rb
def exists?(column_family, key, *columns_and_options)
  column_family, column, sub_column, options =
    extract_and_validate_params(column_family, key, columns_and_options, READ_DEFAULTS)
  result = if column
             _multiget(column_family, [key], column, sub_column, 1, '', '', false, options[:consistency])[key]
           else
             _multiget(column_family, [key], nil, nil, 1, '', '', false, options[:consistency])[key]
           end

  ![{}, nil].include?(result)
end
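The last line of exists? decides presence by treating only nil and an empty hash as "missing". A minimal sketch of that check in plain Ruby follows. path_present? is a hypothetical name, and the sample values are made up for illustration.

```ruby
# Hypothetical helper mirroring the final check in #exists?: a lookup result
# counts as present unless it is nil or an empty hash.
def path_present?(result)
  ![{}, nil].include?(result)
end

puts path_present?({"user_timeline" => "1"})  # a row with one column
puts path_present?({})                         # no columns came back
puts path_present?(nil)                        # no entry for the key at all
puts path_present?("")                         # an empty column value
```

Note that under this check an empty string value is neither nil nor an empty hash, so it still counts as existing.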
Send the batch queue to the server
# File lib/cassandra/cassandra.rb
def flush_batch(options={})
  compacted_map,seen_clevels = compact_mutations!

  clevel = if options[:consistency] != nil # Override any clevel from individual mutations if
             options[:consistency]
           elsif seen_clevels.length > 1 # Cannot choose which CLevel to use if there are several ones
             raise "Multiple consistency levels used in the batch, and no override...cannot pick one"
           else # if no consistency override has been provided but all the clevels in the batch are the same: use that one
             seen_clevels.first
           end

  _mutate(compacted_map,clevel)
end
Return a hash (actually, a Cassandra::OrderedHash) or a single value representing the element at the column_family:key::[sub_column] path you request.
- column_family - The column_family to read from.
- key - The row key to read.
- column - Either a single super_column or single column.
- sub_column - A single sub_column to select.
- options - Valid options are:
  - :count - The number of columns requested to be returned.
  - :start - The starting value for selecting a range of columns.
  - :finish - The final value for selecting a range of columns.
  - :reversed - If set to true the results will be returned in reverse order.
  - :consistency - Uses the default read consistency if none specified.
# File lib/cassandra/cassandra.rb
def get(column_family, key, *columns_and_options)
  multi_get(column_family, [key], *columns_and_options)[key]
end
Return a hash of column value pairs for the path you request.
- column_family - The column_family to read from.
- key - The row key to read.
- columns - Either a single super_column or a list of columns.
- sub_columns - The list of sub_columns to select.
- options - Valid options are:
  - :consistency - Uses the default read consistency if none specified.
# File lib/cassandra/cassandra.rb
def get_columns(column_family, key, *columns_and_options)
  column_family, columns, sub_columns, options =
    extract_and_validate_params(column_family, key, columns_and_options, READ_DEFAULTS)
  _get_columns(column_family, key, columns, sub_columns, options[:consistency])
end
This method is used to query a secondary index with a set of provided search parameters.
Please note that you can either specify a CassandraThrift::IndexClause or an array of hashes with the format as below.
- column_family - The Column Family this operation will be run on.
- index_clause - This can either be a CassandraThrift::IndexClause or an array of hashes with the following keys:
  - :column_name - Column to be compared.
  - :value - Value to compare against.
  - :comparison - Type of comparison to do.
- options
  - :key_count - Set maximum number of rows to return. (Only works if CassandraThrift::IndexClause is not passed in.)
  - :start_key - Set starting row key for search. (Only works if CassandraThrift::IndexClause is not passed in.)
  - :consistency
TODO: Supercolumn support.
# File lib/cassandra/cassandra.rb
def get_indexed_slices(column_family, index_clause, *columns_and_options)
  return false if Cassandra.VERSION.to_f < 0.7

  column_family, columns, _, options =
    extract_and_validate_params(column_family, [], columns_and_options,
    READ_DEFAULTS.merge(:key_count => 100, :start_key => nil, :key_start => nil))

  start_key = options[:start_key] || options[:key_start] || ""

  if index_clause.class != CassandraThrift::IndexClause
    index_expressions = index_clause.collect do |expression|
      create_index_expression(expression[:column_name], expression[:value], expression[:comparison])
    end

    index_clause = create_index_clause(index_expressions, start_key, options[:key_count])
  end

  key_slices = _get_indexed_slices(column_family, index_clause, columns, options[:count], options[:start],
    options[:finish], options[:reversed], options[:consistency])

  key_slices.inject(OrderedHash.new) {|h, key_slice| h[key_slice.key] = key_slice.columns; h }
end
Return a Cassandra::OrderedHash containing the columns specified for the given range of keys in the column_family you request.
This method is just a convenience wrapper around Cassandra#get_range_single and Cassandra#get_range_batch. If :key_count, :batch_size, or a block is passed in, Cassandra#get_range_batch will be called. Otherwise Cassandra#get_range_single will be used.
The start_key and finish_key parameters are only useful for iterating over all records, as is done in the Cassandra#each and Cassandra#each_key methods, if you are using the RandomPartitioner.
If the table is partitioned with OrderPreservingPartitioner you may use the start_key and finish_key params to select all records with the same prefix value.
If a block is passed in we will yield the row key and columns for each record returned.
Please note that Cassandra returns a row for each row that has existed in the system since gc_grace_seconds. This is because deleted row keys are marked as deleted, but left in the system until the cluster has had reasonable time to replicate the deletion. This function attempts to suppress deleted rows (actually any row returned without columns is suppressed).
Please note that when enabling the :reversed option, :start and :finish should be swapped (i.e. reversal happens before selecting the range).
- column_family - The column_family to read from.
- options - Valid options are:
  - :start_key - The starting value for selecting a range of keys (only useful with OPP).
  - :finish_key - The final value for selecting a range of keys (only useful with OPP).
  - :key_count - The total number of keys to return from the query. (See note regarding deleted records.)
  - :batch_size - The maximum number of keys to return per query. If specified will loop until :key_count is obtained or all records have been returned.
  - :columns - A list of columns to return.
  - :count - The number of columns requested to be returned.
  - :start - The starting value for selecting a range of columns.
  - :finish - The final value for selecting a range of columns.
  - :reversed - If set to true the results will be returned in reverse order.
  - :consistency - Uses the default read consistency if none specified.
# File lib/cassandra/cassandra.rb
def get_range(column_family, options = {}, &blk)
  if block_given? || options[:key_count] || options[:batch_size]
    get_range_batch(column_family, options, &blk)
  else
    get_range_single(column_family, options, &blk)
  end
end
Return a Cassandra::OrderedHash containing the columns specified for the given range of keys in the column_family you request.
If a block is passed in we will yield the row key and columns for each record returned and return a nil value instead of a Cassandra::OrderedHash.
See Cassandra#get_range for more details.
# File lib/cassandra/cassandra.rb
def get_range_batch(column_family, options = {})
  batch_size = options.delete(:batch_size) || 100
  count = options.delete(:key_count)
  result = (!block_given? && {}) || nil
  num_results = 0

  options[:start_key] ||= ''
  last_key = nil

  while count.nil? || count > num_results
    res = get_range_single(column_family, options.merge!(:start_key => last_key || options[:start_key],
                                                         :key_count => batch_size,
                                                         :return_empty_rows => true
                                                        ))
    break if res.keys.last == last_key

    res.each do |key, columns|
      next if last_key == key
      next if num_results == count

      unless columns == {}
        if block_given?
          yield key, columns
        else
          result[key] = columns
        end
        num_results += 1
      end

      last_key = key
    end
  end

  result
end
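The paging loop in get_range_batch can be tried out without a server. The sketch below runs the same idea against an in-memory hash: each page starts at the last key seen (inclusive), the repeated key is skipped, and rows without columns are suppressed. ROWS, fetch_page and each_live_row are hypothetical names, and the sorted hash is only a stand-in for a key range served by the cluster.

```ruby
# Hypothetical in-memory stand-in for a column family: sorted row keys mapped
# to column hashes. An empty hash models a deleted row that is still returned.
ROWS = {
  "a" => {"x" => "1"},
  "b" => {},
  "c" => {"x" => "3"},
  "d" => {"x" => "4"},
  "e" => {"x" => "5"}
}.freeze

# Stand-in for one range query: up to key_count rows with key >= start_key.
def fetch_page(start_key, key_count)
  ROWS.select { |key, _| key >= start_key }.first(key_count).to_h
end

# Collect live rows page by page, stopping at limit rows if one is given.
def each_live_row(batch_size, limit = nil)
  result = {}
  last_key = nil
  num_results = 0
  while limit.nil? || limit > num_results
    page = fetch_page(last_key || "", batch_size)
    break if page.keys.last == last_key  # nothing new came back
    page.each do |key, columns|
      next if key == last_key            # start key is inclusive, skip the repeat
      next if num_results == limit
      unless columns.empty?              # suppress rows without columns
        result[key] = columns
        num_results += 1
      end
      last_key = key
    end
  end
  result
end

puts each_live_row(2).keys.inspect
puts each_live_row(2, 2).keys.inspect
```

Because every page after the first begins with the previous page's last key, a batch size of 1 makes no progress in this sketch: the second page contains only the repeated key and the loop stops.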
Return an Array containing all of the keys within a given range.
This method just calls Cassandra#get_range and returns the row keys for the records returned.
See Cassandra#get_range for options.
# File lib/cassandra/cassandra.rb
def get_range_keys(column_family, options = {})
  get_range(column_family,options.merge!(:count => 1)).keys
end
Return a Cassandra::OrderedHash containing the columns specified for the given range of keys in the column_family you request.
See Cassandra#get_range for more details.
# File lib/cassandra/cassandra.rb
def get_range_single(column_family, options = {})
  return_empty_rows = options.delete(:return_empty_rows) || false

  column_family, _, _, options =
    extract_and_validate_params(column_family, "", [options],
                                READ_DEFAULTS.merge(:start_key => '',
                                                    :finish_key => '',
                                                    :key_count => 100,
                                                    :columns => nil,
                                                    :reversed => false
                                                   )
                               )

  results = _get_range( column_family,
                        options[:start_key].to_s,
                        options[:finish_key].to_s,
                        options[:key_count],
                        options[:columns],
                        options[:start].to_s,
                        options[:finish].to_s,
                        options[:count],
                        options[:consistency],
                        options[:reversed] )

  multi_key_slices_to_hash(column_family, results, return_empty_rows)
end
This is the main method used to insert rows into cassandra. If the column_family that you are inserting into is a SuperColumnFamily then the hash passed in should be a nested hash, otherwise it should be a flat hash.
This method can also be called while in batch mode. If in batch mode then we queue up the mutations (an insert in this case) and pass them to cassandra in a single batch at the end of the block.
- column_family - The column_family that you are inserting into.
- key - The row key to insert.
- hash - The columns or super columns to insert.
- options - Valid options are:
  - :timestamp - Uses the current time if none specified.
  - :consistency - Uses the default write consistency if none specified.
  - :ttl - If specified this is the number of seconds after the insert that this value will be available.
# File lib/cassandra/cassandra.rb
def insert(column_family, key, hash, options = {})
  column_family, _, _, options = extract_and_validate_params(column_family, key, [options], WRITE_DEFAULTS)

  timestamp = options[:timestamp] || Time.stamp
  mutation_map = if is_super(column_family)
    {
      key => {
        column_family => hash.collect{|k,v| _super_insert_mutation(column_family, k, v, timestamp, options[:ttl]) }
      }
    }
  else
    {
      key => {
        column_family => hash.collect{|k,v| _standard_insert_mutation(column_family, k, v, timestamp, options[:ttl])}
      }
    }
  end

  @batch ? @batch << [mutation_map, options[:consistency]] : _mutate(mutation_map, options[:consistency])
end
# File lib/cassandra/0.6/cassandra.rb
def inspect
  "#<Cassandra:#{object_id}, @keyspace=#{keyspace.inspect}, @schema={#{
    schema(false).map {|name, hash| ":#{name} => #{hash['type'].inspect}"}.join(', ')
  }}, @servers=#{servers.inspect}>"
end
Set the keyspace to use.
Please note that this only works on version 0.7.0 and higher.
# File lib/cassandra/cassandra.rb
def keyspace=(ks)
  return false if Cassandra.VERSION.to_f < 0.7

  client.set_keyspace(ks)
  @schema = nil; @keyspace = ks
end
Returns an array of available keyspaces.
# File lib/cassandra/0.6/cassandra.rb
def keyspaces
  @keyspaces ||= client.describe_keyspaces()
end
Issues a login attempt using the username and password specified.
- username
- password
# File lib/cassandra/0.6/cassandra.rb
def login!(username, password)
  @auth_request = CassandraThrift::AuthenticationRequest.new
  @auth_request.credentials = {'username' => username, 'password' => password}
  client.login(@keyspace, @auth_request)
end
Multi-key version of Cassandra#count_columns. Please note that this queries the server for each key passed in.
Supports same parameters as Cassandra#count_columns.
- column_family - The column_family to count columns in.
- keys - The row keys to count columns for.
- columns - Either a single super_column or a list of columns.
- sub_columns - The list of sub_columns to select.
- options - Valid options are:
  - :consistency - Uses the default read consistency if none specified.
FIXME: Not real multi; needs server support
# File lib/cassandra/cassandra.rb
def multi_count_columns(column_family, keys, *options)
  OrderedHash[*keys.map { |key| [key, count_columns(column_family, key, *options)] }._flatten_once]
end
Multi-key version of Cassandra#get.
This method allows you to select multiple rows with a single query. If a key that is passed in doesn't exist an empty hash will be returned.
Supports the same parameters as Cassandra#get.
- column_family - The column_family to read from.
- keys - An array of keys to select.
- column - Either a single super_column or a single column.
- sub_column - A single sub_column to select.
- options - Valid options are:
  - :count - The number of columns requested to be returned.
  - :start - The starting value for selecting a range of columns.
  - :finish - The final value for selecting a range of columns.
  - :reversed - If set to true the results will be returned in reverse order.
  - :consistency - Uses the default read consistency if none specified.
# File lib/cassandra/cassandra.rb
def multi_get(column_family, keys, *columns_and_options)
  column_family, column, sub_column, options =
    extract_and_validate_params(column_family, keys, columns_and_options, READ_DEFAULTS)

  hash = _multiget(column_family, keys, column, sub_column, options[:count], options[:start], options[:finish], options[:reversed], options[:consistency])

  # Restore order
  ordered_hash = OrderedHash.new
  keys.each { |key| ordered_hash[key] = hash[key] || (OrderedHash.new if is_super(column_family) and !sub_column) }
  ordered_hash
end
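The "Restore order" step in multi_get rebuilds the result so that rows come back in the order the keys were requested, whatever order the server replied in. A minimal sketch of that step in plain Ruby is shown below. restore_order is a hypothetical helper, a plain Hash (insertion-ordered in current Ruby) stands in for Cassandra::OrderedHash, and the fallback value for missing keys is passed in explicitly as a simplification.

```ruby
# Hypothetical helper mirroring the "Restore order" step of #multi_get.
# A plain Hash stands in for Cassandra::OrderedHash.
def restore_order(keys, fetched, default_for_missing = nil)
  ordered = {}
  # Walk the keys in request order, falling back when the server returned
  # nothing for a key.
  keys.each { |key| ordered[key] = fetched[key] || default_for_missing }
  ordered
end

# Made-up server reply, deliberately out of request order.
fetched = {"b" => {"col" => "2"}, "a" => {"col" => "1"}}
rows = restore_order(["a", "missing", "b"], fetched, {})
puts rows.inspect
```

Every requested key appears in the result, so callers can iterate over it without checking whether a key was found.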
Multi-key version of Cassandra#get_columns. Please note that this queries the server for each key passed in.
Supports same parameters as Cassandra#get_columns.
- column_family - The column_family to read from.
- keys - The row keys to read.
- columns - Either a single super_column or a list of columns.
- sub_columns - The list of sub_columns to select.
- options - Valid options are:
  - :consistency - Uses the default read consistency if none specified.
# File lib/cassandra/cassandra.rb
def multi_get_columns(column_family, keys, *columns_and_options)
  column_family, columns, sub_columns, options =
    extract_and_validate_params(column_family, keys, columns_and_options, READ_DEFAULTS)
  _multi_get_columns(column_family, keys, columns, sub_columns, options[:consistency])
end
Returns a string identifying which partitioner is in use by the current cluster. Typically, this will be RandomPartitioner, but it could be OrderPreservingPartitioner as well.
Please note that this only works on version 0.7.0 and higher.
# File lib/cassandra/cassandra.rb
def partitioner
  return false if Cassandra.VERSION.to_f < 0.7

  client.describe_partitioner()
end
This method is used to delete (actually marking them as deleted with a tombstone) rows, columns, or super columns depending on the parameters passed. If only a key is passed the entire row will be marked as deleted. If a column name is passed in that column will be deleted.
This method can also be used in batch mode. If in batch mode then we queue up the mutations (a deletion in this case) and pass them to cassandra in a single batch at the end of the block.
- column_family - The column_family to remove from.
- key - The row key to remove.
- columns - Either a single super_column or a list of columns.
- sub_columns - The list of sub_columns to select.
- options - Valid options are:
  - :timestamp - Uses the current time if none specified.
  - :consistency - Uses the default write consistency if none specified.
# File lib/cassandra/cassandra.rb
def remove(column_family, key, *columns_and_options)
  column_family, columns, sub_column, options = extract_and_validate_params(column_family, key, columns_and_options, WRITE_DEFAULTS)

  if columns.is_a? Array
    if sub_column
      raise ArgumentError, 'remove does not support sub_columns with array of columns'
    end
  else
    columns = [columns]
  end

  timestamp = options[:timestamp]|| Time.stamp

  mutation_map =
    {
      key => {
        column_family => columns.map {|column|
          _delete_mutation(column_family, column, sub_column, timestamp)
        }
      }
    }

  mutation = [mutation_map, options[:consistency]]

  @batch ? @batch << mutation : _mutate(*mutation)
end
Rename a column family. Returns the new schema id.
- old_name - The current column_family name.
- new_name - The desired column_family name.
# File lib/cassandra/cassandra.rb
def rename_column_family(old_name, new_name)
  return false if Cassandra.VERSION.to_f != 0.7

  begin
    res = client.system_rename_column_family(old_name, new_name)
  rescue CassandraThrift::TimedOutException => te
    puts "Timed out: #{te.inspect}"
  end
  @schema = nil
  res
end
Renames keyspace.
- old_name - Current keyspace name.
- new_name - Desired keyspace name.
Returns the new schema id.
# File lib/cassandra/cassandra.rb
def rename_keyspace(old_name, new_name)
  return false if Cassandra.VERSION.to_f < 0.7

  begin
    res = client.system_rename_keyspace(old_name, new_name)
  rescue CassandraThrift::TimedOutException => toe
    puts "Timed out: #{toe.inspect}"
  rescue Thrift::TransportException => te
    puts "Timed out: #{te.inspect}"
  end
  keyspace = new_name if old_name.eql?(@keyspace)
  @keyspaces = nil
  res
end
Returns an array of CassandraThrift::TokenRange objects describing the current ring: which servers make it up, their start and end tokens, and their lists of endpoints.
Please note that this only works on version 0.7.0 and higher.
# File lib/cassandra/cassandra.rb
def ring
  return false if Cassandra.VERSION.to_f < 0.7

  client.describe_ring(@keyspace)
end
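One way to consume the result of ring is to group token ranges by endpoint. The sketch below runs without a server: TokenRange is a hypothetical Struct standing in for CassandraThrift::TokenRange (assumed to expose start_token, end_token, and endpoints), ranges_by_endpoint is an illustrative helper, and the tokens and addresses are invented.

```ruby
# Hypothetical stand-in for CassandraThrift::TokenRange.
TokenRange = Struct.new(:start_token, :end_token, :endpoints)

# Sample data shaped like the array that ring returns (values are made up).
ring = [
  TokenRange.new('0', '100', ['10.0.0.1', '10.0.0.2']),
  TokenRange.new('100', '0', ['10.0.0.2', '10.0.0.3'])
]

# Maps each endpoint to the [start_token, end_token] pairs it serves.
def ranges_by_endpoint(ring)
  ring.each_with_object(Hash.new { |h, k| h[k] = [] }) do |range, acc|
    range.endpoints.each { |ep| acc[ep] << [range.start_token, range.end_token] }
  end
end

ranges_by_endpoint(ring).each do |endpoint, ranges|
  puts "#{endpoint}: #{ranges.map { |s, e| "#{s}..#{e}" }.join(', ')}"
end
```

With a real client, remember that ring returns false on versions before 0.7, so check the result before iterating.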
This returns true if all servers are in agreement on the schema.
Please note that this only works on version 0.7.0 and higher.
# File lib/cassandra/cassandra.rb
def schema_agreement?
  return false if Cassandra.VERSION.to_f < 0.7

  client.describe_schema_versions().length == 1
end
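The check relies on the schema-versions result having exactly one entry. The sketch below assumes that result is a Hash of schema version id to the hosts reporting it; schema_agreed? is a hypothetical helper and the version ids and addresses are invented.

```ruby
# Returns true when every node reports the same schema version.
# versions is assumed to be a Hash of schema version id => array of hosts.
def schema_agreed?(versions)
  versions.length == 1
end

agreed   = { 'a1b2' => ['10.0.0.1', '10.0.0.2', '10.0.0.3'] }
diverged = { 'a1b2' => ['10.0.0.1', '10.0.0.2'], 'c3d4' => ['10.0.0.3'] }

puts schema_agreed?(agreed)   # prints true
puts schema_agreed?(diverged) # prints false
```

A typical use is to poll schema_agreement? after a schema change and wait until it returns true before issuing further schema operations.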
Update the column family based on the passed-in definition.
# File lib/cassandra/cassandra.rb
def update_column_family(cf_def)
  return false if Cassandra.VERSION.to_f < 0.7

  begin
    res = client.system_update_column_family(cf_def)
  rescue CassandraThrift::TimedOutException => te
    puts "Timed out: #{te.inspect}"
  end
  @schema = nil
  res
end
Update the keyspace using the passed-in keyspace definition.
# File lib/cassandra/cassandra.rb
def update_keyspace(ks_def)
  return false if Cassandra.VERSION.to_f < 0.7

  begin
    res = client.system_update_keyspace(ks_def)
  rescue CassandraThrift::TimedOutException => toe
    puts "Timed out: #{toe.inspect}"
  rescue Thrift::TransportException => te
    puts "Timed out: #{te.inspect}"
  end
  @keyspaces = nil
  res
end
Lists the current cassandra.thrift version.
Please note that this only works on version 0.7.0 and higher.
# File lib/cassandra/cassandra.rb
def version
  return false if Cassandra.VERSION.to_f < 0.7

  client.describe_version()
end
Protected Instance Methods
# File lib/cassandra/0.6/cassandra.rb
def all_nodes
  if @auto_discover_nodes
    temp_client = new_client
    begin
      ips = ::JSON.parse(temp_client.get_string_property('token map')).values
      port = @servers.first.split(':').last
      ips.map{|ip| "#{ip}:#{port}" }
    ensure
      temp_client.disconnect!
    end
  else
    @servers
  end
end
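The discovery step in all_nodes amounts to parsing a JSON token map and reusing the port of the first configured server. A minimal sketch of that transformation follows; nodes_from_token_map is a hypothetical helper, and the token map string is invented sample data assumed to map tokens to IP addresses.

```ruby
require 'json'

# Turns a token map (JSON object of token => IP) into "ip:port" strings,
# reusing the port of the first configured server.
def nodes_from_token_map(token_map_json, servers)
  ips = JSON.parse(token_map_json).values
  port = servers.first.split(':').last
  ips.map { |ip| "#{ip}:#{port}" }
end

token_map = '{"0":"10.0.0.1","85070591730234615865843651857942052864":"10.0.0.2"}'
puts nodes_from_token_map(token_map, ['127.0.0.1:9160']).inspect
# prints ["10.0.0.1:9160", "10.0.0.2:9160"]
```

Because only the first server's port is consulted, this approach assumes every node in the ring listens on the same port.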
# File lib/cassandra/cassandra.rb
def calling_method
  "#{self.class}##{caller[0].split('`').last[0..-3]}"
end
# File lib/cassandra/0.6/cassandra.rb
def client
  reconnect! if @client.nil?
  @client
end
Roll up queued mutations, to improve atomicity (and performance).
# File lib/cassandra/cassandra.rb
def compact_mutations!
  used_clevels = {} # hash that lists the consistency levels seen in the batch array. key is the clevel, value is true
  by_key = Hash.new{|h,k | h[k] = {}}
  # @batch is an array of mutation_ops.
  # A mutation op is a 2-item array containing [mutationmap, consistency_number]
  # a mutation map is a hash, by key (string) that has a hash by CF name, containing a list of column_mutations)
  @batch.each do |mutation_op|
    # A single mutation op looks like:
    # For an insert/update
    #[ { key1 =>
    #      { CF1 => [several of CassThrift:Mutation(colname,value,TS,ttl)]
    #        CF2 => [several mutations]
    #      },
    #    key2 => {...} # Not sure if they can come batched like this...so there might only be a single key (and CF)
    #  }, # [0]
    #  consistency # [1]
    #]
    mmap = mutation_op[0] # :remove OR a hash like {"key"=> {"CF"=>[mutationclass1,...] } }
    used_clevels[mutation_op[1]] = true #save the clevel required for this operation

    mmap.keys.each do |k|
      mmap[k].keys.each do |cf| # For each CF in that key
        by_key[k][cf] ||= []
        by_key[k][cf].concat(mmap[k][cf]) # Append the list of mutations for that key and CF
      end
    end
  end
  # Returns the batch mutations map, and an array with the consistency levels 'seen' in the batch
  [by_key, used_clevels.keys]
end
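The roll-up can be tried on plain data. In this sketch, compact_mutations is a hypothetical standalone version of the method that takes the batch as an argument; symbols stand in for Thrift mutation objects and integers for consistency levels, all of them made up for illustration.

```ruby
# Merges queued [mutation_map, consistency] pairs into one map keyed by
# row key and column family, and collects the consistency levels seen.
def compact_mutations(batch)
  used_clevels = {}
  by_key = Hash.new { |h, k| h[k] = {} }

  batch.each do |mutation_map, consistency|
    used_clevels[consistency] = true
    mutation_map.each do |key, by_cf|
      by_cf.each do |cf, mutations|
        # Append this op's mutations to any already queued for key and CF.
        (by_key[key][cf] ||= []).concat(mutations)
      end
    end
  end

  [by_key, used_clevels.keys]
end

batch = [
  [{ 'user1' => { 'Users' => [:set_name] } }, 1],
  [{ 'user1' => { 'Users' => [:set_email], 'Timeline' => [:add_tweet] } }, 1],
  [{ 'user2' => { 'Users' => [:delete_row] } }, 2]
]

merged, clevels = compact_mutations(batch)
puts merged['user1']['Users'].inspect # prints [:set_name, :set_email]
puts clevels.inspect                  # prints [1, 2]
```

Returning the distinct consistency levels alongside the merged map lets the caller detect a batch that mixes levels before sending it as a single request.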
Creates a new client as specified by Cassandra.thrift_client_options[:thrift_client_class]
# File lib/cassandra/cassandra.rb
def new_client
  thrift_client_class.new(CassandraThrift::Cassandra::Client, @servers, @thrift_client_options)
end
# File lib/cassandra/0.6/cassandra.rb
def reconnect!
  @servers = all_nodes
  @client = new_client
end
# File lib/cassandra/0.6/cassandra.rb
def schema(load=true)
  if !load && !@schema
    []
  else
    @schema ||= client.describe_keyspace(@keyspace)
  end
end