module CassandraHelpers

Public Instance Methods

create_cluster(hosts, port = 9042) click to toggle source
# File lib/cassandra-helpers.rb, line 14
# Builds a cluster handle by delegating to Cassandra.cluster.
#
# @param hosts [Object] contact points, passed through as the :hosts option
# @param port [Integer] passed through as the :port option (defaults to 9042)
# @return [Object] whatever Cassandra.cluster returns
def create_cluster(hosts, port = 9042)
  connection_options = { hosts: hosts, port: port }
  Cassandra.cluster(**connection_options)
end
create_session(cluster, keyspace_name) click to toggle source
# File lib/cassandra-helpers.rb, line 22
# Opens a session on the given cluster.
#
# @param cluster [#connect] object whose #connect is invoked
# @param keyspace_name [String, nil] keyspace handed to cluster.connect; may be
#   omitted, in which case nil is passed (the driver documents this as a
#   session that is not scoped to a keyspace)
# @return [Object] whatever cluster.connect returns
def create_session(cluster, keyspace_name = nil)
  cluster.connect(keyspace_name)
end
delete_record(session, table, record) click to toggle source
# File lib/cassandra-helpers.rb, line 38
# Deletes a single row identified by its primary key.
#
# The WHERE clause has one equality per column in table.primary_key; each value
# is read from record[column.name]. Identifiers and values are rendered through
# Cassandra::Util.escape_name / Cassandra::Util.encode_object.
#
# @param session [Object] forwarded to execute_query
# @param table [#name, #primary_key] table metadata
# @param record [#[]] row data indexed by primary-key column name
# @return [Object] the result of execute_query
def delete_record(session, table, record)
  target = Cassandra::Util.escape_name(table.name)
  conditions = table.primary_key.map do |column|
    "#{Cassandra::Util.escape_name(column.name)} = #{Cassandra::Util.encode_object(record[column.name])}"
  end

  execute_query(session, "DELETE FROM #{target} WHERE #{conditions.join(' AND ')}")
end
delete_records(session, table, records) click to toggle source
# File lib/cassandra-helpers.rb, line 44
# Deletes several rows, issuing one delete_record call per element.
#
# @param session [Object] forwarded to delete_record
# @param table [Object] forwarded to delete_record
# @param records [#each] rows to delete
# @return [Object] the value of records.each
def delete_records(session, table, records)
  records.each { |entry| delete_record(session, table, entry) }
end
each_record(session, cql, page_size = 100) { |row| ... } click to toggle source
# File lib/cassandra-helpers.rb, line 56
# Yields every row produced by a CQL query, walking through result pages until
# the page reports last_page?.
#
# @param session [Object] forwarded to execute_query
# @param cql [String] statement to run
# @param page_size [Integer] forwarded to execute_query as the :page_size option
# @yield [row] each row of each page, in page order
# @return [nil, Enumerator] nil after all pages were consumed; an Enumerator
#   over the rows when no block is given (the query is only sent once the
#   enumerator is iterated)
def each_record(session, cql, page_size = 100)
  # Without a block the first row would trigger LocalJumpError after the query
  # was already sent; offer the conventional enumerator form instead.
  return enum_for(:each_record, session, cql, page_size) unless block_given?

  page = execute_query(session, cql, page_size: page_size)
  loop do
    page.each { |row| yield row }
    break if page.last_page?

    page = page.next_page
  end
end
execute_query(session, cql, options = {}) click to toggle source
# File lib/cassandra-helpers.rb, line 30
# Runs a statement on the session and, when a logger with debug enabled is
# configured, logs the statement together with how long it took.
#
# @param session [#execute] receives (cql, options)
# @param cql [String] statement to run
# @param options [Hash] passed to session.execute unchanged
# @return [Object] whatever session.execute returns
def execute_query(session, cql, options = {})
  # Monotonic clock: wall-clock time can jump (NTP, manual changes) and would
  # then report negative or inflated durations.
  started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  result = session.execute(cql, options)
  elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started

  log = logger
  log.debug("KS [#{format('%.2f', elapsed)}] #{cql}") if log && log.debug?
  result
