class Siberite::Client
Constants
- DEFAULT_OPTIONS
- RECOVERABLE_ERRORS
Exceptions that indicate connection failures, after which we retry
- SIBERITE_OPTIONS
Attributes
current_queue[R]
current_server[R]
options[RW]
servers[RW]
siberite_options[R]
Public Class Methods
new(*servers)
click to toggle source
# File lib/siberite/client.rb, line 51
# Builds a client for the given servers.
#
# servers - any number of server entries (nested arrays are flattened and
#           nils dropped). A trailing Hash is removed from the list and
#           merged over DEFAULT_OPTIONS.
def initialize(*servers)
  overrides = servers.last.is_a?(Hash) ? servers.pop : {}
  opts = DEFAULT_OPTIONS.merge(overrides)

  # extract_siberite_options! strips the Siberite-specific keys out of opts;
  # whatever remains in opts is what gets passed to Memcached.new below.
  @siberite_options = extract_siberite_options!(opts)
  @default_get_timeout, @gets_per_server, @exception_retry_limit =
    siberite_options.values_at(:get_timeout_ms, :gets_per_server, :exception_retry_limit)

  @counter = 0
  @shuffle = true

  # we handle our own retries so that we can apply different
  # policies to sets and gets, so set memcached limit to 0;
  # distribution is forced to random.
  opts.merge!(:exception_retry_limit => 0, :distribution => :random)

  self.servers = Array(servers).flatten.compact
  self.options = opts
  @server_count = self.servers.size # Minor optimization.

  # Each client starts out bound to an independently chosen random server.
  @read_client = Memcached.new(self.servers[rand(@server_count)], opts)
  @write_client = Memcached.new(self.servers[rand(@server_count)], opts)
end
Public Instance Methods
delete(key, expiry=0)
click to toggle source
# File lib/siberite/client.rb, line 75
# Issues a delete for +key+ on the write client, wrapped in with_retries.
# Memcached::NotFound and Memcached::ServerEnd are swallowed, in which case
# nil is returned.
#
# expiry is accepted but not used by this method.
def delete(key, expiry=0)
  begin
    with_retries { @write_client.delete(key) }
  rescue Memcached::NotFound, Memcached::ServerEnd
    nil
  end
end
flush(queue)
click to toggle source
# File lib/siberite/client.rb, line 137
# Drains +queue+ by reading raw items until sizeof(queue) is no longer
# positive.
#
# Returns the number of items that were read.
def flush(queue)
  removed = 0
  while sizeof(queue) > 0
    # Keep reading until a get comes back falsy, then re-check the size.
    while get(queue, raw: true)
      removed += 1
    end
  end
  removed
end
get(key, opts = {})
click to toggle source
Parameters¶ ↑
- key<String>
-
Queue name
- opts<Boolean,Hash>
-
True/false toggles Marshalling. A Hash allows collision-avoiding options support.
Options (opts)¶ ↑
- :open<Boolean>
-
Begins a transactional read.
- :close<Boolean>
-
Ends a transactional read.
- :abort<Boolean>
-
Cancels an existing transactional read
- :peek<Boolean>
-
Return the head of the queue, without removal
- :timeout<Integer>
-
Milliseconds to block for a new item
- :raw<Boolean>
-
Toggles Marshalling. Equivalent to the “old style” second argument.
# File lib/siberite/client.rb, line 113
# Reads an item from the queue +key+ using the read client.
#
# key  - queue name (String).
# opts - a Hash of options (:open, :close, :abort, :peek, :timeout, :raw),
#        or the "old style" bare Boolean, which is treated as the value
#        of :raw.
#
# Returns the value read, or nil when the read produced nothing or one of
# RECOVERABLE_ERRORS was raised.
def get(key, opts = {})
  # Honour the documented Boolean form. Without this normalisation a
  # non-Hash argument raised NoMethodError on opts[:raw].
  opts = { :raw => opts } unless opts.is_a?(Hash)

  raw = opts[:raw] || false
  commands = extract_queue_commands(opts)

  val =
    begin
      shuffle_if_necessary! key
      @read_client.get key + commands, !raw
    rescue *RECOVERABLE_ERRORS
      # we can't tell the difference between a server being down
      # and an empty queue, so just return nil. our sticky server
      # logic should eliminate piling on down servers
      nil
    end

  # nil result without :close and :abort, force next get to jump from
  # current server
  if !val && @shuffle && !opts[:close] && !opts[:abort]
    @counter = @gets_per_server
  end

  val
