class Riak::Client

A client connection to Riak.

Constants

HOST_REGEX

Regexp for validating hostnames, lifted from uri.rb in Ruby 1.8.6

MAX_CLIENT_ID

When using integer client IDs, the exclusive upper-bound of valid values.

NETWORK_ERRORS

Network errors.

Pool
VALID_OPTIONS

Valid constructor options.

Attributes

authentication[R]

@return [Hash] The authentication information this client will use.

client_id[R]

@return [String] The internal client ID used by Riak to route responses

connect_timeout[R]

@return [Numeric] The connect timeout, in seconds

convert_timestamp[R]

@return [Boolean] Convert timestamps from Riak TS to Time objects

max_retries[RW]

@return [Integer] The maximum number of retries in case of NETWORK_ERRORS

multi_threads[R]

@return [Integer] The number of threads for multiget requests

multiget_threads[R]

@return [Integer] The number of threads for multiget requests

nodes[RW]

@return [Array] The set of Nodes this client can communicate with.

protobuffs_backend[RW]

@return [Symbol] The Protocol Buffers backend/client to use

protobuffs_pool[R]

@return [Client::Pool] A pool of protobuffs connections

read_timeout[R]

@return [Numeric] The read timeout, in seconds

write_timeout[R]

@return [Numeric] The write timeout, in seconds

Public Class Methods

new(options = {}) click to toggle source

Creates a client connection to Riak @param [Hash] options configuration options for the client @option options [Array] :nodes A list of nodes this client connects to.

Each element of the list is a hash which is passed to Node.new, e.g.
`{host: '127.0.0.1', pb_port: 1234, ...}`.
If no nodes are given, a single node is constructed from the remaining
options given to Client.new.

@option options [String] :host ('127.0.0.1') The host or IP address for the Riak endpoint @option options [Fixnum] :pb_port (8087) The port of the Riak Protocol Buffers endpoint @option options [Fixnum, String] :client_id (rand(MAX_CLIENT_ID)) The internal client ID used by Riak to route responses @option options [String, Symbol] :protobuffs_backend (:Beefcake) which Protocol Buffers backend to use @option options [Fixnum] :max_retries (2) The maximum number of retries in case of NETWORK_ERRORS @option options [Numeric] :connect_timeout (nil) The connect timeout, in seconds @option options [Numeric] :read_timeout (nil) The read timeout, in seconds @option options [Numeric] :write_timeout (nil) The write timeout, in seconds @raise [ArgumentError] raised if any invalid options are given

# File lib/riak/client.rb, line 112
def initialize(options = {})
  if options.include? :port
    warn(t('deprecated.port', :backtrace => caller[0..2].join("\n    ")))
  end

  unless (evil = options.keys - VALID_OPTIONS).empty?
    raise ArgumentError, "#{evil.inspect} are not valid options for Client.new"
  end

  @nodes = build_nodes(options)

  @protobuffs_pool = Pool.new(
                              method(:new_protobuffs_backend),
                              lambda { |b| b.teardown }
                              )

  self.protobuffs_backend = options[:protobuffs_backend] || :Beefcake
  self.client_id          = options[:client_id]          if options[:client_id]
  self.multi_threads      = options[:multi_threads] || options[:multiget_threads]
  @authentication         = options[:authentication] && options[:authentication].symbolize_keys
  self.max_retries        = options[:max_retries]        || 2
  @connect_timeout        = options[:connect_timeout]
  @read_timeout           = options[:read_timeout]
  @write_timeout          = options[:write_timeout]
  @convert_timestamp      = options[:convert_timestamp]  || false
end

Public Instance Methods

[](name, options = {})
Alias for: bucket
backend(&block)
Alias for: protobuffs
bucket(name, options = {}) click to toggle source

Retrieves a bucket from Riak. @param [String] name the bucket to retrieve @param [Hash] options options for retrieving the bucket @option options [Boolean] :props (false) whether to retreive the bucket properties @return [Bucket] the requested bucket

# File lib/riak/client.rb, line 150
def bucket(name, options = {})
  raise ArgumentError, t('zero_length_bucket') if name == ''
  unless (options.keys - [:props]).empty?
    raise ArgumentError, "invalid options"
  end
  @bucket_cache ||= {}
  (@bucket_cache[name] ||= Bucket.new(self, name)).tap do |b|
    b.props if options[:props]
  end
