class Siberite::Client

Constants

DEFAULT_OPTIONS
RECOVERABLE_ERRORS

Exceptions that indicate a connection failure, after which we retry

SIBERITE_OPTIONS

Attributes

current_queue[R]
current_server[R]
options[RW]
servers[RW]
siberite_options[R]

Public Class Methods

new(*servers) click to toggle source
# File lib/siberite/client.rb, line 51
# Builds a client for the given server(s).
#
# servers:: One or more server addresses. Nested arrays are flattened and
#           nil entries dropped. If the last argument is a Hash it is
#           removed from the list and layered over DEFAULT_OPTIONS.
def initialize(*servers)
  overrides = servers.last.is_a?(Hash) ? servers.pop : {}
  client_opts = DEFAULT_OPTIONS.merge(overrides)

  # extract_siberite_options! removes the Siberite-specific keys from
  # client_opts in place and hands them back as a separate hash.
  @siberite_options = extract_siberite_options!(client_opts)
  @default_get_timeout, @gets_per_server, @exception_retry_limit =
    siberite_options.values_at(:get_timeout_ms, :gets_per_server, :exception_retry_limit)
  @counter = 0
  @shuffle = true

  # Retries are handled here so that sets and gets can follow different
  # policies; hence the memcached retry limit is forced to 0 and the
  # distribution is forced to random.
  client_opts.merge!(:exception_retry_limit => 0, :distribution => :random)

  self.servers = Array(servers).flatten.compact
  self.options = client_opts

  @server_count = self.servers.size # Cached to avoid recomputing per request.
  pick_server = lambda { self.servers[rand(@server_count)] }
  @read_client  = Memcached.new(pick_server.call, client_opts)
  @write_client = Memcached.new(pick_server.call, client_opts)
end

Public Instance Methods

delete(key, expiry=0) click to toggle source
# File lib/siberite/client.rb, line 75
# Deletes +key+ through the write client, wrapped in with_retries.
#
# key::    Key handed to the write client's delete.
# expiry:: Accepted for interface compatibility; not used by this method.
#
# Returns whatever with_retries returns, or nil when the write client raises
# Memcached::NotFound or Memcached::ServerEnd.
def delete(key, expiry=0)
  with_retries do
    @write_client.delete(key)
  end
rescue Memcached::NotFound, Memcached::ServerEnd
  nil
end
flush(queue) click to toggle source
# File lib/siberite/client.rb, line 137
# Drains +queue+ by reading raw items until sizeof reports nothing left.
#
# A falsy result from get only ends the current inner pass; the outer loop
# re-checks sizeof and keeps reading while it is still positive.
#
# Returns the number of items that were read.
def flush(queue)
  drained = 0
  while sizeof(queue) > 0
    while get(queue, :raw => true)
      drained += 1
    end
  end
  drained
end
get(key, opts = {}) click to toggle source

Parameters

key<String>

Queue name

opts<Boolean,Hash>

True/false toggles Marshalling. A Hash allows collision-avoiding options support.

Options (opts)

:open<Boolean>

Begins a transactional read.

:close<Boolean>

Ends a transactional read.

:abort<Boolean>

Cancels an existing transactional read

:peek<Boolean>

Return the head of the queue, without removal

:timeout<Integer>

Milliseconds to block for a new item

:raw<Boolean>

Toggles Marshalling. Equivalent to the “old style” second argument.

# File lib/siberite/client.rb, line 113
# Reads one item from the queue +key+ through the read client.
#
# key::  Queue name. The command suffix built by extract_queue_commands is
#        appended to it before the request is sent.
# opts:: Either a Hash of options (:open, :close, :abort, :peek, :timeout,
#        :raw) or the "old style" bare raw flag (true/false/nil), which is
#        treated as { :raw => flag }.
#
# Returns the value from the read client, or nil when the read client
# returns nothing or raises one of RECOVERABLE_ERRORS.
def get(key, opts = {})
  # Backward compatibility: a non-Hash second argument is the legacy raw
  # flag. Without this, opts[:raw] would raise NoMethodError on true/false.
  opts = { :raw => opts } unless opts.is_a?(Hash)

  raw = opts[:raw] || false
  commands = extract_queue_commands(opts)

  val =
    begin
      shuffle_if_necessary! key
      # The second argument is the marshal flag, i.e. the inverse of raw.
      @read_client.get key + commands, !raw
    rescue *RECOVERABLE_ERRORS
      # we can't tell the difference between a server being down
      # and an empty queue, so just return nil. our sticky server
      # logic should eliminate piling on down servers
      nil
    end

  # nil result without :close and :abort, force next get to jump from
  # current server
  if !val && @shuffle && !opts[:close] && !opts[:abort]
    @counter = @gets_per_server
  end

  val
