module SideJob

helpers for testing

Constants

CONFIGURATION

Configuration parameters

VERSION

The current SideJob version

Public Class Methods

context(data) { || ... } click to toggle source

Adds to the current SideJob context within the block. @param data [Hash] Data to be merged into the current context

# File lib/sidejob.rb, line 119
def self.context(data, &block)
  previous = Thread.current[:sidejob_context]
  Thread.current[:sidejob_context] = (previous || {}).merge(data.symbolize_keys)
  yield
ensure
  Thread.current[:sidejob_context] = previous
end
find(name_or_id) click to toggle source

Finds a job by name or id. @param name_or_id [String, Integer] Job name or id @return [SideJob::Job, nil] Job object or nil if it doesn’t exist

# File lib/sidejob.rb, line 85
def self.find(name_or_id)
  SideJob::Job.new(name_or_id) rescue nil
end
log(entry) click to toggle source

Publishes a log message using the current SideJob context. @param entry [Hash|Exception|String] Log entry

# File lib/sidejob.rb, line 97
def self.log(entry)
  context = (Thread.current[:sidejob_context] || {}).merge(timestamp: SideJob.timestamp)

  if entry.is_a?(Exception)
    exception = entry
    entry = { error: exception.message }
    if exception.backtrace
      # only store the backtrace until the first sidekiq line
      entry[:backtrace] = exception.backtrace.take_while {|l| l !~ /sidekiq/}.join("\n")
    end
  elsif entry.is_a?(String)
    entry = { message: entry }
  end

  # Disable logging to prevent infinite publish loop for input ports subscribed to /sidejob/log which could generate log entries
  SideJob::Port.group(log: false) do
    SideJob.publish '/sidejob/log', context.merge(entry)
  end
end
publish(channel, message) click to toggle source

Publishes a message up the channel hierarchy to jobs by writing to ports subscribed to the channel. Also publishes to the destination channel only via normal redis pubsub. @param channel [String] Channel is path-like, separated by / to indicate hierarchy @param message [Object] JSON encodable message

# File lib/sidejob.rb, line 131
def self.publish(channel, message)
  # We don't publish at every level up hierarchy via redis pubsub since a client can use redis psubscribe
  SideJob.redis.publish channel, message.to_json

  job_subs = {}

  # Set the context to the original channel so that a job that subscribes to a higher channel can determine
  # the original channel that the message was sent to.
  SideJob.context({channel: channel}) do
    # walk up the channel hierarchy
    Pathname.new(channel).ascend do |channel|
      channel = channel.to_s
      jobs = SideJob.redis.smembers "channel:#{channel}"
      jobs.each do |id|
        job = SideJob.find(id)
        if ! job_subs.has_key?(id)
          job_subs[id] = {}
          if job
            SideJob.redis.hgetall("#{job.redis_key}:inports:channels").each_pair do |port, channels|
              channels = JSON.parse(channels)
              channels.each do |ch|
                job_subs[id][ch] ||= []
                job_subs[id][ch] << port
              end
            end
          end
        end

        if job && job_subs[id] && job_subs[id][channel]
          job_subs[id][channel].each do |port|
            job.input(port).write message
          end
        else
          # Job is gone or no longer subscribed to this channel
          SideJob.redis.srem "channel:#{channel}", id
        end
      end
    end
  end
end
queue(queue, klass, args: nil, as: nil, parent: nil, name: nil, at: nil, by: nil, inports: nil, outports: nil) click to toggle source

Main function to queue a job @param queue [String] Name of the queue to put the job in @param klass [String] Name of the class that will handle the job @param args [Array] additional args to pass to the worker’s perform method (default none) @param as [String] Add as alias to job or if name already taken, does not queue new job and returns nil @param parent [SideJob::Job] parent job @param name [String] Name of child job (required if parent specified) @param at [Time, Float] Time to schedule the job, otherwise queue immediately @param by [String] Who created this job. Recommend <type>:<id> format for non-jobs as SideJob uses job:<id>. @param inports [Hash{Symbol,String => Hash}] Input port configuration. Port name to options. @param outports [Hash{Symbol,String => Hash}] Output port configuration. Port name to options. @return [SideJob::Job] Job

# File lib/sidejob.rb, line 49
def self.queue(queue, klass, args: nil, as: nil, parent: nil, name: nil, at: nil, by: nil, inports: nil, outports: nil)
  raise "No worker registered for #{klass} in queue #{queue}" unless SideJob::Worker.config(queue, klass)

  return nil if as && SideJob.redis.hget('jobs:aliases', as)

  # To prevent race conditions, we generate the id and set all data in redis before queuing the job to sidekiq
  # Otherwise, sidekiq may start the job too quickly
  id = SideJob.redis.incr('jobs:last_id')
  SideJob.redis.sadd 'jobs', id
  job = SideJob::Job.new(id)

  redis_key = job.redis_key
  SideJob.redis.multi do |multi|
    multi.set "#{redis_key}:worker", {queue: queue, class: klass, args: args}.to_json
    multi.set "#{redis_key}:status", 'completed'
    multi.set "#{redis_key}:created_at", SideJob.timestamp
    multi.set "#{redis_key}:created_by", by
  end

  if parent
    raise 'Missing name option for job with a parent' unless name
    parent.adopt(job, name)
  end

  # initialize ports
  job.inports = inports
  job.outports = outports

  job.add_alias(as) if as

  job.run(at: at)
end
redis() { |redis| ... } click to toggle source

Returns redis connection If block is given, yields the redis connection Otherwise, just returns the redis connection

# File lib/sidejob.rb, line 22
def self.redis
  Sidekiq.redis do |redis|
    if block_given?
      yield redis
    else
      redis
    end
  end
end
redis=(redis) click to toggle source

@param redis [Hash] Options for passing to Redis.new

# File lib/sidejob.rb, line 33
def self.redis=(redis)
  Sidekiq.redis = redis
end
timestamp() click to toggle source

Returns the current timestamp as a iso8601 string @return [String] Current timestamp

# File lib/sidejob.rb, line 91
def self.timestamp
  Time.now.utc.iso8601(9)
end