module SideJob::Worker

All workers should include SideJob::Worker and implement the perform method. @see SideJob::JobMethods

Attributes

registry[R]

This holds the registry for all available workers on one queue

Public Class Methods

config(queue, klass) click to toggle source

Returns the configuration registered for a worker. @param queue [String] Name of queue @param klass [String] Name of worker class @return [Hash, nil] Returns nil if the worker is not defined @see ClassMethods#register

# File lib/sidejob/worker.rb, line 29
def config(queue, klass)
  config = SideJob.redis.hget "workers:#{queue}", klass
  config = JSON.parse(config) if config
  config
end
drain_queue(timeout: 5, errors: true) click to toggle source

Run jobs until the queue is cleared. @param timeout [Float] timeout in seconds for Timeout#timeout (default 5) @param errors [Boolean] Whether to propagate errors that occur in jobs (default true)

# File lib/sidejob/testing.rb, line 8
def self.drain_queue(timeout: 5, errors: true)
  Timeout::timeout(timeout) do
    have_job = true
    while have_job
      have_job = false
      Sidekiq::Queue.all.each do |queue|
        queue.each do |sidekiq_job|
          have_job = true
          sidekiq_job.delete

          SideJob.find(sidekiq_job.jid).run_inline(errors: errors, queue: false, args: sidekiq_job.args)
        end
      end
    end
  end
end
included(base) click to toggle source

@see SideJob::Worker

# File lib/sidejob/worker.rb, line 55
def self.included(base)
  base.class_eval do
    include Sidekiq::Worker
    include SideJob::JobMethods
    include SideJob::Worker::OverrideMethods
  end
  base.extend(ClassMethods)
end
register_all(queue) click to toggle source

Workers need to add themselves to the registry even if it’s an empty configuration. This method publishes the registry to redis so that other workers can call workers on this queue. All workers for the queue should be defined as the existing registry is overwritten. @param queue [String] Queue to register all defined workers

# File lib/sidejob/worker.rb, line 14
def register_all(queue)
  SideJob.redis.multi do |multi|
    multi.del "workers:#{queue}"
    multi.hmset "workers:#{queue}", @registry.map {|key, val| [key, val.to_json]}.flatten(1) if @registry.size > 0
  end
  SideJob::Port.group(log: false) do
    SideJob.publish "/sidejob/workers/#{queue}", @registry
  end
end

Public Instance Methods

for_inputs(*inputs) { |*map(&:read)| ... } click to toggle source

Reads a set of input ports together. Workers should use this method where possible instead of reading directly from ports due to complexities of dealing with ports with defaults. A worker should be idempotent (it can be called multiple times on the same state). Consider reading from a single port with a default value. Each time it is run, it could read the same data from the port. The output of the job then could depend on the number of times it is run. If all input port have defaults, this method remembers the call and will only yield once even over multiple runs. In addition, any writes to output ports inside the block will instead set the default value of the port. Yields data from the ports until either no ports have data or is suspended due to data on some but not all ports. @param inputs [Array<String>] List of input ports to read @yield [Array] Splat of input data in same order as inputs @raise [SideJob::Worker::Suspended] Raised if an input port without a default has data but not all ports

# File lib/sidejob/worker.rb, line 85
def for_inputs(*inputs, &block)
  return unless inputs.length > 0
  ports = inputs.map {|name| input(name)}
  loop do
    SideJob::Port.group do
      info = ports.map {|port| [ port.size > 0, port.default? ] }

      return unless info.any? {|x| x[0] || x[1]} # Nothing to do if there's no data to read
      if info.any? {|x| x[0]}
        # some port has data, suspend unless every port has data or default
        suspend unless info.all? {|x| x[0] || x[1] }
        yield *ports.map(&:read)
      elsif info.all? {|x| x[1]}
        # all ports have default and no data
        defaults = ports.map(&:default)
        last_default = get(:for_inputs) || []
        return unless defaults != last_default
        set({for_inputs: defaults})
        SideJob::Port.group(set_default: true) do
          yield *defaults
        end
        return
      else
        # No ports have data and not every port has a default value so nothing to do
        return
      end
    end
  end
end
suspend() click to toggle source

Immediately suspend the current worker @raise [SideJob::Worker::Suspended]

# File lib/sidejob/worker.rb, line 69
def suspend
  raise Suspended
end