module SideJob
helpers for testing
Constants
- CONFIGURATION
Configuration parameters
- VERSION
The current SideJob version
Public Class Methods
Adds to the current SideJob context within the block.
@param data [Hash] Data to be merged into the current context
    # File lib/sidejob.rb, line 119
    def self.context(data, &block)
      previous = Thread.current[:sidejob_context]
      Thread.current[:sidejob_context] = (previous || {}).merge(data.symbolize_keys)
      yield
    ensure
      Thread.current[:sidejob_context] = previous
    end
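The save, merge, and restore pattern used by the context method can be tried without SideJob or ActiveSupport. The following is a minimal sketch under stated assumptions: `DemoContext`, `DemoContext.with` and the `:demo_context` key are hypothetical names, and `transform_keys(&:to_sym)` stands in for `symbolize_keys`.

```ruby
# Hypothetical stand-in for SideJob.context that uses only core Ruby.
module DemoContext
  KEY = :demo_context

  # Returns the context visible to the current thread (empty if none is set).
  def self.current
    Thread.current[KEY] || {}
  end

  # Merges data into the current context for the duration of the block.
  def self.with(data)
    previous = Thread.current[KEY]
    # transform_keys(&:to_sym) stands in for ActiveSupport's symbolize_keys
    Thread.current[KEY] = (previous || {}).merge(data.transform_keys(&:to_sym))
    yield
  ensure
    # Runs even if the block raises, so the outer context is always restored
    Thread.current[KEY] = previous
  end
end

DemoContext.with('user' => 'alice') do
  DemoContext.with(request: 42) do
    p DemoContext.current # the inner block sees both :user and :request
  end
  p DemoContext.current   # the outer block sees only :user again
end
p DemoContext.current     # empty once the outermost block has exited
```

Because the previous value is restored in `ensure`, nested calls unwind correctly even when a block raises.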
Publishes a log message using the current SideJob context.
@param entry [Hash|Exception|String] Log entry
    # File lib/sidejob.rb, line 97
    def self.log(entry)
      context = (Thread.current[:sidejob_context] || {}).merge(timestamp: SideJob.timestamp)

      if entry.is_a?(Exception)
        exception = entry
        entry = { error: exception.message }
        if exception.backtrace
          # only store the backtrace until the first sidekiq line
          entry[:backtrace] = exception.backtrace.take_while {|l| l !~ /sidekiq/}.join("\n")
        end
      elsif entry.is_a?(String)
        entry = { message: entry }
      end

      # Disable logging to prevent infinite publish loop for input ports subscribed to /sidejob/log which could generate log entries
      SideJob::Port.group(log: false) do
        SideJob.publish '/sidejob/log', context.merge(entry)
      end
    end
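The way each accepted entry type is turned into a hash can be illustrated in isolation. This is a sketch only: `normalize_log_entry` is a hypothetical helper that is not part of SideJob, and the backtrace lines are made up for the example.

```ruby
# Hypothetical helper mirroring the entry normalization step of the log method.
def normalize_log_entry(entry)
  case entry
  when Exception
    result = { error: entry.message }
    if entry.backtrace
      # Keep only the frames that come before the first line mentioning sidekiq
      result[:backtrace] = entry.backtrace.take_while { |l| l !~ /sidekiq/ }.join("\n")
    end
    result
  when String
    { message: entry }
  else
    entry # hashes are passed through unchanged
  end
end

error = RuntimeError.new('boom')
# Made-up backtrace: one application frame followed by sidekiq frames
error.set_backtrace(['app/worker.rb:10', 'lib/sidekiq/processor.rb:99', 'bin/sidekiq:5'])

p normalize_log_entry(error)   # :backtrace contains only 'app/worker.rb:10'
p normalize_log_entry('hello') # the string is wrapped under :message
```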
Publishes a message up the channel hierarchy to jobs by writing to ports subscribed to the channel. The message is also published via normal redis pubsub, but only to the destination channel itself.
@param channel [String] Channel is path-like, separated by / to indicate hierarchy
@param message [Object] JSON encodable message
    # File lib/sidejob.rb, line 131
    def self.publish(channel, message)
      # We don't publish at every level up hierarchy via redis pubsub since a client can use redis psubscribe
      SideJob.redis.publish channel, message.to_json

      job_subs = {}

      # Set the context to the original channel so that a job that subscribes to a higher channel can determine
      # the original channel that the message was sent to.
      SideJob.context({channel: channel}) do
        # walk up the channel hierarchy
        Pathname.new(channel).ascend do |channel|
          channel = channel.to_s
          jobs = SideJob.redis.smembers "channel:#{channel}"
          jobs.each do |id|
            job = SideJob.find(id)
            if ! job_subs.has_key?(id)
              job_subs[id] = {}
              if job
                SideJob.redis.hgetall("#{job.redis_key}:inports:channels").each_pair do |port, channels|
                  channels = JSON.parse(channels)
                  channels.each do |ch|
                    job_subs[id][ch] ||= []
                    job_subs[id][ch] << port
                  end
                end
              end
            end

            if job && job_subs[id] && job_subs[id][channel]
              job_subs[id][channel].each do |port|
                job.input(port).write message
              end
            else
              # Job is gone or no longer subscribed to this channel
              SideJob.redis.srem "channel:#{channel}", id
            end
          end
        end
      end
    end
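The walk up the channel hierarchy relies on `Pathname#ascend` from the Ruby standard library. The sketch below shows which channels are visited for a given destination; `channel_hierarchy` is a hypothetical helper name used only for illustration.

```ruby
require 'pathname'

# Hypothetical helper: lists every channel visited when walking up from the
# destination channel, in the order Pathname#ascend yields them.
def channel_hierarchy(channel)
  levels = []
  Pathname.new(channel).ascend { |path| levels << path.to_s }
  levels
end

p channel_hierarchy('/sidejob/log')
# => ["/sidejob/log", "/sidejob", "/"]
```

A job subscribed to `/sidejob` would therefore also receive messages published to `/sidejob/log`, and the context set around the walk tells it which channel the message was originally sent to.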
Main function to queue a job.
@param queue [String] Name of the queue to put the job in
@param klass [String] Name of the class that will handle the job
@param args [Array] Additional args to pass to the worker’s perform method (default none)
@param as [String] Alias to add to the job. If the alias is already taken, no new job is queued and nil is returned
@param parent [SideJob::Job] Parent job
@param name [String] Name of child job (required if parent specified)
@param at [Time, Float] Time to schedule the job, otherwise queue immediately
@param by [String] Who created this job. Recommend <type>:<id> format for non-jobs, as SideJob uses job:<id>.
@param inports [Hash{Symbol,String => Hash}] Input port configuration. Port name to options.
@param outports [Hash{Symbol,String => Hash}] Output port configuration. Port name to options.
@return [SideJob::Job, nil] Job, or nil if the alias given in as is already taken
    # File lib/sidejob.rb, line 49
    def self.queue(queue, klass, args: nil, as: nil, parent: nil, name: nil, at: nil, by: nil, inports: nil, outports: nil)
      raise "No worker registered for #{klass} in queue #{queue}" unless SideJob::Worker.config(queue, klass)

      return nil if as && SideJob.redis.hget('jobs:aliases', as)

      # To prevent race conditions, we generate the id and set all data in redis before queuing the job to sidekiq
      # Otherwise, sidekiq may start the job too quickly
      id = SideJob.redis.incr('jobs:last_id')
      SideJob.redis.sadd 'jobs', id
      job = SideJob::Job.new(id)

      redis_key = job.redis_key
      SideJob.redis.multi do |multi|
        multi.set "#{redis_key}:worker", {queue: queue, class: klass, args: args}.to_json
        multi.set "#{redis_key}:status", 'completed'
        multi.set "#{redis_key}:created_at", SideJob.timestamp
        multi.set "#{redis_key}:created_by", by
      end

      if parent
        raise 'Missing name option for job with a parent' unless name
        parent.adopt(job, name)
      end

      # initialize ports
      job.inports = inports
      job.outports = outports

      job.add_alias(as) if as

      job.run(at: at)
    end
Returns the redis connection. If a block is given, yields the redis connection; otherwise, just returns it.
    # File lib/sidejob.rb, line 22
    def self.redis
      Sidekiq.redis do |redis|
        if block_given?
          yield redis
        else
          redis
        end
      end
    end
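The yield-or-return behavior can be seen with a stand-in for the connection pool. In this sketch `DemoPool`, `DEMO_POOL` and `demo_connection` are hypothetical names, and a symbol plays the role of the redis connection; nothing here talks to Sidekiq or redis.

```ruby
# Hypothetical stand-in for a connection pool that lends out one connection.
class DemoPool
  def initialize(connection)
    @connection = connection
  end

  # Yields the connection and returns the block's result
  def with
    yield @connection
  end
end

DEMO_POOL = DemoPool.new(:fake_connection)

# Mirrors the shape of the redis method: block_given? and yield inside the
# pool block refer to the block passed to demo_connection itself.
def demo_connection
  DEMO_POOL.with do |conn|
    if block_given?
      yield conn
    else
      conn
    end
  end
end

p demo_connection                      # returns the connection itself
p demo_connection { |conn| conn.to_s } # returns the block's result
```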
@param redis [Hash] Options for passing to Redis.new
    # File lib/sidejob.rb, line 33
    def self.redis=(redis)
      Sidekiq.redis = redis
    end
Returns the current timestamp as an ISO 8601 string.
@return [String] Current timestamp
    # File lib/sidejob.rb, line 91
    def self.timestamp
      Time.now.utc.iso8601(9)
    end
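The argument 9 requests nine fractional digits, that is, nanosecond precision. The sketch below shows the resulting format on a fixed time so the output is reproducible; `demo_timestamp` is a hypothetical helper, not the SideJob method.

```ruby
require 'time' # provides Time#iso8601 on Rubies where it is not built in

# Hypothetical helper: formats the given time the same way as the timestamp
# method above. getutc returns a UTC copy instead of modifying the receiver.
def demo_timestamp(time = Time.now)
  time.getutc.iso8601(9)
end

first  = Time.utc(2015, 1, 2, 3, 4, 5)
second = first + 1 # one second later

puts demo_timestamp(first)
# prints 2015-01-02T03:04:05.000000000Z

# UTC timestamps in this fixed-width form compare as strings in time order
puts demo_timestamp(first) < demo_timestamp(second)
# prints true
```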