end
Also aliased as: []
bucket_type(name) click to toggle source
# File lib/riak/client.rb, line 162
def bucket_type(name)
  BucketType.new self, name
end
buckets(options = {}, &block) click to toggle source

Lists buckets which have keys stored in them. @note This is an expensive operation and should be used only

in development.

@return [Array<Bucket>] a list of buckets

# File lib/riak/client.rb, line 170
def buckets(options = {}, &block)
  warn(t('list_buckets', :backtrace => caller.join("\n    "))) unless Riak.disable_list_keys_warnings

  return ListBuckets.new self, options, block if block_given?

  backend do |b|
    b.list_buckets(options).map {|name| Bucket.new(self, name) }
  end
end
Also aliased as: list_buckets
choose_node(nodes = self.nodes) click to toggle source

Choose a node from a set.

# File lib/riak/client.rb, line 182
def choose_node(nodes = self.nodes)
  # Prefer nodes which have gone a reasonable time without errors.
  s = nodes.select do |node|
    node.error_rate.value < 0.1
  end

  if s.empty?
    # Fall back to minimally broken node.
    nodes.min_by do |node|
      node.error_rate.value
    end
  else
    s[rand(s.size)]
  end
end
clear_bucket_props(bucket, options = { }) click to toggle source

Clears the properties on a bucket. See Bucket#clear_props

# File lib/riak/client.rb, line 422
def clear_bucket_props(bucket, options = {  })
  backend do |b|
    b.reset_bucket_props(bucket, options)
  end
end
client_id=(value) click to toggle source

Set the client ID for this client. Must be a string or Fixnum value 0 =< value < MAX_CLIENT_ID. @param [String, Fixnum] value The internal client ID used by Riak to route responses @raise [ArgumentError] when an invalid client ID is given @return [String] the assigned client ID

# File lib/riak/client.rb, line 224
def client_id=(value)
  value = case value
          when 0...MAX_CLIENT_ID, String
            value
          else
            raise ArgumentError, t("invalid_client_id", :max_id => MAX_CLIENT_ID)
          end

  # Change all existing backend client IDs.
  @protobuffs_pool.each do |pb|
    pb.set_client_id value if pb.respond_to?(:set_client_id)
  end
  @client_id = value
end
create_search_index(name, schema = nil, n_val = nil, timeout = nil) click to toggle source
# File lib/riak/client/yokozuna.rb, line 3
def create_search_index(name, schema = nil, n_val = nil, timeout = nil)
  raise ArgumentError, t("zero_length_index") if name.nil? || name.empty?
  backend do |b|
    b.create_search_index(name, schema, n_val, timeout)
  end
  true
end
create_search_schema(name, content) click to toggle source
# File lib/riak/client/yokozuna.rb, line 36
def create_search_schema(name, content)
  raise ArgumentError, t("zero_length_schema") if name.nil? || name.empty?
  raise ArgumentError, t("zero_length_content") if content.nil? || content.empty?
  backend do |b|
    b.create_search_schema(name, content)
  end
  true
end
delete_object(bucket, key, options = {}) click to toggle source

Delete an object. See Bucket#delete

# File lib/riak/client.rb, line 250
def delete_object(bucket, key, options = {})
  backend do |b|
    b.delete_object(bucket, key, options)
  end
end
delete_search_index(name) click to toggle source
# File lib/riak/client/yokozuna.rb, line 28
def delete_search_index(name)
  raise ArgumentError, t("zero_length_index") if name.nil? || name.empty?
  backend do |b|
    b.delete_search_index(name)
  end
  true
end
get_bucket_props(bucket, options = { }) click to toggle source

Bucket properties. See Bucket#props

# File lib/riak/client.rb, line 257
def get_bucket_props(bucket, options = {  })
  backend do |b|
    b.get_bucket_props bucket, options
  end
end
get_index(bucket, index, query, options = {}) click to toggle source

Queries a secondary index on a bucket. See Bucket#get_index

