class Rubykiq::Client

Constants

DEFAULT_OPTIONS

A hash of valid options and their default values

VALID_OPTIONS_KEYS

An array of valid keys in the options hash when configuring a ‘Rubykiq::Client`

Attributes

connection_pool[W]

allow the connection_pool to be set

Public Class Methods

new(options = {}) click to toggle source

Initialize a new Client object

@param options [Hash]

# File lib/rubykiq/client.rb, line 40
def initialize(options = {})
  reset_options
  options.each_pair do |key, value|
    send("#{key}=", value) if VALID_OPTIONS_KEYS.include?(key)
  end
end

Public Instance Methods

<<(items)
Alias for: push
connection_pool(options = {}, &block) click to toggle source

Fetch the ::ConnectionPool of Rubykiq::Connections

@return [::ConnectionPool]

# File lib/rubykiq/client.rb, line 50
def connection_pool(options = {}, &block)
  options = valid_options.merge(options)
  initialize_connection_pool(options) unless defined?(@connection_pool)

  if block_given?
    @connection_pool.with(&block)
  else
    @connection_pool
  end
end
push(items) click to toggle source

Push a Sidekiq job to Redis. Accepts a number of options:

:class - the worker class to call, required.
:queue - the named queue to use, optional ( default: 'default' )
:args - an array of simple arguments to the perform method, must be JSON-serializable, optional ( default: [] )
:retry - whether to retry this job if it fails, true or false, default true, optional ( default: true )
:at - when the job should be executed. This can be a `Time`, `Date` or any `Time.parse`-able strings, optional.

Returns nil if not pushed to Redis. In the case of an indvidual job a job ID will be returned, if multiple jobs are pushed the size of the jobs will be returned

Example:

Rubykiq.push(:class => 'Worker', :args => ['foo', 1, :bat => 'bar'])
Rubykiq.push(:class => 'Scheduler', :queue => 'scheduler')
Rubykiq.push(:class => 'DelayedMailer', :at => '2013-01-01T09:00:00Z')
Rubykiq.push(:class => 'Worker', :args => [['foo'], ['bar']])

@param items [Array]

# File lib/rubykiq/client.rb, line 79
def push(items)
  fail(ArgumentError, 'Message must be a Hash') unless items.is_a?(Hash)
  fail(ArgumentError, 'Message args must be an Array') if items[:args] && !items[:args].is_a?(Array)

  # args are optional
  items[:args] ||= []

  # determine if this items arg's is a nested array
  items[:args].first.is_a?(Array) ? push_many(items) : push_one(items)
end
Also aliased as: <<

Private Instance Methods

default_options() click to toggle source

Create a hash of the default options and their values

# File lib/rubykiq/client.rb, line 99
def default_options
  DEFAULT_OPTIONS
end
initialize_connection_pool(options = {}) click to toggle source
# File lib/rubykiq/client.rb, line 204
def initialize_connection_pool(options = {})
  Thread.exclusive do
    @connection_pool = ::ConnectionPool.new(timeout: redis_pool_timeout, size: redis_pool_size) do
      Rubykiq::Connection.new(options)
    end
  end
end
normalize_item(item) click to toggle source
# File lib/rubykiq/client.rb, line 158
def normalize_item(item)
  fail(ArgumentError, 'Message must be a Hash') unless item.is_a?(Hash)
  fail(ArgumentError, "Message must include a class and set of arguments: #{item.inspect}") if !item[:class] || !item[:args]
  fail(ArgumentError, 'Message args must be an Array') if item[:args] && !item[:args].is_a?(Array)
  fail(ArgumentError, 'Message class must be a String representation of the class name') unless item[:class].is_a?(String)

  # normalize the time
  item[:at] = normalize_time(item[:at]) if item[:at]
  pre_normalized_item = item.clone

  # args are optional
  pre_normalized_item[:args] ||= []

  # apply the default options
  [:retry, :queue].each do |key|
    pre_normalized_item[key] = send("#{key}") unless pre_normalized_item.key?(key)
  end

  # provide a job ID
  pre_normalized_item[:jid] = ::SecureRandom.hex(12)

  # Sidekiq::Queue#latency (used in sidekiq web), requires `enqueued_at`
  pre_normalized_item[:enqueued_at] = Time.now.to_f

  pre_normalized_item
end
normalize_time(time) click to toggle source

Given an object meant to represent time, try to convert it intelligently to a float

# File lib/rubykiq/client.rb, line 186
def normalize_time(time)
  # if the time param is a `Date` / `String` convert it to a `Time` object
  if time.is_a?(Date)
    normalized_time = time.to_time
  elsif time.is_a?(String)
    normalized_time = Time.parse(time)
  else
    normalized_time = time
  end

  # convert the `Time` object to a float (if necessary)
  normalized_time = normalized_time.to_f unless normalized_time.is_a?(Numeric)

  normalized_time
end
push_many(items) click to toggle source

when multiple item’s are needing to be persisted to redis

# File lib/rubykiq/client.rb, line 122
def push_many(items)
  # we're expecting items to have an nested array of args, lets take each one and correctly normalize them
  payloads = items[:args].map do |args|
    fail ArgumentError, "Bulk arguments must be an Array of Arrays: [[:foo => 'bar'], [:foo => 'foo']]" unless args.is_a?(Array)
    # clone the original items (for :queue, :class, etc..)
    item = items.clone
    # merge this item's args (eg the nested `arg` array)
    item.merge!(args: args) unless args.empty?
    # normalize this individual item
    normalize_item(item)
  end.compact

  # if successfully persisted to redis return the size of the jobs
  pushed = false
  pushed = raw_push(payloads) unless payloads.empty?
  pushed ? payloads.size : nil
end
push_one(item) click to toggle source

when only one item is needed to persisted to redis

# File lib/rubykiq/client.rb, line 111
def push_one(item)
  # we're expecting item to be a single item so simply normalize it
  payload = normalize_item(item)

  # if successfully persisted to redis return this item's `jid`
  pushed = false
  pushed = raw_push([payload]) if payload
  pushed ? payload[:jid] : nil
end
raw_push(payloads) click to toggle source

persist the job message(s)

# File lib/rubykiq/client.rb, line 141
def raw_push(payloads)
  pushed = false
  connection_pool do |connection|
    if payloads.first[:at]
      pushed = connection.zadd('schedule', payloads.map { |item| [item[:at].to_s, ::MultiJson.encode(item)] })
    else
      q = payloads.first[:queue]
      to_push = payloads.map { |item| ::MultiJson.encode(item) }
      _, pushed = connection.multi do
        connection.sadd('queues', q)
        connection.lpush("queue:#{q}", to_push)
      end
    end
  end
  pushed
end
reset_options() click to toggle source

Set the VALID_OPTIONS_KEYS with their DEFAULT_OPTIONS

# File lib/rubykiq/client.rb, line 104
def reset_options
  VALID_OPTIONS_KEYS.each do |key|
    send("#{key}=", default_options[key])
  end
end
valid_options() click to toggle source

Create a hash of options and their values

# File lib/rubykiq/client.rb, line 94
def valid_options
  VALID_OPTIONS_KEYS.reduce({}) { |a, e| a.merge!(e => send(e)) }
end