end
find_record(session, table, keys) click to toggle source
# File lib/cassandra-helpers.rb, line 50
# Selects rows whose primary-key columns equal the supplied values.
#
# Only primary-key columns for which keys holds a non-nil value take part in
# the WHERE clause, so a partial key may be given. A value of false is a real
# value and is included. If no primary-key column has a value the statement
# still ends in "WHERE " (unchanged from earlier behavior) and is sent as is.
#
# @param session [Object] forwarded to execute_query
# @param table [#name, #primary_key] table metadata
# @param keys [#[]] values indexed by primary-key column name
# @return [Object] the result of execute_query
def find_record(session, table, keys)
  target = Cassandra::Util.escape_name(table.name)
  # Skip only missing (nil) values; a truthiness test would also drop false.
  supplied = table.primary_key.reject { |column| keys[column.name].nil? }
  conditions = supplied.map do |column|
    "#{Cassandra::Util.escape_name(column.name)} = #{Cassandra::Util.encode_object(keys[column.name])}"
  end

  execute_query(session, "SELECT * FROM #{target} WHERE #{conditions.join(' AND ')}")
end
get_keyspace(cluster, name) click to toggle source
# File lib/cassandra-helpers.rb, line 18
# Looks up a keyspace by name among cluster.keyspaces.
#
# @param cluster [#keyspaces] object exposing the keyspace collection
# @param name [Object] compared with each keyspace's #name using ==
# @return [Object, nil] the first matching keyspace, or nil when none matches
def get_keyspace(cluster, name)
  cluster.keyspaces.find { |candidate| candidate.name == name }
end
get_table(keyspace, table_name) click to toggle source
# File lib/cassandra-helpers.rb, line 26
# Looks up a table by name among keyspace.tables.
#
# @param keyspace [#tables] object exposing the table collection
# @param table_name [Object] compared with each table's #name using ==
# @return [Object, nil] the first matching table, or nil when none matches
def get_table(keyspace, table_name)
  keyspace.tables.find { |candidate| candidate.name == table_name }
end
logger() click to toggle source
# File lib/cassandra-helpers.rb, line 6
# Reader for the logger held in @logger.
#
# @return [Object, nil] the object previously stored in @logger, or nil if
#   nothing has been assigned
def logger
  @logger
end
logger=(logger) click to toggle source
# File lib/cassandra-helpers.rb, line 10
# Writer for the logger; stores the given object in @logger.
#
# @param logger [Object, nil] object to store; the other helpers in this file
#   call #debug?, #debug, #warn and #error on it when it is set
def logger=(logger)
  @logger = logger
end
retry(retries = 10, sleep_times_between_retries = nil) { || ... } click to toggle source
# File lib/cassandra-helpers.rb, line 66
# Runs the block, re-running it when it raises Cassandra::Errors::TimeoutError.
#
# `retries` is the total number of attempts: once that many attempts have
# failed the last timeout error is re-raised. Other exception classes are not
# rescued here.
#
# @param retries [Integer] total attempts allowed
# @param sleep_times_between_retries [Numeric, Array<Numeric>, nil] pause before
#   each new attempt; a single number is used for every pause, an array is
#   indexed by the number of failures so far and its last entry is reused once
#   the array runs out; nil means no pause
# @yield the operation to attempt
# @return [Object] the block's value from the attempt that succeeded
# @raise [Cassandra::Errors::TimeoutError] when every attempt timed out
def retry(retries = 10, sleep_times_between_retries = nil)
  allowed_attempts = retries
  pauses = sleep_times_between_retries.is_a?(Numeric) ? [sleep_times_between_retries] : sleep_times_between_retries

  begin
    yield
  rescue Cassandra::Errors::TimeoutError => timeout
    logger.error(timeout) if logger

    # Index by failures seen before this one; stays nil when no pauses were given.
    failures_before = allowed_attempts - retries
    pause = (pauses[failures_before] || pauses.last) if pauses

    retries -= 1
    if retries <= 0
      logger.error('no more retries') if logger
      raise(timeout)
    end

    logger.warn("retrying, remaining attempts #{retries}, sleeping for #{pause.to_f} seconds") if logger
    sleep(pause) if pause

    retry
  end
end