# File lib/riak/client.rb, line 264
def get_index(bucket, index, query, options = {})
  backend do |b|
    b.get_index bucket, index, query, options
  end
end
get_many(pairs) click to toggle source

Get multiple objects in parallel.

# File lib/riak/client.rb, line 284
def get_many(pairs)
  Multiget.perform self, pairs
end
get_object(bucket, key, options = {}) click to toggle source

Get an object. See Bucket#get

# File lib/riak/client.rb, line 289
def get_object(bucket, key, options = {})
  raise ArgumentError, t('zero_length_key') if key == ''
  raise ArgumentError, t('string_type', :string => key) unless key.is_a? String
  backend do |b|
    b.fetch_object(bucket, key, options)
  end
end
get_preflist(bucket, key, type = nil, options = { }) click to toggle source

Retrieves a preflist for the given bucket, key, and type; useful for figuring out where in the cluster an object is stored. @param [Bucket, String] bucket the Bucket or name of the bucket @param [String] key the key @param [BucketType, String] type the bucket type or name of the bucket

type

@return [Array<PreflistItem>] an array of preflist entries

# File lib/riak/client.rb, line 277
def get_preflist(bucket, key, type = nil, options = {  })
  backend do |b|
    b.get_preflist bucket, key, type, options
  end
end
get_search_index(name) click to toggle source
# File lib/riak/client/yokozuna.rb, line 11
def get_search_index(name)
  raise ArgumentError, t("zero_length_index") if name.nil? || name.empty?
  resp = []
  backend do |b|
    resp = b.get_search_index(name)
  end
  resp.index && Array === resp.index ? resp.index.first : resp
end
get_search_schema(name) click to toggle source
# File lib/riak/client/yokozuna.rb, line 45
def get_search_schema(name)
  raise ArgumentError, t("zero_length_schema") if name.nil? || name.empty?
  backend do |b|
    return b.get_search_schema(name)
  end
end
inspect() click to toggle source

@return [String] A representation suitable for IRB and debugging output.

# File lib/riak/client.rb, line 298
def inspect
  "#<Riak::Client #{nodes.inspect}>"
end
list_buckets(options = {}, &block)
Alias for: buckets
list_keys(bucket, options = {}, &block) click to toggle source

Retrieves a list of keys in the given bucket. See Bucket#keys

# File lib/riak/client.rb, line 303
def list_keys(bucket, options = {}, &block)
  if block_given?
    backend do |b|
      b.list_keys bucket, options, &block
    end
  else
    backend do |b|
      b.list_keys bucket, options
    end
  end
end
list_search_indexes() click to toggle source
# File lib/riak/client/yokozuna.rb, line 20
def list_search_indexes()
  resp = []
  backend do |b|
    resp = b.get_search_index(nil)
  end
  resp.index ? resp.index : resp
end
mapred(mr, &block) click to toggle source

Executes a mapreduce request. See MapReduce#run

# File lib/riak/client.rb, line 316
def mapred(mr, &block)
  backend do |b|
    b.mapred(mr, &block)
  end
end
multi_threads=(count) click to toggle source

Set the number of threads to use for multiget operations. If set to nil, defaults to twice the number of nodes. @param [Integer] count The number of threads to use. @raise [ArgumentError] when a non-nil, non-positive-Integer count is given

# File lib/riak/client.rb, line 202
def multi_threads=(count)
  if count.nil?
    @multi_threads = nodes.length * 2
    return
  end

  if count.is_a?(Integer) && count > 0
    @multi_threads = count
    return
  end

  raise ArgumentError, t("invalid_multiget_thread_count") # TODO: rename to invalid_multi_thread_count
end
Also aliased as: multiget_threads=
multiget_threads=(count)

@deprecated use multi_threads=

Alias for: multi_threads=
new_protobuffs_backend() click to toggle source

Creates a new protocol buffers backend. @return [ProtobuffsBackend] the Protocol Buffers backend for

a given node.
# File lib/riak/client.rb, line 325
def new_protobuffs_backend
  klass = self.class.const_get("#{@protobuffs_backend}ProtobuffsBackend")
  unless klass.configured?
    raise BackendCreationError.new @protobuffs_backend
  end
  node = choose_node(
    @nodes.select do |n|
      n.protobuffs?
    end
  )

  klass.new(self, node)
