class Riak::Client
A client connection to Riak.
Constants
- HOST_REGEX
Regexp for validating hostnames, lifted from uri.rb in Ruby 1.8.6
- MAX_CLIENT_ID
When using integer client IDs, the exclusive upper-bound of valid values.
- NETWORK_ERRORS
Network errors.
- Pool
- VALID_OPTIONS
Valid constructor options.
Attributes
@return [Hash] The authentication information this client will use.
@return [String] The internal client ID used by Riak to route responses
@return [Numeric] The connect timeout, in seconds
@return [Boolean] Convert timestamps from Riak TS to Time objects
@return [Integer] The maximum number of retries in case of NETWORK_ERRORS
@return [Integer] The number of threads for multiget requests
@return [Integer] The number of threads for multiget requests
@return [Array] The set of Nodes this client can communicate with.
@return [Symbol] The Protocol Buffers backend/client to use
@return [Client::Pool] A pool of protobuffs connections
@return [Numeric] The read timeout, in seconds
@return [Numeric] The write timeout, in seconds
Public Class Methods
Creates a client connection to Riak
@param [Hash] options configuration options for the client
@option options [Array] :nodes A list of nodes this client connects to. Each element of the list is a hash which is passed to Node.new, e.g. `{host: '127.0.0.1', pb_port: 1234, ...}`. If no nodes are given, a single node is constructed from the remaining options given to Client.new.
@option options [String] :host ('127.0.0.1') The host or IP address for the Riak endpoint
@option options [Fixnum] :pb_port (8087) The port of the Riak Protocol Buffers endpoint
@option options [Fixnum, String] :client_id (rand(MAX_CLIENT_ID)) The internal client ID used by Riak to route responses
@option options [String, Symbol] :protobuffs_backend (:Beefcake) which Protocol Buffers backend to use
@option options [Fixnum] :max_retries (2) The maximum number of retries in case of NETWORK_ERRORS
@option options [Numeric] :connect_timeout (nil) The connect timeout, in seconds
@option options [Numeric] :read_timeout (nil) The read timeout, in seconds
@option options [Numeric] :write_timeout (nil) The write timeout, in seconds
@raise [ArgumentError] raised if any invalid options are given
# File lib/riak/client.rb, line 112
def initialize(options = {})
  if options.include? :port
    warn(t('deprecated.port', :backtrace => caller[0..2].join("\n ")))
  end

  unless (evil = options.keys - VALID_OPTIONS).empty?
    raise ArgumentError, "#{evil.inspect} are not valid options for Client.new"
  end

  @nodes = build_nodes(options)

  @protobuffs_pool = Pool.new(
    method(:new_protobuffs_backend),
    lambda { |b| b.teardown }
  )

  self.protobuffs_backend = options[:protobuffs_backend] || :Beefcake
  self.client_id = options[:client_id] if options[:client_id]
  self.multi_threads = options[:multi_threads] || options[:multiget_threads]
  @authentication = options[:authentication] && options[:authentication].symbolize_keys
  self.max_retries = options[:max_retries] || 2
  @connect_timeout = options[:connect_timeout]
  @read_timeout = options[:read_timeout]
  @write_timeout = options[:write_timeout]
  @convert_timestamp = options[:convert_timestamp] || false
end
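The option check at the top of the constructor can be tried in isolation. The following is a minimal sketch, not the library's code: `SKETCH_VALID_OPTIONS` is a hypothetical subset of VALID_OPTIONS, and `validate_options` is a helper introduced only for illustration.

```ruby
# Hypothetical subset of the option keys named on this page; the real
# VALID_OPTIONS constant may contain more entries.
SKETCH_VALID_OPTIONS = [:nodes, :host, :pb_port, :client_id, :max_retries].freeze

# Illustrative helper (not part of Riak::Client): rejects unknown keys by
# subtracting the allowed keys from the given ones, as the constructor does.
def validate_options(options)
  unknown = options.keys - SKETCH_VALID_OPTIONS
  unless unknown.empty?
    raise ArgumentError, "#{unknown.inspect} are not valid options for Client.new"
  end
  options
end

validate_options(host: '127.0.0.1', pb_port: 8087) # accepted

begin
  validate_options(host: '127.0.0.1', prot: 8087) # misspelled key
rescue ArgumentError => e
  puts e.message
end
```

Because the check runs before any node is built, a misspelled key fails fast instead of being silently ignored.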
Public Instance Methods
Retrieves a bucket from Riak.
@param [String] name the bucket to retrieve
@param [Hash] options options for retrieving the bucket
@option options [Boolean] :props (false) whether to retrieve the bucket properties
@return [Bucket] the requested bucket
# File lib/riak/client.rb, line 150
def bucket(name, options = {})
  raise ArgumentError, t('zero_length_bucket') if name == ''
  unless (options.keys - [:props]).empty?
    raise ArgumentError, "invalid options"
  end
  @bucket_cache ||= {}
  (@bucket_cache[name] ||= Bucket.new(self, name)).tap do |b|
    b.props if options[:props]
  end
end
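The listing above memoizes buckets per name, so repeated calls return the same object. A small self-contained sketch of that caching pattern follows; `SketchBucket` and `SketchClient` are stand-ins invented for this example, and the error message is illustrative rather than the library's translated string.

```ruby
# Stand-in for Riak::Bucket, used only in this sketch.
SketchBucket = Struct.new(:client, :name)

# Stand-in client that reproduces only the per-name cache.
class SketchClient
  def bucket(name)
    raise ArgumentError, 'bucket name must not be empty' if name == ''
    @bucket_cache ||= {}
    # ||= stores the bucket on first use and returns the cached one afterwards.
    @bucket_cache[name] ||= SketchBucket.new(self, name)
  end
end

client = SketchClient.new
first  = client.bucket('users')
second = client.bucket('users')
puts first.equal?(second) # the very same object is returned both times
```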
# File lib/riak/client.rb, line 162
def bucket_type(name)
  BucketType.new self, name
end
Lists buckets which have keys stored in them.
@note This is an expensive operation and should be used only in development.
@return [Array<Bucket>] a list of buckets
# File lib/riak/client.rb, line 170
def buckets(options = {}, &block)
  warn(t('list_buckets', :backtrace => caller.join("\n "))) unless Riak.disable_list_keys_warnings

  return ListBuckets.new self, options, block if block_given?

  backend do |b|
    b.list_buckets(options).map {|name| Bucket.new(self, name) }
  end
end
Choose a node from a set.
# File lib/riak/client.rb, line 182
def choose_node(nodes = self.nodes)
  # Prefer nodes which have gone a reasonable time without errors.
  s = nodes.select do |node|
    node.error_rate.value < 0.1
  end

  if s.empty?
    # Fall back to minimally broken node.
    nodes.min_by do |node|
      node.error_rate.value
    end
  else
    s[rand(s.size)]
  end
end
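The selection rule can be exercised without a cluster. The sketch below assumes only that a node exposes `error_rate.value`; `SketchRate`, `SketchNode` and `choose_node_sketch` are names made up for this example, and `Array#sample` is used in place of `s[rand(s.size)]`.

```ruby
# Stand-ins modelling only the `error_rate.value` interface used above.
SketchRate = Struct.new(:value)
SketchNode = Struct.new(:host, :error_rate)

def choose_node_sketch(nodes)
  # Nodes with an error rate below 0.1 count as healthy.
  healthy = nodes.select { |node| node.error_rate.value < 0.1 }
  if healthy.empty?
    # Nothing healthy: fall back to the least broken node.
    nodes.min_by { |node| node.error_rate.value }
  else
    # Pick uniformly at random among the healthy nodes.
    healthy.sample
  end
end

nodes = [
  SketchNode.new('10.0.0.1', SketchRate.new(0.0)),
  SketchNode.new('10.0.0.2', SketchRate.new(0.5)),
  SketchNode.new('10.0.0.3', SketchRate.new(0.9))
]
puts choose_node_sketch(nodes).host
```

With the sample data only the first node is below the threshold, so it is always the one chosen.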
Clears the properties on a bucket. See Bucket#clear_props
# File lib/riak/client.rb, line 422
def clear_bucket_props(bucket, options = { })
  backend do |b|
    b.reset_bucket_props(bucket, options)
  end
end
Sets the client ID for this client. Must be a String, or a Fixnum value such that 0 <= value < MAX_CLIENT_ID.
@param [String, Fixnum] value The internal client ID used by Riak to route responses
@raise [ArgumentError] when an invalid client ID is given
@return [String] the assigned client ID
# File lib/riak/client.rb, line 224
def client_id=(value)
  value = case value
          when 0...MAX_CLIENT_ID, String
            value
          else
            raise ArgumentError, t("invalid_client_id", :max_id => MAX_CLIENT_ID)
          end

  # Change all existing backend client IDs.
  @protobuffs_pool.each do |pb|
    pb.set_client_id value if pb.respond_to?(:set_client_id)
  end
  @client_id = value
end
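The `case` in the setter relies on an exclusive range, so MAX_CLIENT_ID itself is rejected. A minimal sketch of that check is shown here; the value of `SKETCH_MAX_CLIENT_ID` is an assumption made for illustration, and `check_client_id` is a hypothetical helper, not a library method.

```ruby
# Assumed upper bound for this sketch only; consult the MAX_CLIENT_ID
# constant for the real value.
SKETCH_MAX_CLIENT_ID = 4_294_967_296

# Hypothetical helper mirroring the case/when validation shown above.
def check_client_id(value)
  case value
  when 0...SKETCH_MAX_CLIENT_ID, String
    value
  else
    raise ArgumentError, "invalid client ID: #{value.inspect}"
  end
end

check_client_id(42)          # accepted: inside the range
check_client_id('worker-1')  # accepted: any String

begin
  check_client_id(SKETCH_MAX_CLIENT_ID) # rejected: the bound is exclusive
rescue ArgumentError => e
  puts e.message
end
```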
# File lib/riak/client/yokozuna.rb, line 3
def create_search_index(name, schema = nil, n_val = nil, timeout = nil)
  raise ArgumentError, t("zero_length_index") if name.nil? || name.empty?
  backend do |b|
    b.create_search_index(name, schema, n_val, timeout)
  end
  true
end
# File lib/riak/client/yokozuna.rb, line 36
def create_search_schema(name, content)
  raise ArgumentError, t("zero_length_schema") if name.nil? || name.empty?
  raise ArgumentError, t("zero_length_content") if content.nil? || content.empty?
  backend do |b|
    b.create_search_schema(name, content)
  end
  true
end
Delete an object. See Bucket#delete
# File lib/riak/client.rb, line 250
def delete_object(bucket, key, options = {})
  backend do |b|
    b.delete_object(bucket, key, options)
  end
end
# File lib/riak/client/yokozuna.rb, line 28
def delete_search_index(name)
  raise ArgumentError, t("zero_length_index") if name.nil? || name.empty?
  backend do |b|
    b.delete_search_index(name)
  end
  true
end
Bucket properties. See Bucket#props
# File lib/riak/client.rb, line 257
def get_bucket_props(bucket, options = { })
  backend do |b|
    b.get_bucket_props bucket, options
  end
end
Queries a secondary index on a bucket. See Bucket#get_index
# File lib/riak/client.rb, line 264
def get_index(bucket, index, query, options = {})
  backend do |b|
    b.get_index bucket, index, query, options
  end
end
Get multiple objects in parallel.
# File lib/riak/client.rb, line 284
def get_many(pairs)
  Multiget.perform self, pairs
end
Get an object. See Bucket#get
# File lib/riak/client.rb, line 289
def get_object(bucket, key, options = {})
  raise ArgumentError, t('zero_length_key') if key == ''
  raise ArgumentError, t('string_type', :string => key) unless key.is_a? String
  backend do |b|
    b.fetch_object(bucket, key, options)
  end
end
Retrieves a preflist for the given bucket, key, and type; useful for figuring out where in the cluster an object is stored.
@param [Bucket, String] bucket the Bucket or name of the bucket
@param [String] key the key
@param [BucketType, String] type the bucket type or name of the bucket type
@return [Array<PreflistItem>] an array of preflist entries
# File lib/riak/client.rb, line 277
def get_preflist(bucket, key, type = nil, options = { })
  backend do |b|
    b.get_preflist bucket, key, type, options
  end
end
# File lib/riak/client/yokozuna.rb, line 11
def get_search_index(name)
  raise ArgumentError, t("zero_length_index") if name.nil? || name.empty?
  resp = []
  backend do |b|
    resp = b.get_search_index(name)
  end
  resp.index && Array === resp.index ? resp.index.first : resp
end
# File lib/riak/client/yokozuna.rb, line 45
def get_search_schema(name)
  raise ArgumentError, t("zero_length_schema") if name.nil? || name.empty?
  backend do |b|
    return b.get_search_schema(name)
  end
end
@return [String] A representation suitable for IRB and debugging output.
# File lib/riak/client.rb, line 298
def inspect
  "#<Riak::Client #{nodes.inspect}>"
end
Retrieves a list of keys in the given bucket. See Bucket#keys
# File lib/riak/client.rb, line 303
def list_keys(bucket, options = {}, &block)
  if block_given?
    backend do |b|
      b.list_keys bucket, options, &block
    end
  else
    backend do |b|
      b.list_keys bucket, options
    end
  end
end
# File lib/riak/client/yokozuna.rb, line 20
def list_search_indexes()
  resp = []
  backend do |b|
    resp = b.get_search_index(nil)
  end
  resp.index ? resp.index : resp
end
Executes a mapreduce request. See MapReduce#run
# File lib/riak/client.rb, line 316
def mapred(mr, &block)
  backend do |b|
    b.mapred(mr, &block)
  end
end
Sets the number of threads to use for multiget operations. If set to nil, defaults to twice the number of nodes.
@param [Integer] count The number of threads to use.
@raise [ArgumentError] when a non-nil, non-positive-Integer count is given
# File lib/riak/client.rb, line 202
def multi_threads=(count)
  if count.nil?
    @multi_threads = nodes.length * 2
    return
  end

  if count.is_a?(Integer) && count > 0
    @multi_threads = count
    return
  end

  raise ArgumentError, t("invalid_multiget_thread_count") # TODO: rename to invalid_multi_thread_count
end
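The three outcomes of the setter (default, explicit count, error) can be condensed into a pure function for experimentation. `resolve_thread_count` below is a hypothetical helper written for this sketch; it takes the node count as an argument instead of reading it from a client, and its error message is illustrative.

```ruby
# Hypothetical helper mirroring the rules of multi_threads= shown above.
def resolve_thread_count(count, node_count)
  # nil means "use the default": two threads per node.
  return node_count * 2 if count.nil?
  # Any positive Integer is taken as-is.
  return count if count.is_a?(Integer) && count > 0
  raise ArgumentError, 'thread count must be nil or a positive Integer'
end

puts resolve_thread_count(nil, 3) # default for a three-node client
puts resolve_thread_count(8, 3)   # explicit value wins
```

Non-Integer numerics such as `2.5`, as well as zero and negative values, end up in the error branch.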
Creates a new protocol buffers backend.
@return [ProtobuffsBackend] the Protocol Buffers backend for a given node.
# File lib/riak/client.rb, line 325
def new_protobuffs_backend
  klass = self.class.const_get("#{@protobuffs_backend}ProtobuffsBackend")
  unless klass.configured?
    raise BackendCreationError.new @protobuffs_backend
  end
  node = choose_node(
    @nodes.select do |n|
      n.protobuffs?
    end
  )

  klass.new(self, node)
end
@return [Node] An arbitrary Node.
# File lib/riak/client.rb, line 340
def node
  nodes[rand nodes.size]
end
Yields a protocol buffers backend.
# File lib/riak/client.rb, line 353
def protobuffs(&block)
  recover_from @protobuffs_pool, &block
end
Sets the desired Protocol Buffers backend
# File lib/riak/client.rb, line 359
def protobuffs_backend=(value)
  # Shutdown any connections using the old backend
  @protobuffs_backend = value
  @protobuffs_pool.clear
  @protobuffs_backend
end
Takes a pool. Acquires a backend from the pool and yields it with node-specific error recovery.
# File lib/riak/client.rb, line 368
def recover_from(pool)
  skip_nodes = []
  take_opts = {}
  tries = 1 + max_retries

  begin
    # Only select nodes which we haven't used before.
    unless skip_nodes.empty?
      take_opts[:filter] = lambda do |backend|
        not skip_nodes.include? backend.node
      end
    end

    # Acquire a backend
    pool.take(take_opts) do |backend|
      begin
        yield backend
      rescue *NETWORK_ERRORS => e
        Riak.logger.warn("Riak client error: #{e.inspect} for #{backend.inspect}")

        # Network error.
        tries -= 1

        # Notify the node that a request against it failed.
        backend.node.error_rate << 1

        # Skip this node next time.
        skip_nodes << backend.node

        # And delete this connection.
        raise Pool::BadResource, e
      end
    end
  rescue Pool::BadResource => e
    retry if tries > 0
    raise e.message
  end
end
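The retry strategy (at most `1 + max_retries` attempts, never reusing a node that has already failed) can be sketched without the connection pool. Everything in the block below is a simplification invented for illustration: `SketchNetworkError` stands in for NETWORK_ERRORS, nodes are plain strings, and `with_retries` re-raises the last network error rather than going through Pool::BadResource.

```ruby
# Stand-in for the NETWORK_ERRORS list; an assumption of this sketch.
class SketchNetworkError < StandardError; end

# Runs the block against one node at a time, skipping nodes that already
# failed, for at most 1 + max_retries attempts in total.
def with_retries(nodes, max_retries)
  skipped = []
  tries = 1 + max_retries
  begin
    node = nodes.find { |n| !skipped.include?(n) }
    raise 'no usable nodes left' if node.nil?
    yield node
  rescue SketchNetworkError
    tries -= 1       # one attempt used up
    skipped << node  # do not pick this node again
    retry if tries > 0
    raise            # out of attempts: surface the last network error
  end
end

attempts = []
result = with_retries(%w[node-a node-b node-c], 2) do |node|
  attempts << node
  raise SketchNetworkError, "#{node} unreachable" if node == 'node-a'
  "served by #{node}"
end
puts result
```

In this run the first attempt fails on `node-a`, so the second attempt goes to `node-b` and succeeds.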
Reloads the object from Riak.
# File lib/riak/client.rb, line 408
def reload_object(object, options = {})
  backend do |b|
    b.reload_object(object, options)
  end
end
(Riak Search) Performs a search via the Solr interface.
@overload search(index, query, options={})
  @param [String] index the index to query on
  @param [String] query a Lucene query string
@overload search(query, options={})
  Queries the default index
  @param [String] query a Lucene query string
@param [Hash] options extra options for the Solr query
@option options [String] :df the default field to search in
@option options [String] :'q.op' the default operator between terms (“or”, “and”)
@option options [String] :wt (“json”) the response type - “json” and “xml” are valid
@option options [String] :sort ('none') the field and direction to sort, e.g. “name asc”
@option options [Fixnum] :start (0) the offset into the query to start from, e.g. for pagination
@option options [Fixnum] :rows (10) the number of results to return
@return [Hash] the query result, containing the 'responseHeaders' and 'response' keys
# File lib/riak/client/search.rb, line 18
def search(*args)
  options = args.extract_options!
  index, query = args[-2], args[-1] # Allows nil index, while keeping it as first argument
  backend do |b|
    b.search(index, query, options)
  end
end
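The two overloads work because negative indexing returns nil when the array is too short, leaving the index unset if only a query is passed. The sketch below shows this in plain Ruby; `split_search_args` is a hypothetical helper, and the trailing-Hash check is a stand-in for `extract_options!`, which is an extension method rather than core Ruby.

```ruby
# Hypothetical helper showing how (index, query, options) are separated.
def split_search_args(*args)
  # Plain-Ruby substitute for extract_options!: pop a trailing Hash if present.
  options = args.last.is_a?(Hash) ? args.pop : {}
  # With a single remaining argument, args[-2] is nil, so index stays unset.
  index, query = args[-2], args[-1]
  [index, query, options]
end

p split_search_args('users', 'name:a*', rows: 5)
p split_search_args('name:a*')
```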
Is security enabled?
@return [Boolean] whether or not a secure connection is being used
# File lib/riak/client.rb, line 141
def security?
  !!authentication
end
Sets the properties on a bucket. See Bucket#props=
# File lib/riak/client.rb, line 415
def set_bucket_props(bucket, properties, type = nil)
  backend do |b|
    b.set_bucket_props(bucket, properties, type)
  end
end
Exposes a {Stamp} object for use in generating unique identifiers.
@return [Stamp] an ID generator
@see Stamp#next
# File lib/riak/client.rb, line 432
def stamp
  @stamp ||= Riak::Stamp.new(self)
end
Stores an object in Riak.
# File lib/riak/client.rb, line 438
def store_object(object, options = {})
  params = {:returnbody => true}.merge(options)
  backend do |b|
    b.store_object(object, params)
  end
end
Private Instance Methods
# File lib/riak/client.rb, line 462
def build_nodes(options)
  if options.key?(:nodes) and !options[:nodes].empty?
    options[:nodes].map do |n|
      if !n.key?(:pb_port) and options.key?(:pb_port)
        n[:pb_port] = options[:pb_port]
      end

      Client::Node.new self, n
    end
  else
    [Client::Node.new(self, options)]
  end
end
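The listing above lets a top-level `:pb_port` act as a default for every node hash that does not set its own. The sketch below reproduces only that defaulting step on plain hashes; `node_configs` is a hypothetical helper, and unlike the listing it uses `merge` so the caller's hashes are not modified.

```ruby
# Hypothetical helper: returns the per-node option hashes that would be
# handed to Node.new, without constructing any nodes.
def node_configs(options)
  nodes = options[:nodes]
  if nodes && !nodes.empty?
    nodes.map do |n|
      if !n.key?(:pb_port) && options.key?(:pb_port)
        n.merge(pb_port: options[:pb_port]) # inherit the client-wide port
      else
        n                                   # node keeps its own settings
      end
    end
  else
    # No node list: the remaining options describe a single node.
    [options]
  end
end

p node_configs(pb_port: 10017, nodes: [{ host: '10.0.0.1' }, { host: '10.0.0.2', pb_port: 8087 }])
```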
# File lib/riak/client.rb, line 446
def make_client_id
  rand(MAX_CLIENT_ID)
end
# File lib/riak/client.rb, line 456
def ssl_disable
  @nodes.each do |n|
    n.ssl_disable
  end
end
# File lib/riak/client.rb, line 450
def ssl_enable
  @nodes.each do |n|
    n.ssl_enable
  end
end