class Rubykiq::Client
Constants
- DEFAULT_OPTIONS
A hash of valid options and their default values
- VALID_OPTIONS_KEYS
An array of valid keys in the options hash when configuring a `Rubykiq::Client`
Attributes
allow the connection_pool
to be set
Public Class Methods
Initialize a new Client
object
@param options [Hash]
# File lib/rubykiq/client.rb, line 40
#
# Builds a client. reset_options runs first, then every entry of +options+
# whose key appears in VALID_OPTIONS_KEYS is applied through the matching
# "<key>=" writer. Entries with any other key are skipped.
#
# @param options [Hash] option names mapped to the values to apply
def initialize(options = {})
  reset_options

  options.each_pair do |name, setting|
    next unless VALID_OPTIONS_KEYS.include?(name)

    send("#{name}=", setting)
  end
end
Public Instance Methods
Fetch the ::ConnectionPool of Rubykiq::Connections
@return [::ConnectionPool]
# File lib/rubykiq/client.rb, line 50
#
# Returns the connection pool, creating it first when @connection_pool has
# not been defined yet. The pool is created with the current option values
# (valid_options) overlaid by +options+.
#
# @param options [Hash] overrides merged on top of the current option values
# @yield when a block is given it is forwarded to the pool's +with+
# @return [::ConnectionPool] the pool itself when no block is given;
#   otherwise whatever the pool's +with+ returns
def connection_pool(options = {}, &block)
  merged = valid_options.merge(options)
  initialize_connection_pool(merged) unless defined?(@connection_pool)

  return @connection_pool unless block_given?

  @connection_pool.with(&block)
end
Push a Sidekiq job to Redis. Accepts a number of options:
:class - the worker class to call, required. :queue - the named queue to use, optional ( default: 'default' ) :args - an array of simple arguments to the perform method, must be JSON-serializable, optional ( default: [] ) :retry - whether to retry this job if it fails, true or false, optional ( default: true ) :at - when the job should be executed. This can be a `Time`, a `Date` or any `Time.parse`-able string, optional.
Returns nil if not pushed to Redis. For an individual job, the job ID is returned; if multiple jobs are pushed, the number of jobs is returned.
Example:
Rubykiq.push(:class => 'Worker', :args => ['foo', 1, :bat => 'bar']) Rubykiq.push(:class => 'Scheduler', :queue => 'scheduler') Rubykiq.push(:class => 'DelayedMailer', :at => '2013-01-01T09:00:00Z') Rubykiq.push(:class => 'Worker', :args => [['foo'], ['bar']])
@param items [Hash]
# File lib/rubykiq/client.rb, line 79
#
# Validates a job description and hands it to push_one or push_many.
#
# The :args default is applied to a copy of +items+, so the caller's hash is
# never modified and frozen hashes are accepted.
#
# @param items [Hash] the job description (see the option list above)
# @return whatever push_one / push_many returns
# @raise [ArgumentError] when +items+ is not a Hash, or :args is present but
#   is not an Array
def push(items)
  fail(ArgumentError, 'Message must be a Hash') unless items.is_a?(Hash)
  fail(ArgumentError, 'Message args must be an Array') if items[:args] && !items[:args].is_a?(Array)

  # args are optional; default them on a copy rather than on the caller's hash
  job = items.merge(args: items[:args] || [])

  # a nested array of args means several jobs are being pushed at once
  job[:args].first.is_a?(Array) ? push_many(job) : push_one(job)
end
Private Instance Methods
Create a hash of the default options and their values
# File lib/rubykiq/client.rb, line 99
#
# Accessor for the defaults table.
#
# @return the DEFAULT_OPTIONS constant itself (not a copy)
def default_options
  DEFAULT_OPTIONS
end
# File lib/rubykiq/client.rb, line 204
#
# Lock held while a connection pool is being assigned. It replaces
# Thread.exclusive, which was deprecated in Ruby 2.3 and removed in Ruby 3.0.
CONNECTION_POOL_INIT_LOCK = Mutex.new

# Creates a new ::ConnectionPool and stores it in @connection_pool.
#
# The pool is sized with redis_pool_size and uses redis_pool_timeout; the
# block given to the pool builds a Rubykiq::Connection from +options+.
#
# @param options [Hash] passed unchanged to Rubykiq::Connection.new
# @return [::ConnectionPool] the pool that was just assigned
def initialize_connection_pool(options = {})
  CONNECTION_POOL_INIT_LOCK.synchronize do
    @connection_pool = ::ConnectionPool.new(timeout: redis_pool_timeout, size: redis_pool_size) do
      Rubykiq::Connection.new(options)
    end
  end
