class Cassie
This class provides a lightweight wrapper around the Cassandra driver. It serves as a foundation for maintaining a connection and constructing CQL statements.
Attributes
Set a logger with a Logger-compatible object.
Public Class Methods
Call this method to load the Cassie::Config from the specified file for the specified environment. Any existing singleton instance is disconnected and discarded first.
# File lib/cassie.rb, line 55
def configure!(options)
  if defined?(@instance) && @instance
    old_instance = @instance
    @instance = nil
    old_instance.disconnect
  end
  @config = Cassie::Config.new(options)
end
This method can be used to set a consistency level for all Cassandra queries within a block that don't explicitly set one. It is useful where consistency is important (e.g. on validation queries) but a higher-level method doesn't provide an option to set it.
# File lib/cassie.rb, line 68
def consistency(level)
  save_val = Thread.current[:cassie_consistency]
  begin
    Thread.current[:cassie_consistency] = level
    yield
  ensure
    Thread.current[:cassie_consistency] = save_val
  end
end
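The save-and-restore pattern behind this method can be tried without a cluster. The following is a minimal standalone sketch: the module name `ConsistencyScope` is hypothetical, while the `:cassie_consistency` thread-local key and the `:local_one` fallback come from the source. It shows that nested blocks override the level and that the previous value is restored on exit.

```ruby
# Hypothetical stand-in mirroring Cassie.consistency and
# Cassie#current_consistency without a Cassandra session.
module ConsistencyScope
  DEFAULT = :local_one # fallback used by Cassie#initialize

  # Set a thread-local level for the block, restoring the prior value
  # afterwards (even if the block raises), so nesting works.
  def self.with(level)
    saved = Thread.current[:cassie_consistency]
    begin
      Thread.current[:cassie_consistency] = level
      yield
    ensure
      Thread.current[:cassie_consistency] = saved
    end
  end

  # The block-level value wins; otherwise fall back to the default.
  def self.current
    Thread.current[:cassie_consistency] || DEFAULT
  end
end

seen = []
ConsistencyScope.with(:quorum) do
  seen << ConsistencyScope.current
  ConsistencyScope.with(:all) { seen << ConsistencyScope.current }
  seen << ConsistencyScope.current
end
seen << ConsistencyScope.current
p seen # => [:quorum, :all, :quorum, :local_one]
```

In application code the same scoping is written as `Cassie.consistency(:quorum) { ... }`.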
A singleton instance that can be shared to communicate with a Cassandra cluster.
# File lib/cassie.rb, line 45
def instance
  unless defined?(@instance) && @instance
    instance = new(@config)
    @instance = instance
  end
  @instance
end
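Together with `configure!`, this gives a lazily built, shared instance that is torn down whenever the configuration changes. The sketch below reproduces that lifecycle with a hypothetical `SharedClient` class; the keyspace option values are made up for illustration and are not Cassie::Config keys taken from the source.

```ruby
# Hypothetical class mimicking how Cassie.instance and Cassie.configure!
# manage one lazily created, shared instance.
class SharedClient
  attr_reader :config, :disconnected

  def initialize(config)
    @config = config
    @disconnected = false
  end

  def disconnect
    @disconnected = true
  end

  class << self
    # Replace the configuration and disconnect any existing instance.
    def configure!(options)
      if defined?(@instance) && @instance
        old_instance = @instance
        @instance = nil
        old_instance.disconnect
      end
      @config = options
    end

    # Build the shared instance on first use from the current config.
    def instance
      @instance ||= new(@config)
    end
  end
end

SharedClient.configure!(keyspace: "app_dev")
first = SharedClient.instance
same = SharedClient.instance        # memoized: same object
SharedClient.configure!(keyspace: "app_test")
second = SharedClient.instance      # rebuilt from the new config
```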
Get a Logger-compatible object if it has been set.
# File lib/cassie.rb, line 79
def logger
  @logger if defined?(@logger)
end
# File lib/cassie.rb, line 87
def initialize(config)
  @config = config
  @monitor = Monitor.new
  @session = nil
  @prepared_statements = {}
  @last_prepare_warning = Time.now
  @subscribers = Subscribers.new
  @consistency = ((config.cluster || {})[:consistency] || :local_one)
end
Public Instance Methods
Declare and execute a batch statement. Any insert, update, or delete calls made within the block will add themselves to the batch which is executed at the end of the block.
# File lib/cassie.rb, line 166
def batch(options = nil)
  if Thread.current[:cassie_batch]
    yield
  else
    begin
      batch = []
      Thread.current[:cassie_batch] = batch
      yield
      unless batch.empty?
        batch_statement = session.logged_batch
        batch.each do |cql, values|
          if values.blank?
            batch_statement.add(cql)
          else
            statement = prepare(cql)
            statement = statement.bind(Array(values)) if values.present?
            batch_statement.add(statement)
          end
        end
        execute(batch_statement, nil, options)
      end
    ensure
      Thread.current[:cassie_batch] = nil
    end
  end
end
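The queuing behavior, including the fact that a nested `batch` call simply joins the outer batch, can be seen in isolation. This is a minimal sketch with a hypothetical `BatchRecorder` class that records what would be executed instead of calling `session.logged_batch`; the `users` table is made up.

```ruby
# Hypothetical recorder mirroring Cassie#batch and the private
# batch_or_execute helper: writes inside a batch block are queued in a
# thread-local array and flushed once when the outermost block ends.
class BatchRecorder
  attr_reader :executed

  def initialize
    @executed = []
  end

  def batch
    if Thread.current[:cassie_batch]
      yield # nested call: keep adding to the outer batch
    else
      begin
        batch = []
        Thread.current[:cassie_batch] = batch
        yield
        # Stand-in for building and executing a logged batch statement.
        @executed << [:batch, batch] unless batch.empty?
      ensure
        Thread.current[:cassie_batch] = nil
      end
    end
  end

  def batch_or_execute(cql, values)
    batch = Thread.current[:cassie_batch]
    if batch
      batch << [cql, values]
      nil
    else
      @executed << [:single, [cql, values]]
    end
  end
end

db = BatchRecorder.new
db.batch_or_execute("DELETE FROM users WHERE id = ?", [1]) # runs immediately
db.batch do
  db.batch_or_execute("INSERT INTO users (id, name) VALUES (?,?)", [2, "a"])
  db.batch do # joins the outer batch
    db.batch_or_execute("DELETE FROM users WHERE id = ?", [3])
  end
end
```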
Open a connection to the Cassandra cluster.
# File lib/cassie.rb, line 98
def connect
  start_time = Time.now
  cluster_config = config.cluster
  cluster_config = cluster_config.merge(logger: logger) if logger
  cluster = Cassandra.cluster(cluster_config)
  logger&.info("Cassie.connect with #{config.sanitized_cluster} in #{((Time.now - start_time) * 1000).round}ms")
  @monitor.synchronize do
    @session = cluster.connect(config.default_keyspace)
    @prepared_statements = {}
  end
end
Return true if the connection to the Cassandra cluster has been established.
# File lib/cassie.rb, line 121
def connected?
  !!@session
end
Return the current consistency level that has been set for statements.
# File lib/cassie.rb, line 322
def current_consistency
  Thread.current[:cassie_consistency] || consistency
end
Delete a row from a table. You should pass the primary key value in the key_hash.
If this method is called inside a batch block it will be executed in the batch.
# File lib/cassie.rb, line 272
def delete(table, key_hash, options = nil)
  key_cql, key_values = key_clause(key_hash)
  cql = "DELETE FROM #{table} WHERE #{key_cql}"
  batch_or_execute(cql, key_values, options)
end
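Each entry in key_hash becomes one `column = ?` condition, joined with AND, and the values are bound in the same order. The sketch below copies the private `key_clause` helper and wraps it in a hypothetical `delete_cql` function to show the generated statement; the `events` table and its columns are made up.

```ruby
# Copy of Cassie's private key_clause helper.
def key_clause(key_hash)
  cql = []
  values = []
  key_hash.each do |key, value|
    cql << "#{key} = ?"
    values << value
  end
  [cql.join(" AND "), values]
end

# Hypothetical wrapper returning what Cassie#delete would execute.
def delete_cql(table, key_hash)
  key_cql, key_values = key_clause(key_hash)
  ["DELETE FROM #{table} WHERE #{key_cql}", key_values]
end

cql, values = delete_cql("events", {user_id: 42, event_id: "e1"})
puts cql  # DELETE FROM events WHERE user_id = ? AND event_id = ?
p values  # [42, "e1"]
```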
Close the connections to the Cassandra cluster.
# File lib/cassie.rb, line 111
def disconnect
  logger&.info("Cassie.disconnect from #{config.sanitized_cluster}")
  @monitor.synchronize do
    @session&.close
    @session = nil
    @prepared_statements = {}
  end
end
Execute an arbitrary CQL statement. If values are passed and the statement is a string, it will be prepared and executed as a prepared statement.
# File lib/cassie.rb, line 280
def execute(cql, values = nil, options = nil)
  start_time = Time.now
  begin
    statement = nil
    statement = if cql.is_a?(String)
      if values.present?
        prepare(cql)
      else
        Cassandra::Statements::Simple.new(cql)
      end
    else
      cql
    end
    if values.present?
      values = Array(values)
      options = (options ? options.merge(arguments: values) : {arguments: values})
    end
    # Set a default consistency from a block context if it isn't explicitly set.
    statement_consistency = current_consistency
    if statement_consistency
      if options
        options = options.merge(consistency: statement_consistency) if options[:consistency].nil?
      else
        options = {consistency: statement_consistency}
      end
    end
    session.execute(statement, options || {})
  rescue Cassandra::Errors::IOError => e
    disconnect
    raise e
  ensure
    if statement.is_a?(Cassandra::Statement) && !subscribers.empty?
      payload = Message.new(statement, options, Time.now - start_time)
      subscribers.each { |subscriber| subscriber.call(payload) }
    end
  end
end
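The options passed to `session.execute` follow two rules: bound values go into `:arguments`, and the block-level consistency fills `:consistency` only when the caller has not set it. The hypothetical pure function below reproduces just that merging logic; ActiveSupport's `present?` is approximated as "not nil and not empty", which is an assumption made to keep the sketch dependency-free.

```ruby
# Hypothetical function reproducing how Cassie#execute builds the
# options hash. `values.present?` is approximated without ActiveSupport.
def build_execute_options(values, options, current_consistency)
  unless values.nil? || (values.respond_to?(:empty?) && values.empty?)
    values = Array(values)
    options = options ? options.merge(arguments: values) : {arguments: values}
  end
  if current_consistency
    if options
      # An explicit :consistency from the caller is never overridden.
      options = options.merge(consistency: current_consistency) if options[:consistency].nil?
    else
      options = {consistency: current_consistency}
    end
  end
  options || {}
end

a = build_execute_options([1, "x"], nil, :local_one)
b = build_execute_options(nil, {consistency: :quorum}, :local_one)
c = build_execute_options(7, {page_size: 10}, nil)
```

Here `a` gains both keys, `b` keeps the caller's `:quorum`, and the scalar `7` in `c` is wrapped into a one-element argument array.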
Find rows using the CQL statement. If the statement is a string and values are provided, the statement will be executed as a prepared statement. In general, all statements should be executed this way.
If you have a statement without arguments that you plan to execute multiple times, call prepare first and pass the prepared statement instead of the CQL string.
# File lib/cassie.rb, line 200
def find(cql, values = nil, options = nil)
  execute(cql, values, options)
end
Insert a row from a hash into a table.
You can specify a ttl for the created row by supplying a :ttl option.
If this method is called inside a batch block it will be executed in the batch.
# File lib/cassie.rb, line 209
def insert(table, values_hash, options = nil)
  columns = []
  values = []
  values_hash.each do |column, value|
    unless value.nil?
      columns << column
      values << value
    end
  end
  cql = "INSERT INTO #{table} (#{columns.join(", ")}) VALUES (#{question_marks(columns.size)})"
  if options&.include?(:ttl)
    options = options.dup
    ttl = options.delete(:ttl)
    if ttl
      cql += " USING TTL ?"
      values << Integer(ttl)
    end
  end
  batch_or_execute(cql, values, options)
end
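Two details of `insert` are easy to miss: columns whose value is nil are left out of the statement entirely, and the `:ttl` option is turned into a trailing `USING TTL ?` bind value and removed from the options passed on. The sketch below copies the `question_marks` helper and uses a hypothetical `insert_cql` function that returns the pieces instead of executing them; the `users` table is made up.

```ruby
# Copy of Cassie's private question_marks helper.
def question_marks(size)
  "?#{",?" * (size - 1)}"
end

# Hypothetical function returning what Cassie#insert would execute.
def insert_cql(table, values_hash, options = nil)
  columns = []
  values = []
  values_hash.each do |column, value|
    next if value.nil? # nil values are omitted, not bound as NULL
    columns << column
    values << value
  end
  cql = "INSERT INTO #{table} (#{columns.join(", ")}) VALUES (#{question_marks(columns.size)})"
  if options&.include?(:ttl)
    options = options.dup   # don't mutate the caller's hash
    ttl = options.delete(:ttl)
    if ttl
      cql += " USING TTL ?"
      values << Integer(ttl)
    end
  end
  [cql, values, options]
end

cql, values, opts = insert_cql("users", {id: 1, name: "Ann", email: nil}, {ttl: 3600})
puts cql  # INSERT INTO users (id, name) VALUES (?,?) USING TTL ?
p values  # [1, "Ann", 3600]
p opts    # {}
```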
Prepare a CQL statement for repeated execution. Prepared statements are cached until the connection is closed, and calling prepare multiple times with the same CQL string returns the prepared statement from the cache. The cache holds at most config.max_prepared_statements entries; when it overflows, the oldest entry is evicted.
# File lib/cassie.rb, line 136
def prepare(cql)
  raise ArgumentError.new("CQL must be a string") unless cql.is_a?(String)
  statement = @prepared_statements[cql]
  cache_filled_up = false
  unless statement
    @monitor.synchronize do
      statement = session.prepare(cql)
      @prepared_statements[cql] = statement
      if @prepared_statements.size > config.max_prepared_statements
        # Cache is full. Clear out the oldest values. Ideally we'd remove the least recently used,
        # but that would require additional overhead on each query. This method will eventually
        # keep the most active queries in the cache and is overall more efficient.
        @prepared_statements.delete(@prepared_statements.first[0])
        cache_filled_up = true
      end
    end
  end
  if cache_filled_up && logger && Time.now > @last_prepare_warning + 10
    # Set a throttle on how often this message is logged so we don't kill performance even more.
    @last_prepare_warning = Time.now
    logger.warn("Cassie.prepare cache filled up. Consider increasing the size from #{config.max_prepared_statements}.")
  end
  statement
end
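The eviction relies on Ruby hashes preserving insertion order, so `hash.first[0]` is the key inserted earliest. This makes the policy first-in, first-out rather than least-recently-used: a statement that was just read from the cache can still be the one evicted. The hypothetical `PreparedCache` class below isolates that policy, with a fake prepare step standing in for `session.prepare`.

```ruby
require "monitor"

# Hypothetical bounded cache reproducing the eviction policy of
# Cassie#prepare (FIFO by insertion order, not LRU).
class PreparedCache
  attr_reader :prepare_calls

  def initialize(max_size)
    @max_size = max_size
    @cache = {}
    @monitor = Monitor.new
    @prepare_calls = 0
  end

  def fetch(cql)
    statement = @cache[cql]
    unless statement
      @monitor.synchronize do
        statement = fake_prepare(cql)
        @cache[cql] = statement
        # Evict the earliest-inserted entry once the limit is exceeded.
        @cache.delete(@cache.first[0]) if @cache.size > @max_size
      end
    end
    statement
  end

  def keys
    @cache.keys
  end

  private

  # Stand-in for session.prepare(cql), which would contact the cluster.
  def fake_prepare(cql)
    @prepare_calls += 1
    "prepared:#{cql}"
  end
end

cache = PreparedCache.new(2)
cache.fetch("Q1")
cache.fetch("Q2")
cache.fetch("Q1") # cache hit, no new prepare
cache.fetch("Q3") # exceeds the limit and evicts Q1, the oldest insert
p cache.keys      # ["Q2", "Q3"]
```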
Force reconnection. If you're using this code in a forking server environment such as Passenger or Unicorn, you should call this method after forking.
# File lib/cassie.rb, line 127
def reconnect
  disconnect
  connect
end
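As an illustration only, assuming the application talks to Cassandra through the shared `Cassie.instance` singleton, the post-fork hook might look like this. `after_fork` is part of Unicorn's configuration DSL and `PhusionPassenger.on_event(:starting_worker_process)` is Passenger's worker-start hook; an application that creates its own Cassie objects would call `reconnect` on those instead.

```ruby
# In a Unicorn config file: reconnect in each worker after it forks.
after_fork do |_server, _worker|
  Cassie.instance.reconnect
end

# In a Passenger initializer: `forked` is true when the worker was forked.
if defined?(PhusionPassenger)
  PhusionPassenger.on_event(:starting_worker_process) do |forked|
    Cassie.instance.reconnect if forked
  end
end
```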
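Update a row in a table. The values to update should be passed in the values_hash while the primary key should be passed in the key_hash. The values_hash may also be a raw CQL SET clause string, in which case no bind values are added for it.
You can specify a TTL for the updated values by supplying a :ttl option.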
If this method is called inside a batch block it will be executed in the batch.
# File lib/cassie.rb, line 238
def update(table, values_hash, key_hash, options = nil)
  key_cql, key_values = key_clause(key_hash)
  update_cql = []
  update_values = []
  if values_hash.is_a?(String)
    update_cql << values_hash
  else
    values_hash.each do |column, value|
      update_cql << "#{column} = ?"
      update_values << value
    end
  end
  values = update_values + key_values
  cql = "UPDATE #{table}"
  if options&.include?(:ttl)
    options = options.dup
    ttl = options.delete(:ttl)
    if ttl
      cql += " USING TTL ?"
      values.unshift(Integer(ttl))
    end
  end
  cql += " SET #{update_cql.join(", ")} WHERE #{key_cql}"
  batch_or_execute(cql, values, options)
end
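Unlike `insert`, the TTL clause in an UPDATE sits before SET, so its bind value is prepended rather than appended. The sketch below reproduces the statement building with a hypothetical `build_update` function and a copy of the `key_clause` helper; it also shows the raw-string form of values_hash. The table and column names are made up.

```ruby
# Copy of Cassie's private key_clause helper.
def key_clause(key_hash)
  cql = []
  values = []
  key_hash.each do |key, value|
    cql << "#{key} = ?"
    values << value
  end
  [cql.join(" AND "), values]
end

# Hypothetical function returning what Cassie#update would execute.
def build_update(table, values_hash, key_hash, options = nil)
  key_cql, key_values = key_clause(key_hash)
  set_cql = []
  set_values = []
  if values_hash.is_a?(String)
    set_cql << values_hash # raw SET clause, contributes no bind values
  else
    values_hash.each do |column, value|
      set_cql << "#{column} = ?"
      set_values << value
    end
  end
  values = set_values + key_values
  cql = "UPDATE #{table}"
  if options&.include?(:ttl)
    options = options.dup
    ttl = options.delete(:ttl)
    if ttl
      cql += " USING TTL ?"
      values.unshift(Integer(ttl)) # TTL placeholder comes first in the CQL
    end
  end
  cql += " SET #{set_cql.join(", ")} WHERE #{key_cql}"
  [cql, values, options]
end

cql, values, _opts = build_update("users", {name: "Bo", age: 30}, {id: 7}, {ttl: 60})
puts cql  # UPDATE users USING TTL ? SET name = ?, age = ? WHERE id = ?
p values  # [60, "Bo", 30, 7]
raw = build_update("page_counts", "hits = hits + 1", {page: "home"})
```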
Private Instance Methods
# File lib/cassie.rb, line 337
def batch_or_execute(cql, values, options = nil)
  batch = Thread.current[:cassie_batch]
  if batch
    batch << [cql, values]
    nil
  else
    execute(cql, values, options)
  end
end
# File lib/cassie.rb, line 351
def key_clause(key_hash)
  cql = []
  values = []
  key_hash.each do |key, value|
    cql << "#{key} = ?"
    values << value
  end
  [cql.join(" AND "), values]
end
# File lib/cassie.rb, line 328
def logger
  self.class.logger
end
# File lib/cassie.rb, line 347
def question_marks(size)
  "?#{",?" * (size - 1)}"
end
# File lib/cassie.rb, line 332
def session
  connect unless connected?
  @session
end
Extract the CQL from a statement
# File lib/cassie.rb, line 362
def statement_cql(statement, previous = nil)
  cql = nil
  if statement.respond_to?(:cql)
    cql = statement.cql
  elsif statement.respond_to?(:statements) && (previous.nil? || !previous.include?(statement))
    previous ||= []
    previous << statement
    cql = statement.statements.collect { |s| statement_cql(s, previous) }.join("; ")
  end
  cql
end
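The method treats anything responding to `cql` as a single statement and anything responding to `statements` as a batch, recursing into batches and joining the results with "; ". The `previous` list records batches already expanded so that a batch that contains itself is not expanded again. The sketch below exercises a copy of the method with hypothetical `FakeStatement` and `FakeBatch` structs standing in for driver objects.

```ruby
# Hypothetical stand-ins for driver objects: FakeStatement responds to
# :cql, FakeBatch responds to :statements.
FakeStatement = Struct.new(:cql)
FakeBatch = Struct.new(:statements)

# Copy of Cassie#statement_cql.
def statement_cql(statement, previous = nil)
  cql = nil
  if statement.respond_to?(:cql)
    cql = statement.cql
  elsif statement.respond_to?(:statements) && (previous.nil? || !previous.include?(statement))
    previous ||= []
    previous << statement # remember this batch to guard against cycles
    cql = statement.statements.collect { |s| statement_cql(s, previous) }.join("; ")
  end
  cql
end

inner = FakeBatch.new([FakeStatement.new("UPDATE t SET a = ? WHERE k = ?")])
outer = FakeBatch.new([FakeStatement.new("INSERT INTO t (k, a) VALUES (?,?)"), inner])
puts statement_cql(outer)
# INSERT INTO t (k, a) VALUES (?,?); UPDATE t SET a = ? WHERE k = ?
```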