end
get_abort(queue)
click to toggle source
# File lib/siberite/client.rb, line 161
# Performs a get on +queue+ with the :abort option, routed through
# get_from_last.
def get_abort(queue)
  get_from_last(queue, :abort => true)
end
get_close(queue)
click to toggle source
# File lib/siberite/client.rb, line 153
# Performs a get on +queue+ with the :close option, routed through
# get_from_last.
def get_close(queue)
  get_from_last(queue, :close => true)
end
get_close_open(queue)
click to toggle source
# File lib/siberite/client.rb, line 157
# Performs a get on +queue+ with both :close and :open set, routed through
# get_from_last.
def get_close_open(queue)
  get_from_last(queue, :close => true, :open => true)
end
get_from_last(*args)
click to toggle source
This provides the semantics needed to support transactionality in the Transactional
client. It temporarily disables server shuffling so that the client can close any open transactions on the current server before jumping to another one.
# File lib/siberite/client.rb, line 92
# Forwards +args+ to get with @shuffle switched off for the duration of
# the call. @shuffle is set back to true afterwards, even if get raises.
def get_from_last(*args)
  begin
    @shuffle = false
    get(*args)
  ensure
    @shuffle = true
  end
end
get_open(queue)
click to toggle source
# File lib/siberite/client.rb, line 149
# Performs a get on +queue+ with the :open option.
def get_open(queue)
  get(queue, :open => true)
end
peek(queue)
click to toggle source
# File lib/siberite/client.rb, line 145
# Performs a get on +queue+ with the :peek option.
def peek(queue)
  get(queue, :peek => true)
end
set(key, value, ttl=0, raw=false)
click to toggle source
# File lib/siberite/client.rb, line 80
# Stores +value+ under +key+ through the write client, wrapped in
# with_retries. The fourth argument handed to the write client is the
# negation of +raw+.
#
# Returns true when the call completes, false when Memcached::NotStored
# is raised.
def set(key, value, ttl=0, raw=false)
  begin
    with_retries { @write_client.set(key, value, ttl, !raw) }
  rescue Memcached::NotStored
    return false
  end
  true
end
Private Instance Methods
extract_queue_commands(opts)
click to toggle source
# File lib/siberite/client.rb, line 191
# Builds the command suffix that get appends to the queue name.
#
# opts - Hash; truthy :close, :open, :abort and :peek entries each add a
#        "/name" segment (always in that order). A "/t=N" segment is added
#        when opts[:timeout], or failing that @default_get_timeout, is set.
#
# Returns the concatenated segments as a String ("" when nothing applies).
def extract_queue_commands(opts)
  segments = [:close, :open, :abort, :peek].select { |flag| opts[flag] }

  wait = opts[:timeout] || @default_get_timeout
  segments << "t=#{wait}" if wait

  segments.map { |segment| "/#{segment}" }.join('')
end
extract_siberite_options!(opts)
click to toggle source
# File lib/siberite/client.rb, line 167
# Splits +opts+ by whether each key is listed in SIBERITE_OPTIONS.
#
# opts is modified in place: afterwards it holds only the entries whose
# keys are NOT in SIBERITE_OPTIONS.
#
# Returns a new Hash with the entries whose keys are in SIBERITE_OPTIONS.
def extract_siberite_options!(opts)
  siberite_pairs, memcache_pairs =
    opts.partition { |key, _value| SIBERITE_OPTIONS.include?(key) }

  opts.replace(Hash[memcache_pairs])
  Hash[siberite_pairs]
end
shuffle_if_necessary!(key)
click to toggle source
# File lib/siberite/client.rb, line 176
# Decides whether the read client should be re-pointed at a randomly
# chosen server before a get on +key+, and otherwise bumps @counter.
# Does nothing when there is at most one server.
def shuffle_if_necessary!(key)
  return if @server_count <= 1

  # Don't reset servers on the first request:
  # i.e. @counter == 0 && @current_queue == nil
  #
  # && binds tighter than ||, so @shuffle only guards the queue-change
  # test; reaching @gets_per_server triggers a reset regardless of @shuffle.
  jump = (@shuffle && @counter > 0 && key != @current_queue) ||
         @counter >= @gets_per_server

  if jump
    @counter = 0
    @current_queue = key
    @read_client.reset(servers[rand(@server_count)])
  else
    @counter += 1
  end
end