class Moneta::Adapters::Cassandra

Cassandra backend.

@api public

Public Class Methods

new(options = {}) click to toggle source

@param [Hash] options
@option options [String] :keyspace ('moneta') Cassandra keyspace
@option options [String] :table ('moneta') Cassandra table
@option options [String] :host ('127.0.0.1') Server host name
@option options [Integer] :port (9160) Server port
@option options [Integer] :expires Default expiration time
@option options [String] :key_column ('key') Name of the key column
@option options [String] :value_column ('value') Name of the value column

@option options [String] :updated_column ('updated_at') Name of the column used to track the last update

@option options [String] :expired_column ('expired') Name of the column used to track expiry

@option options [Symbol] :read_consistency (:all) Default read consistency

@option options [Symbol] :write_consistency (:all) Default write consistency

@option options [Proc, Boolean, Hash] :create_keyspace Provide a Proc for creating the keyspace, or a Hash of options to use when creating it, or set to false to disable keyspace creation. The Proc will only be called if the keyspace does not already exist.

@option options [::Cassandra::Cluster] :cluster Existing cluster to use
@option options [::Cassandra::Session] :backend Existing session to use
@option options Other options are passed to `Cassandra#cluster`

Calls superclass method Moneta::Adapter::new
# File lib/moneta/adapters/cassandra.rb, line 59
      def initialize(options = {})
        super

        backend.execute <<-CQL
          CREATE TABLE IF NOT EXISTS #{config.table} (
            #{config.key_column} blob,
            #{config.value_column} blob,
            #{config.updated_column} timeuuid,
            #{config.expired_column} boolean,
            PRIMARY KEY (#{config.key_column}, #{config.updated_column})
          )
        CQL

        prepare_statements
      end

Public Instance Methods

clear(options = {}) click to toggle source

