class Cassie

This class provides a lightweight wrapper around the Cassandra driver. It provides a foundation for maintaining a connection and constructing CQL statements.

Attributes

logger[W]

Set a logger with a Logger compatible object.

config[R]
consistency[RW]
subscribers[R]

Public Class Methods

configure!(options) click to toggle source

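Call this method to load the Cassie::Config from the specified file for the specified environment. Any existing singleton instance is disconnected and discarded, so the next call to instance builds a new one with the new configuration.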

# File lib/cassie.rb, line 55
# Replace the class-level configuration.
#
# If a singleton instance already exists it is detached first (so `instance` will build a
# new one) and then disconnected. Returns the newly built Cassie::Config.
#
# @param options passed straight through to Cassie::Config.new
def configure!(options)
  stale = @instance if defined?(@instance)
  if stale
    @instance = nil
    stale.disconnect
  end
  @config = Cassie::Config.new(options)
end
consistency(level) { || ... } click to toggle source

This method can be used to set a consistency level for all Cassandra queries within a block that don't explicitly define one. It can be used where consistency is important (e.g. on validation queries) but where a higher level method doesn't provide an option to set it.

# File lib/cassie.rb, line 68
# Run the block with `level` as the default consistency for statements on this thread.
#
# The previous thread-local value is restored afterwards, even if the block raises, so
# calls can be nested. Returns the block's value.
def consistency(level)
  previous_level = Thread.current[:cassie_consistency]
  Thread.current[:cassie_consistency] = level
  yield
ensure
  Thread.current[:cassie_consistency] = previous_level
end
instance() click to toggle source

A singleton instance that can be shared to communicate with a Cassandra cluster.

# File lib/cassie.rb, line 45
# Lazily build and memoize a shared instance from the class-level @config.
def instance
  current = @instance if defined?(@instance)
  current || (@instance = new(@config))
end
logger() click to toggle source

Get a Logger compatible object if it has been set.

# File lib/cassie.rb, line 79
# The class-level Logger-compatible object, or nil when none has been assigned.
def logger
  defined?(@logger) ? @logger : nil
end
new(config) click to toggle source
# File lib/cassie.rb, line 87
# Build an unconnected client around `config`.
#
# The connection is opened lazily. The default consistency is taken from the :consistency
# key of config.cluster, falling back to :local_one.
def initialize(config)
  @config = config
  @session = nil
  @prepared_statements = {}
  @monitor = Monitor.new
  @last_prepare_warning = Time.now
  @subscribers = Subscribers.new
  cluster_options = config.cluster || {}
  @consistency = cluster_options[:consistency] || :local_one
end

Public Instance Methods

batch(options = nil) { || ... } click to toggle source

Declare and execute a batch statement. Any insert, update, or delete calls made within the block will add themselves to the batch which is executed at the end of the block.

# File lib/cassie.rb, line 166
# Declare and execute a logged batch.
#
# Writes queued through batch_or_execute while the block runs are collected on the current
# thread. They are sent as one logged batch when the outermost batch block finishes.
# Nested calls just run their block and leave execution to the outermost call.
#
# @param options [Hash, nil] execution options for the batch as a whole
# @return the result of execute for the batch, nil when nothing was queued, or the
#   block's value for a nested call
def batch(options = nil)
  # Already collecting on this thread: contribute to the outer batch.
  return yield if Thread.current[:cassie_batch]

  # Explicit begin (not a method-level ensure) so the nested-call return above never
  # clears the outer batch's queue.
  begin
    queued = Thread.current[:cassie_batch] = []
    yield
    return nil if queued.empty?

    logged = session.logged_batch
    queued.each do |cql, values|
      # Entries without bind values go in as raw CQL; the rest are prepared and bound.
      logged.add(values.blank? ? cql : prepare(cql).bind(Array(values)))
    end
    execute(logged, nil, options)
  ensure
    Thread.current[:cassie_batch] = nil
  end
end
connect() click to toggle source

Open a connection to the Cassandra cluster.

# File lib/cassie.rb, line 98
# Open a connection to the Cassandra cluster.
#
# Builds a cluster from config.cluster (adding the class-level logger when one is set),
# opens a session on config.default_keyspace and resets the prepared statement cache.
# The cluster object is kept in @cluster so it can be closed on disconnect; previously
# it was dropped and could never be closed.
def connect
  # Monotonic clock so the logged duration is immune to wall-clock adjustments.
  started_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  cluster_config = config.cluster
  cluster_config = cluster_config.merge(logger: logger) if logger
  cluster = Cassandra.cluster(cluster_config)
  @monitor.synchronize do
    @session = cluster.connect(config.default_keyspace)
    @cluster = cluster
    @prepared_statements = {}
  end
  # Log once the session is open so the reported time covers the whole connection setup.
  elapsed_ms = ((Process.clock_gettime(Process::CLOCK_MONOTONIC) - started_at) * 1000).round
  logger&.info("Cassie.connect with #{config.sanitized_cluster} in #{elapsed_ms}ms")
end
connected?() click to toggle source

Return true if the connection to the Cassandra cluster has been established.

# File lib/cassie.rb, line 121
# True once a session has been opened and not yet closed by disconnect.
def connected?
  @session ? true : false
end
current_consistency() click to toggle source

Return the current consistency level that has been set for statements.

# File lib/cassie.rb, line 322
# The consistency to apply to the next statement.
#
# A block-scoped level set by Cassie.consistency on this thread wins. Otherwise the
# instance's default is used.
def current_consistency
  scoped_level = Thread.current[:cassie_consistency]
  scoped_level || consistency
end
delete(table, key_hash, options = nil) click to toggle source

Delete a row from a table. You should pass the primary key value in the key_hash.

If this method is called inside a batch block it will be executed in the batch.

# File lib/cassie.rb, line 272
# Delete the row(s) of `table` matching every column => value pair in key_hash.
# Inside a batch block the DELETE is queued on the batch rather than executed.
def delete(table, key_hash, options = nil)
  where_cql, where_values = key_clause(key_hash)
  batch_or_execute("DELETE FROM #{table} WHERE #{where_cql}", where_values, options)
end
disconnect() click to toggle source

Close the connections to the Cassandra cluster.

# File lib/cassie.rb, line 111
# Close the connections to the Cassandra cluster.
#
# Closes the current session and, when connect recorded one, the cluster object it came
# from. It then clears the prepared statement cache. Safe to call when not connected.
def disconnect
  logger&.info("Cassie.disconnect from #{config.sanitized_cluster}")
  @monitor.synchronize do
    @session&.close
    @session = nil
    # Per the driver docs, closing a session does not close its Cluster. Without this,
    # every reconnect would leave the previous cluster's connections open.
    @cluster&.close
    @cluster = nil
    @prepared_statements = {}
  end
end
execute(cql, values = nil, options = nil) click to toggle source

Execute an arbitrary CQL statement. If values are passed and the statement is a string, it will be prepared and executed as a prepared statement.

# File lib/cassie.rb, line 280
# Execute an arbitrary CQL statement.
#
# Statement selection:
# - A String with bind values is prepared (and cached) via prepare.
# - A String without values runs as a simple statement.
# - Anything else is handed to the session as is.
#
# Bind values are sent as the :arguments option. When the caller set no :consistency,
# the block-scoped or instance default consistency is filled in.
#
# Subscribers are notified after every attempt that produced a statement, success or
# failure. The Message carries the statement, the final options and the elapsed seconds.
#
# @raise [Cassandra::Errors::IOError] re-raised after disconnecting
def execute(cql, values = nil, options = nil)
  # Monotonic clock: NTP steps or manual clock changes must not skew the reported duration.
  start_time = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  statement = nil
  begin
    statement = if cql.is_a?(String)
      if values.present?
        prepare(cql)
      else
        Cassandra::Statements::Simple.new(cql)
      end
    else
      cql
    end

    if values.present?
      values = Array(values)
      options = (options ? options.merge(arguments: values) : {arguments: values})
    end

    # Set a default consistency from a block context if it isn't explicitly set.
    statement_consistency = current_consistency
    if statement_consistency
      if options
        options = options.merge(consistency: statement_consistency) if options[:consistency].nil?
      else
        options = {consistency: statement_consistency}
      end
    end

    session.execute(statement, options || {})
  rescue Cassandra::Errors::IOError
    # Drop the broken connection; session reconnects lazily on the next call.
    disconnect
    raise
  ensure
    if statement.is_a?(Cassandra::Statement) && !subscribers.empty?
      elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - start_time
      payload = Message.new(statement, options, elapsed)
      subscribers.each { |subscriber| subscriber.call(payload) }
    end
  end
end
find(cql, values = nil, options = nil) click to toggle source

Find rows using the CQL statement. If the statement is a string and values are provided, then the statement will be executed as a prepared statement. In general all statements should be executed this way.

If you have a statement without arguments and plan on executing the same query multiple times, you should call prepare first and pass in the prepared statement.

# File lib/cassie.rb, line 200
# Read-side alias for execute. It takes the same arguments and returns whatever execute
# returns, so query call sites read naturally.
def find(cql, values = nil, options = nil)
  execute(cql, values, options)
end
insert(table, values_hash, options = nil) click to toggle source

Insert a row from a hash into a table.

You can specify a ttl for the created row by supplying a :ttl option.

If this method is called inside a batch block it will be executed in the batch.

# File lib/cassie.rb, line 209
# Insert a row from a hash of column => value into a table.
#
# Columns whose value is nil are left out of the statement. A :ttl option is removed from
# the options passed on and appended as "USING TTL ?", with its value coerced by Integer().
# Inside a batch block the INSERT is queued on the batch rather than executed.
#
# @raise [ArgumentError] if values_hash contains no non-nil values (errors from
#   Integer() on an invalid :ttl also propagate)
def insert(table, values_hash, options = nil)
  columns = []
  values = []
  values_hash.each do |column, value|
    next if value.nil?

    columns << column
    values << value
  end
  # An INSERT needs at least one column; fail with a descriptive message up front.
  if columns.empty?
    raise ArgumentError, "Cannot insert into #{table} without at least one non-nil value"
  end

  cql = "INSERT INTO #{table} (#{columns.join(", ")}) VALUES (#{question_marks(columns.size)})"

  if options&.include?(:ttl)
    # Copy before removing :ttl so the caller's hash is not mutated.
    options = options.dup
    ttl = options.delete(:ttl)
    if ttl
      cql += " USING TTL ?"
      values << Integer(ttl)
    end
  end

  batch_or_execute(cql, values, options)
end
prepare(cql) click to toggle source

Prepare a CQL statement for repeated execution. Prepared statements are cached by this Cassie instance until the connection is closed. The cache holds at most config.max_prepared_statements entries and evicts the oldest first. Calling prepare multiple times with the same CQL string will return the prepared statement from the cache.

# File lib/cassie.rb, line 136
# Prepare a CQL statement for repeated execution, caching the result per CQL string.
#
# The cache is bounded by config.max_prepared_statements. When it overflows, the oldest
# entry (by insertion order) is evicted and a warning is logged at most every 10 seconds.
#
# @param cql [String] the CQL text to prepare
# @return the prepared statement from the session, or the cached one
# @raise [ArgumentError] if cql is not a String
def prepare(cql)
  raise ArgumentError, "CQL must be a string" unless cql.is_a?(String)

  # Fast path: cache hits do not take the lock.
  statement = @prepared_statements[cql]
  return statement if statement

  cache_filled_up = false
  @monitor.synchronize do
    # Re-check under the lock. Another thread may have prepared the same CQL while this
    # one waited; without this check the statement would be prepared and cached again.
    statement = @prepared_statements[cql]
    unless statement
      statement = session.prepare(cql)
      @prepared_statements[cql] = statement
      if @prepared_statements.size > config.max_prepared_statements
        # Cache is full, so evict the oldest entry (Hash keeps insertion order). Ideally
        # we'd remove the least recently used, but that would add overhead to every query.
        # This approach eventually keeps the most active queries cached and is overall
        # more efficient.
        @prepared_statements.shift
        cache_filled_up = true
      end
    end
  end

  if cache_filled_up && logger && Time.now > @last_prepare_warning + 10
    # Throttle this message so logging doesn't hurt performance even more.
    @last_prepare_warning = Time.now
    logger.warn("Cassie.prepare cache filled up. Consider increasing the size from #{config.max_prepared_statements}.")
  end

  statement
end
reconnect() click to toggle source

Force reconnection. If you're using this code in a forking server environment like Passenger or Unicorn, you should call this method after forking.

# File lib/cassie.rb, line 127
# Tear down the current connection and open a fresh one.
# Returns whatever connect returns.
def reconnect
  disconnect
  connect
end
update(table, values_hash, key_hash, options = nil) click to toggle source

Update a row in a table. The values to update should be passed in the values_hash while the primary key should be passed in the key_hash.

You can specify a TTL for the updated values by supplying a :ttl option.

If this method is called inside a batch block it will be executed in the batch.

# File lib/cassie.rb, line 238
# Update columns of the row(s) matching key_hash.
#
# values_hash is either a column => value hash (each becomes "column = ?") or a raw SET
# clause String used verbatim with no bind values. A :ttl option is removed from the
# options passed on and becomes "USING TTL ?". Inside a batch block the UPDATE is queued
# on the batch rather than executed.
def update(table, values_hash, key_hash, options = nil)
  where_cql, where_values = key_clause(key_hash)

  if values_hash.is_a?(String)
    assignments = [values_hash]
    bind_values = []
  else
    pairs = values_hash.map { |column, value| ["#{column} = ?", value] }
    assignments = pairs.map(&:first)
    bind_values = pairs.map(&:last)
  end
  bind_values += where_values

  using_clause = ""
  if options&.include?(:ttl)
    # Copy before removing :ttl so the caller's hash is left untouched.
    options = options.dup
    ttl = options.delete(:ttl)
    if ttl
      using_clause = " USING TTL ?"
      # The TTL placeholder precedes the SET placeholders, so its value goes first.
      bind_values.unshift(Integer(ttl))
    end
  end

  cql = "UPDATE #{table}#{using_clause} SET #{assignments.join(", ")} WHERE #{where_cql}"
  batch_or_execute(cql, bind_values, options)
end

Private Instance Methods

batch_or_execute(cql, values, options = nil) click to toggle source
# File lib/cassie.rb, line 337
# Queue the statement on this thread's open batch, or execute it immediately when no
# batch is open. Returns nil when queued, otherwise execute's result.
def batch_or_execute(cql, values, options = nil)
  pending = Thread.current[:cassie_batch]
  return execute(cql, values, options) unless pending

  pending << [cql, values]
  nil
end
key_clause(key_hash) click to toggle source
# File lib/cassie.rb, line 351
# Turn column => value pairs into an AND-joined "column = ?" WHERE fragment plus the
# matching bind values, in iteration order.
def key_clause(key_hash)
  pairs = key_hash.map { |column, value| ["#{column} = ?", value] }
  [pairs.map(&:first).join(" AND "), pairs.map(&:last)]
end
logger() click to toggle source
# File lib/cassie.rb, line 328
# Instances share the logger configured on their class.
def logger
  owner = self.class
  owner.logger
end
question_marks(size) click to toggle source
# File lib/cassie.rb, line 347
# Build a comma-separated list of `size` bind placeholders, e.g. "?,?,?".
#
# @raise [ArgumentError] if size is less than 1. A placeholder list can't be empty; the
#   old implementation failed here with an opaque "negative argument" error.
def question_marks(size)
  raise ArgumentError, "question_marks requires a positive size, got #{size.inspect}" unless size.positive?

  Array.new(size, "?").join(",")
end
session() click to toggle source
# File lib/cassie.rb, line 332
# Return the current session, connecting lazily on first use.
#
# connected? is checked again inside the monitor so that threads racing on the first call
# don't each build their own cluster. Monitor is reentrant, so connect can synchronize on
# it again. Once connected, the fast path takes no lock.
def session
  unless connected?
    @monitor.synchronize { connect unless connected? }
  end
  @session
end
statement_cql(statement, previous = nil) click to toggle source

Extract the CQL from a statement

# File lib/cassie.rb, line 362
# Extract the CQL text from a statement.
#
# - Statements responding to #cql return it directly.
# - Container statements responding to #statements return their children's CQL joined
#   with "; ".
# - `previous` collects containers already expanded, so a container that includes itself
#   is not expanded again.
# - Returns nil when neither applies.
def statement_cql(statement, previous = nil)
  return statement.cql if statement.respond_to?(:cql)
  return nil unless statement.respond_to?(:statements)
  return nil if !previous.nil? && previous.include?(statement)

  visited = previous || []
  visited << statement
  statement.statements.map { |child| statement_cql(child, visited) }.join("; ")
end