end
node() click to toggle source

@return [Node] An arbitrary Node.

# File lib/riak/client.rb, line 340
def node
  nodes[rand nodes.size]
end
ping() click to toggle source

Pings the Riak cluster to check for liveness. @return [true,false] whether the Riak cluster is alive and reachable

# File lib/riak/client.rb, line 346
def ping
  backend do |b|
    b.ping
  end
end
protobuffs(&block) click to toggle source

Yields a protocol buffers backend.

# File lib/riak/client.rb, line 353
def protobuffs(&block)
  recover_from @protobuffs_pool, &block
end
Also aliased as: backend
protobuffs_backend=(value) click to toggle source

Sets the desired Protocol Buffers backend

# File lib/riak/client.rb, line 359
def protobuffs_backend=(value)
  # Shutdown any connections using the old backend
  @protobuffs_backend = value
  @protobuffs_pool.clear
  @protobuffs_backend
end
recover_from(pool) { |backend| ... } click to toggle source

Takes a pool. Acquires a backend from the pool and yields it with node-specific error recovery.

# File lib/riak/client.rb, line 368
def recover_from(pool)
  skip_nodes = []
  take_opts = {}
  tries = 1 + max_retries

  begin
    # Only select nodes which we haven't used before.
    unless skip_nodes.empty?
      take_opts[:filter] = lambda do |backend|
        not skip_nodes.include? backend.node
      end
    end

    # Acquire a backend
    pool.take(take_opts) do |backend|
      begin
        yield backend
      rescue *NETWORK_ERRORS => e
        Riak.logger.warn("Riak client error: #{e.inspect} for #{backend.inspect}")

        # Network error.
        tries -= 1

        # Notify the node that a request against it failed.
        backend.node.error_rate << 1

        # Skip this node next time.
        skip_nodes << backend.node

        # And delete this connection.
        raise Pool::BadResource, e
      end
    end
  rescue Pool::BadResource => e
    retry if tries > 0
    raise e.message
  end
end
reload_object(object, options = {}) click to toggle source

Reloads the object from Riak.

# File lib/riak/client.rb, line 408
def reload_object(object, options = {})
  backend do |b|
    b.reload_object(object, options)
  end
end
security?() click to toggle source

Is security enabled? @return [Boolean] whether or not a secure connection is being used

# File lib/riak/client.rb, line 141
def security?
  !!authentication
end
select(*args)
Alias for: search
set_bucket_props(bucket, properties, type = nil) click to toggle source

Sets the properties on a bucket. See Bucket#props=

# File lib/riak/client.rb, line 415
def set_bucket_props(bucket, properties, type = nil)
  backend do |b|
    b.set_bucket_props(bucket, properties, type)
  end
end
stamp() click to toggle source

Exposes a {Stamp} object for use in generating unique identifiers. @return [Stamp] an ID generator @see Stamp#next

# File lib/riak/client.rb, line 432
def stamp
  @stamp ||= Riak::Stamp.new(self)
end
store_object(object, options = {}) click to toggle source

Stores an object in Riak.

# File lib/riak/client.rb, line 438
def store_object(object, options = {})
  params = {:returnbody => true}.merge(options)
  backend do |b|
    b.store_object(object, params)
  end
end

Private Instance Methods

build_nodes(options) click to toggle source
# File lib/riak/client.rb, line 462
def build_nodes(options)
  if options.key?(:nodes) and !options[:nodes].empty?
    options[:nodes].map do |n|
      if !n.key?(:pb_port) and options.key?(:pb_port)
        n[:pb_port] = options[:pb_port]
      end
      Client::Node.new self, n
    end
  else
    [Client::Node.new(self, options)]
  end
end
make_client_id() click to toggle source
# File lib/riak/client.rb, line 446
def make_client_id
  rand(MAX_CLIENT_ID)
end
ssl_disable() click to toggle source
# File lib/riak/client.rb, line 456
def ssl_disable
  @nodes.each do |n|
    n.ssl_disable
  end
end
ssl_enable() click to toggle source
# File lib/riak/client.rb, line 450
def ssl_enable
  @nodes.each do |n|
    n.ssl_enable
  end
end