(see Proxy#clear)

# File lib/moneta/adapters/cassandra.rb, line 141
def clear(options = {})
  backend.execute(@clear)
  self
end
close() click to toggle source

(see Proxy#close)

# File lib/moneta/adapters/cassandra.rb, line 147
def close
  backend.close_async
  @backend = nil
  if @own_cluster
    @own_cluster.close_async
    @own_cluster = nil
  end
  nil
end
delete(key, options = {}) click to toggle source

(see Proxy#delete)

# File lib/moneta/adapters/cassandra.rb, line 131
def delete(key, options = {})
  rc, wc = consistency(options)
  result = backend.execute(@delete_value, options.merge(consistency: rc, arguments: [key]))
  if row = result.first and row[config.expired_column] != nil
    backend.execute(@delete, options.merge(consistency: wc, arguments: [timestamp, key, row[config.updated_column]]))
    row[config.value_column]
  end
end
each_key() { |row| ... } click to toggle source

(see Proxy#each_key)

# File lib/moneta/adapters/cassandra.rb, line 158
def each_key
  rc, = consistency
  return enum_for(:each_key) unless block_given?
  result = backend.execute(@each_key, consistency: rc, page_size: 100)
  loop do
    result.each do |row|
      next if row[config.expired_column] == nil
      yield row[config.key_column]
    end

    break if result.last_page?
    result = result.next_page
  end
  self
end
fetch_values(*keys, **options) { |key| ... } click to toggle source

(see Proxy#fetch_values)

# File lib/moneta/adapters/cassandra.rb, line 210
def fetch_values(*keys, **options)
  return values_at(*keys, **options) unless block_given?
  hash = Hash[slice(*keys, **options)]
  keys.map do |key|
    if hash.key?(key)
      hash[key]
    else
      yield key
    end
  end
end
key?(key, options = {}) click to toggle source

(see Proxy#key?)

# File lib/moneta/adapters/cassandra.rb, line 76
def key?(key, options = {})
  rc, wc = consistency(options)
  if (expires = expires_value(options, nil)) != nil
    # Because Cassandra expires each value in a column, rather than the
    # whole column, when we want to update the expiry we load the value
    # and then re-set it in order to update the TTL.
    return false unless
      row = backend.execute(@load, options.merge(consistency: rc, arguments: [key])).first and
        row[config.expired_column] != nil
    backend.execute(@update_expires,
                    options.merge(consistency: wc,
                                  arguments: [(expires || 0).to_i,
                                              timestamp,
                                              row[config.value_column],
                                              key,
                                              row[config.updated_column]]))
    true
  elsif row = backend.execute(@key, options.merge(consistency: rc, arguments: [key])).first
    row[config.expired_column] != nil
  else
    false
  end
end
load(key, options = {}) click to toggle source

(see Proxy#load)

# File lib/moneta/adapters/cassandra.rb, line 101
def load(key, options = {})
  rc, wc = consistency(options)
  if row = backend.execute(@load, options.merge(consistency: rc, arguments: [key])).first and row[config.expired_column] != nil
    if (expires = expires_value(options, nil)) != nil
      backend.execute(@update_expires,
                      options.merge(consistency: wc,
                                    arguments: [(expires || 0).to_i,
                                                timestamp,
                                                row[config.value_column],
                                                key,
                                                row[config.updated_column]]))
    end
    row[config.value_column]
  end
end
merge!(pairs, options = {}) { |key, existing, new_value| ... } click to toggle source

(see Proxy#merge!)

# File lib/moneta/adapters/cassandra.rb, line 223
def merge!(pairs, options = {})
  keys = pairs.map { |k, _| k }.to_a
  return self if keys.empty?

  if block_given?
    existing = Hash[slice(*keys, **options)]
    pairs = pairs.map do |key, new_value|
      if existing.key?(key)
        [key, yield(key, existing[key], new_value)]
      else
        [key, new_value]
      end
    end
  end

  _rc, wc = consistency(options)
  expires = expires_value(options)
  t = timestamp
  batch = backend.batch do |batch|
    batch.add(@merge_delete, arguments: [t, keys])
    pairs.each do |key, value|
      batch.add(@store, arguments: [key, value, (expires || 0).to_i, t + 1])
    end
  end
  backend.execute(batch, options.merge(consistency: wc))

  self
end
slice(*keys, **options) click to toggle source

(see Proxy#slice)

# File lib/moneta/adapters/cassandra.rb, line 175
def slice(*keys, **options)
  rc, wc = consistency(options)
  result = backend.execute(@slice, options.merge(consistency: rc, arguments: [keys]))
  expires = expires_value(options, nil)
  updated = [] if expires != nil
  pairs = result.map do |row|
    next if row[config.expired_column] == nil
    if expires != nil
      updated << [row[config.key_column], row[config.value_column], row[config.updated_column]]
    end
    [row[config.key_column], row[config.value_column]]
  end.compact

  if expires != nil && !updated.empty?
    ttl = (expires || 0).to_i
    t = timestamp
    batch = backend.batch do |batch|
      updated.each do |key, value, updated|
        batch.add(@update_expires, arguments: [ttl, t, value, key, updated])
      end
    end

    backend.execute(batch, options.merge(consistency: wc))
  end

  pairs
end
store(key, value, options = {}) click to toggle source

(see Proxy#store)

# File lib/moneta/adapters/cassandra.rb, line 118
def store(key, value, options = {})
  _, wc = consistency(options)
  expires = expires_value(options)
  t = timestamp
  batch = backend.batch do |batch|
    batch.add(@store_delete, arguments: [t, key])
    batch.add(@store, arguments: [key, value, (expires || 0).to_i, t + 1])
  end
  backend.execute(batch, options.merge(consistency: wc))
  value
end
values_at(*keys, **options) click to toggle source

(see Proxy#values_at)

# File lib/moneta/adapters/cassandra.rb, line 204
def values_at(*keys, **options)
  hash = Hash[slice(*keys, **options)]
  keys.map { |key| hash[key] }
end

Private Instance Methods

consistency(options = {}) click to toggle source
# File lib/moneta/adapters/cassandra.rb, line 347
def consistency(options = {})
  [
    options[:read_consistency] || config.read_consistency,
    options[:write_consistency] || config.write_consistency
  ]
end
create_keyspace(backend, keyspace, create_keyspace) click to toggle source
# File lib/moneta/adapters/cassandra.rb, line 258
def create_keyspace(backend, keyspace, create_keyspace)
  options = {
    replication: {
      class: 'SimpleStrategy',
      replication_factor: 1
    }
  }

  case create_keyspace
  when Proc
    return create_keyspace.call(keyspace)
  when false
    return
  when Hash
    options.merge!(create_keyspace)
  end

  # This is a bit hacky, but works.  Options in Cassandra look like JSON,
  # but use single quotes instead of double-quotes.
  require 'multi_json'
  option_str = options.map do |key, value|
    key.to_s + ' = ' + MultiJson.dump(value).tr(?", ?')
  end.join(' AND ')

  backend.execute "CREATE KEYSPACE IF NOT EXISTS %<keyspace>s WITH %<options>s" % {
    keyspace: keyspace,
    options: option_str
  }
rescue ::Cassandra::Errors::TimeoutError
  tries ||= 0
  (tries += 1) <= 3 ? retry : raise
end
prepare_statements() click to toggle source
# File lib/moneta/adapters/cassandra.rb, line 291
      def prepare_statements
        @key = backend.prepare(<<-CQL)
          SELECT #{config.updated_column}, #{config.expired_column}
          FROM #{config.table} WHERE #{config.key_column} = ?
          LIMIT 1
        CQL
        @store_delete = backend.prepare(<<-CQL)
          DELETE FROM #{config.table}
          USING TIMESTAMP ?
          WHERE #{config.key_column} = ?
        CQL
        @store = backend.prepare(<<-CQL)
          INSERT INTO #{config.table} (#{config.key_column}, #{config.value_column}, #{config.updated_column}, #{config.expired_column})
          VALUES (?, ?, now(), false)
          USING TTL ? AND TIMESTAMP ?
        CQL
        @load = backend.prepare(<<-CQL)
          SELECT #{config.value_column}, #{config.updated_column}, #{config.expired_column}
          FROM #{config.table}
          WHERE #{config.key_column} = ?
          LIMIT 1
        CQL
        @update_expires = backend.prepare(<<-CQL)
          UPDATE #{config.table}
          USING TTL ? AND TIMESTAMP ?
          SET #{config.value_column} = ?, #{config.expired_column} = false
          WHERE #{config.key_column} = ? AND #{config.updated_column} = ?
        CQL
        @clear = backend.prepare("TRUNCATE #{config.table}")
        @delete_value = backend.prepare(<<-CQL)
          SELECT #{config.value_column}, #{config.updated_column}, #{config.expired_column}
          FROM #{config.table}
          WHERE #{config.key_column} = ?
          LIMIT 1
        CQL
        @delete = backend.prepare(<<-CQL, idempotent: true)
          DELETE FROM #{config.table}
          USING TIMESTAMP ?
          WHERE #{config.key_column} = ? AND #{config.updated_column} = ?
        CQL
        @each_key = backend.prepare(<<-CQL)
          SELECT #{config.key_column}, #{config.expired_column}
          FROM #{config.table}
        CQL
        @slice = backend.prepare(<<-CQL)
          SELECT #{config.key_column}, #{config.value_column}, #{config.updated_column}, #{config.expired_column}
          FROM #{config.table}
          WHERE #{config.key_column} IN ?
        CQL
        @merge_delete = backend.prepare(<<-CQL)
          DELETE FROM #{config.table}
          USING TIMESTAMP ?
          WHERE #{config.key_column} IN ?
        CQL
      end
timestamp() click to toggle source
# File lib/moneta/adapters/cassandra.rb, line 254
def timestamp
  (Time.now.to_r * 1_000_000).to_i
end