class Siberite::Client::Transactional

Constants

DEFAULT_RETRIES

Number of times to retry a job before giving up

ERROR_PROCESSING_RATE

Pct. of the time during ‘normal’ processing we check the error queue first

Attributes

current_queue[R]

Public Class Methods

new(client, max_retries = nil, error_rate = nil) click to toggle source

Parameters

client<Siberite::Client>

Client

max_retries<Integer>

Number of times to retry a job before giving up. Defaults to DEFAULT_RETRIES

error_rate<Float>

Pct. of the time during ‘normal’ processing we check the error queue first. Defaults to ERROR_PROCESSING_RATE

per_server<Integer>

Number of gets to execute against a single server, before changing servers. Defaults to MAX_PER_SERVER

Calls superclass method Siberite::Client::Proxy::new
# File lib/siberite/client/transactional.rb, line 37
def initialize(client, max_retries = nil, error_rate = nil)
  @max_retries = max_retries || DEFAULT_RETRIES
  @error_rate  = error_rate || ERROR_PROCESSING_RATE
  @counter     = 0 # Command counter
  super(client)
end

Public Instance Methods

current_try() click to toggle source
# File lib/siberite/client/transactional.rb, line 77
def current_try
  @job.retries + 1
end
get(key, opts = {}) click to toggle source

Returns job from the key queue 1 - ERROR_PROCESSING_RATE pct. of the time. Every so often, checks the error queue for jobs and returns a retryable job.

Returns

Job, possibly retryable, or nil

Raises

MultipleQueueException

# File lib/siberite/client/transactional.rb, line 56
def get(key, opts = {})
  raise MultipleQueueException if current_queue && key != current_queue

  close_last_transaction

  if read_from_error_queue?
    queue = key + "_errors"
    job = client.get_from_last(queue, opts.merge(:open => true))
  else
    queue = key
    job = client.get(queue, opts.merge(:open => true))
  end

  if job
    @job = job.is_a?(RetryableJob) ? job : RetryableJob.new(0, job)
    @last_read_queue = queue
    @current_queue = key
    @job.job
  end
end
retry(item = nil) click to toggle source

Enqueues the current job on the error queue for later retry. If the job has been retried DEFAULT_RETRIES times, gives up entirely.

Parameters

item (optional)

if specified, the job set to the error queue with the given payload instead of what was originally fetched.

Returns

Boolean

true if the job is enqueued in the retry queue, false otherwise

Raises

NoOpenTransaction

# File lib/siberite/client/transactional.rb, line 103
def retry(item = nil)
  raise NoOpenTransaction unless @last_read_queue

  job = item ? RetryableJob.new(@job.retries, item) : @job.dup

  job.retries += 1

  if job.retries < @max_retries
    client.set(current_queue + "_errors", job)
  else
    raise RetriesExceeded.new(job)
  end

ensure
  close_last_transaction
end

Private Instance Methods

read_from_error_queue?() click to toggle source
# File lib/siberite/client/transactional.rb, line 122
def read_from_error_queue?
  rand < @error_rate
end