end
get_abort(queue) click to toggle source
# File lib/siberite/client.rb, line 161
# Sends a get with the :abort option for +queue+ via get_from_last, i.e.
# with server shuffling switched off for the duration of the call.
def get_abort(queue)
  get_from_last(queue, :abort => true)
end
get_close(queue) click to toggle source
# File lib/siberite/client.rb, line 153
# Sends a get with the :close option for +queue+ via get_from_last, i.e.
# with server shuffling switched off for the duration of the call.
def get_close(queue)
  get_from_last(queue, :close => true)
end
get_close_open(queue) click to toggle source
# File lib/siberite/client.rb, line 157
# Sends a single get carrying both :close and :open for +queue+ via
# get_from_last, i.e. with server shuffling switched off for the call.
def get_close_open(queue)
  get_from_last(queue, :close => true, :open => true)
end
get_from_last(*args) click to toggle source

This provides the semantics needed to support transactions in the Transactional client. It temporarily disables server shuffling so that the client can close any open transaction on the current server before jumping to another one.

# File lib/siberite/client.rb, line 92
# Calls get with the given arguments while @shuffle is false.
#
# @shuffle is set back to true afterwards even if get raises. Note that it
# is always reset to true, not to its previous value.
#
# Returns whatever get returns.
def get_from_last(*args)
  begin
    @shuffle = false
    get(*args)
  ensure
    @shuffle = true
  end
end
get_open(queue) click to toggle source
# File lib/siberite/client.rb, line 149
# Sends a get with the :open option for +queue+ through the regular get
# path.
def get_open(queue)
  get(queue, :open => true)
end
peek(queue) click to toggle source
# File lib/siberite/client.rb, line 145
# Sends a get with the :peek option for +queue+ through the regular get
# path.
def peek(queue)
  get(queue, :peek => true)
end
set(key, value, ttl=0, raw=false) click to toggle source
# File lib/siberite/client.rb, line 80
# Writes +value+ under +key+ through the write client, wrapped in
# with_retries.
#
# key::   Key handed to the write client.
# value:: Payload to store.
# ttl::   Passed through to the write client unchanged (default 0).
# raw::   When true the write client's marshal flag is false, and vice versa.
#
# Returns true on success, false when Memcached::NotStored is raised.
def set(key, value, ttl=0, raw=false)
  marshal = !raw
  with_retries do
    @write_client.set(key, value, ttl, marshal)
  end
  true
rescue Memcached::NotStored
  false
end

Private Instance Methods

extract_queue_commands(opts) click to toggle source
# File lib/siberite/client.rb, line 191
# Builds the command suffix that get appends to a queue name.
#
# Each truthy flag among :close, :open, :abort and :peek contributes a
# "/flag" segment, always in that fixed order. A "/t=<timeout>" segment
# follows when opts[:timeout] or, failing that, @default_get_timeout is set.
#
# Returns the concatenated segments, or an empty string when none apply.
def extract_queue_commands(opts)
  segments = [:close, :open, :abort, :peek].select { |flag| opts[flag] }

  wait = opts[:timeout] || @default_get_timeout
  segments << "t=#{wait}" if wait

  segments.map { |segment| "/#{segment}" }.join('')
end
extract_siberite_options!(opts) click to toggle source
# File lib/siberite/client.rb, line 167
# Splits +opts+ into Siberite-specific options and everything else.
#
# Keys listed in SIBERITE_OPTIONS are moved into a new hash, which is
# returned. +opts+ itself is mutated in place so that it only holds the
# remaining keys. Relative key order is kept in both hashes.
def extract_siberite_options!(opts)
  siberite_pairs, remaining_pairs = opts.partition do |name, _value|
    SIBERITE_OPTIONS.include?(name)
  end

  opts.replace(Hash[remaining_pairs])
  Hash[siberite_pairs]
end
shuffle_if_necessary!(key) click to toggle source
# File lib/siberite/client.rb, line 176
# Decides whether the read client should be pointed at a randomly chosen
# server before the next get, and updates the per-server read counter.
#
# Does nothing when there is at most one server. Otherwise the read client
# is reset (and @counter / @current_queue updated) when either:
#   * @shuffle is on, at least one get was counted, and +key+ differs from
#     @current_queue; or
#   * @counter has reached @gets_per_server.
# In every other case @counter is incremented.
#
# NOTE(review): because && binds tighter than ||, the @gets_per_server quota
# triggers a reset even while @shuffle is false. Confirm whether that is
# intended for callers that go through get_from_last.
def shuffle_if_necessary!(key)
  return if @server_count <= 1

  # The very first request (@counter == 0, @current_queue == nil) must not
  # reset servers, hence the @counter > 0 guard.
  queue_switched = @shuffle && @counter > 0 && key != @current_queue

  if queue_switched || @counter >= @gets_per_server
    @counter = 0
    @current_queue = key
    @read_client.reset(servers[rand(@server_count)])
  else
    @counter += 1
  end
end