end
# File lib/rubykiq/client.rb, line 158
#
# Validates a single job description and returns a normalized copy of it.
#
# All changes are made on a +dup+ of +item+, so the caller's hash is left
# untouched and frozen hashes are accepted. On the copy:
# * :at, when present, is replaced by the result of normalize_time
# * :retry and :queue, when the key is absent, are filled in by calling the
#   method of the same name
# * :jid is set to a fresh 24-character hex string
# * :enqueued_at is set to the current time as a Float
#
# @param item [Hash] job description; :class and :args are required
# @return [Hash] the normalized copy
# @raise [ArgumentError] when +item+ is not a Hash, :class or :args is
#   missing, :args is not an Array, or :class is not a String
def normalize_item(item)
  fail(ArgumentError, 'Message must be a Hash') unless item.is_a?(Hash)
  fail(ArgumentError, "Message must include a class and set of arguments: #{item.inspect}") if !item[:class] || !item[:args]
  fail(ArgumentError, 'Message args must be an Array') unless item[:args].is_a?(Array)
  fail(ArgumentError, 'Message class must be a String representation of the class name') unless item[:class].is_a?(String)

  # work on a copy; dup (unlike clone) does not carry over a frozen state
  normalized = item.dup

  # normalize the time
  normalized[:at] = normalize_time(normalized[:at]) if normalized[:at]

  # apply the default options
  [:retry, :queue].each do |key|
    normalized[key] = send(key) unless normalized.key?(key)
  end

  # provide a job ID
  normalized[:jid] = ::SecureRandom.hex(12)

  # Sidekiq::Queue#latency (used in sidekiq web), requires `enqueued_at`
  normalized[:enqueued_at] = Time.now.to_f

  normalized
end
Given an object meant to represent time, try to convert it intelligently to a float
# File lib/rubykiq/client.rb, line 186
#
# Converts a time-like value to a number.
#
# A Date is converted with +to_time+ and a String with Time.parse; any other
# value is used as given. The intermediate value is returned unchanged when
# it is already Numeric, otherwise its +to_f+ is returned.
#
# @param time [Date, String, Object] the value to convert
# @return [Numeric] the numeric representation
def normalize_time(time)
  coerced =
    case time
    when Date then time.to_time
    when String then Time.parse(time)
    else time
    end

  coerced.is_a?(Numeric) ? coerced : coerced.to_f
end
Used when multiple items need to be persisted to Redis
# File lib/rubykiq/client.rb, line 122
#
# Builds one payload per entry of items[:args] and pushes them together.
#
# Each payload starts as a copy of +items+ (keeping :queue, :class, etc.) with
# :args replaced by that entry, and is then passed through normalize_item.
# An empty entry produces a job with empty args; it no longer inherits the
# whole nested array.
#
# @param items [Hash] job description whose :args is an Array of Arrays
# @return [Integer, nil] the number of payloads when raw_push reports
#   success; nil when there was nothing to push or raw_push returned a
#   falsy value
# @raise [ArgumentError] when an entry of items[:args] is not an Array
def push_many(items)
  payloads = items[:args].map do |args|
    fail ArgumentError, "Bulk arguments must be an Array of Arrays: [[:foo => 'bar'], [:foo => 'foo']]" unless args.is_a?(Array)

    # copy the shared keys and always apply this entry's own args
    normalize_item(items.merge(args: args))
  end.compact

  return nil if payloads.empty?

  # if successfully persisted to redis return the size of the jobs
  raw_push(payloads) ? payloads.size : nil
end
Used when only one item needs to be persisted to Redis
# File lib/rubykiq/client.rb, line 111
#
# Normalizes a single job description and pushes it.
#
# @param item [Hash] the job description
# @return the payload's :jid when raw_push returns a truthy value; nil when
#   normalize_item yields nothing or raw_push returns a falsy value
def push_one(item)
  payload = normalize_item(item)
  return nil unless payload

  # hand the job ID back only when the push went through
  raw_push([payload]) ? payload[:jid] : nil
end
persist the job message(s)
# File lib/rubykiq/client.rb, line 141
#
# Writes already-normalized payloads through a pooled connection.
#
# The first payload decides the route for the whole batch:
# * when it has :at, every payload is sent in one +zadd+ call on 'schedule'
#   as a [item[:at].to_s, JSON] pair
# * otherwise, inside +multi+, the first payload's queue name is +sadd+-ed to
#   'queues' and all JSON payloads are +lpush+-ed to "queue:<name>"
#
# @param payloads [Array<Hash>] normalized job hashes; must not be empty,
#   because the first element is read unconditionally
# @return the +zadd+ result, or the second element of the +multi+ result;
#   false if the connection block never assigns a result
def raw_push(payloads)
  outcome = false

  connection_pool do |conn|
    head = payloads.first

    if head[:at]
      scheduled = payloads.map { |job| [job[:at].to_s, ::MultiJson.encode(job)] }
      outcome = conn.zadd('schedule', scheduled)
    else
      queue_name = head[:queue]
      encoded = payloads.map { |job| ::MultiJson.encode(job) }

      replies = conn.multi do
        conn.sadd('queues', queue_name)
        conn.lpush("queue:#{queue_name}", encoded)
      end

      # keep only the second reply, i.e. the one for lpush
      _, outcome = replies
    end
  end

  outcome
end
Set the VALID_OPTIONS_KEYS
with their DEFAULT_OPTIONS
# File lib/rubykiq/client.rb, line 104
#
# Assigns every key in VALID_OPTIONS_KEYS the value that default_options
# holds for it, using the key's "<key>=" writer.
#
# @return the VALID_OPTIONS_KEYS collection (the result of +each+)
def reset_options
  VALID_OPTIONS_KEYS.each { |name| send("#{name}=", default_options[name]) }
end
Create a hash of options and their values
# File lib/rubykiq/client.rb, line 94
#
# Collects the current value of every option by calling the method named
# after each key in VALID_OPTIONS_KEYS.
#
# @return [Hash] a new hash mapping each key to its current value
def valid_options
  VALID_OPTIONS_KEYS.each_with_object({}) do |name, current|
    current[name] = send(name)
  end
end