class Skiplock::Worker

Public Class Methods

cleanup(hostname = nil) click to toggle source
# File lib/skiplock/worker.rb, line 6
def self.cleanup(hostname = nil)
  delete_ids = []
  self.where(namespace: Skiplock.namespace, hostname: hostname || Socket.gethostname).each do |worker|
    sid = Process.getsid(worker.pid) rescue nil
    delete_ids << worker.id if worker.sid != sid || worker.updated_at < 10.minutes.ago
  end
  self.where(id: delete_ids).delete_all if delete_ids.count > 0
end
generate(capacity:, hostname:, master: true) click to toggle source
# File lib/skiplock/worker.rb, line 15
def self.generate(capacity:, hostname:, master: true)
  worker = self.create!(pid: Process.pid, sid: Process.getsid(), master: master, hostname: hostname, capacity: capacity, namespace: Skiplock.namespace)
rescue
  worker = self.create!(pid: Process.pid, sid: Process.getsid(), master: false, hostname: hostname, capacity: capacity, namespace: Skiplock.namespace)
end

Public Instance Methods

shutdown() click to toggle source
# File lib/skiplock/worker.rb, line 21
def shutdown
  @running = false
  @executor.shutdown
  @executor.kill unless @executor.wait_for_termination(@config[:graceful_shutdown])
  Skiplock.logger.info "[Skiplock] Shutdown of #{self.master ? 'master' : 'cluster'} worker#{(' ' + @num.to_s) if @num > 0 && @config[:workers] > 2} (PID: #{self.pid}) was completed."
end
start(worker_num: 0, **config) click to toggle source
# File lib/skiplock/worker.rb, line 28
def start(worker_num: 0, **config)
  @num = worker_num
  @config = config
  @pg_config = ActiveRecord::Base.connection.raw_connection.conninfo_hash.compact
  @namespace_query = Skiplock.namespace.nil? ? "namespace IS NULL" : "namespace = '#{Skiplock.namespace}'"
  @queues_order_query = @config[:queues].map { |q,v| "WHEN queue_name = '#{q}' THEN #{v}" }.join(' ') if @config[:queues].is_a?(Hash) && @config[:queues].count > 0
  @running = true
  @map = ::PG::TypeMapByOid.new
  @map.add_coder(::PG::TextDecoder::Boolean.new(oid: 16, name: 'bool'))
  @map.add_coder(::PG::TextDecoder::Integer.new(oid: 20, name: 'int8'))
  @map.add_coder(::PG::TextDecoder::Integer.new(oid: 21, name: 'int2'))
  @map.add_coder(::PG::TextDecoder::Integer.new(oid: 23, name: 'int4'))
  @map.add_coder(::PG::TextDecoder::TimestampUtc.new(oid: 1114, name: 'timestamp'))
  @map.add_coder(::PG::TextDecoder::String.new(oid: 2950, name: 'uuid'))
  @map.add_coder(::PG::TextDecoder::JSON.new(oid: 3802, name: 'jsonb'))
  @executor = Concurrent::ThreadPoolExecutor.new(min_threads: @config[:min_threads] + 1, max_threads: @config[:max_threads] + 1, max_queue: @config[:max_threads] + 1, idletime: 60, auto_terminate: false, fallback_policy: :abort)
  @executor.post { run }
  if @config[:standalone]
    Process.setproctitle("skiplock: #{self.master ? 'master' : 'cluster'} worker#{(' ' + @num.to_s) if @num > 0 && @config[:workers] > 2} [#{Rails.application.class.name.deconstantize.downcase}:#{Rails.env}]")
    ActiveRecord::Base.connection.throw_away!
  end
end

Private Instance Methods

establish_connection() click to toggle source
# File lib/skiplock/worker.rb, line 53
def establish_connection
  @connection = ::PG.connect(@pg_config)
  @connection.type_map_for_results = @map
  @connection.exec('LISTEN "skiplock::jobs"').clear
  @connection.exec('LISTEN "skiplock::workers"').clear
end
run() click to toggle source
# File lib/skiplock/worker.rb, line 60
def run
  sleep 3
  Skiplock.logger.info "[Skiplock] Starting in #{@config[:standalone] ? 'standalone' : 'async'} mode (PID: #{self.pid}) with #{@config[:max_threads]} max threads as #{self.master ? 'master' : 'cluster'} worker#{(' ' + @num.to_s) if @num > 0 && @config[:workers] > 2}..."
  next_schedule_at = Time.now.to_f
  pg_exception_timestamp = nil
  timestamp = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  while @running
    Rails.application.reloader.wrap do
      begin
        if @connection.nil? || @connection.status != ::PG::CONNECTION_OK
          establish_connection
          @executor.post { Rails.application.executor.wrap { Job.flush } } if self.master
          pg_exception_timestamp = nil
          next_schedule_at = Time.now.to_f
        end
        if Time.now.to_f >= next_schedule_at && @executor.remaining_capacity > 1  # reserves 1 slot in queue for Job.flush in case of pg_connection error
          result = nil
          @connection.transaction do |conn|
            conn.exec("SELECT id, running, scheduled_at FROM skiplock.jobs WHERE running = FALSE AND expired_at IS NULL AND finished_at IS NULL AND #{@namespace_query} ORDER BY scheduled_at ASC NULLS FIRST,#{@queues_order_query ? ' CASE ' + @queues_order_query + ' ELSE NULL END ASC NULLS LAST,' : ''} priority ASC NULLS LAST, created_at ASC FOR UPDATE SKIP LOCKED LIMIT 1") do |r|
              result = r.first
              conn.exec("UPDATE skiplock.jobs SET running = TRUE, worker_id = '#{self.id}', updated_at = NOW() WHERE id = '#{result['id']}' RETURNING *") { |r| result = r.first } if result && result['scheduled_at'].to_f <= Time.now.to_f
            end
          end
          if result && result['running']
            @executor.post { Rails.application.executor.wrap { Job.instantiate(result).execute(purge_completion: @config[:purge_completion], max_retries: @config[:max_retries]) } }
          else
            next_schedule_at = (result ? result['scheduled_at'].to_f : Float::INFINITY)
          end
        end
        notifications = { 'skiplock::jobs' => [], 'skiplock::workers' => [] }
        @connection.wait_for_notify(0.2) do |channel, pid, payload|
          notifications[channel] << payload if payload
          loop do
            payload = @connection.notifies
            break unless @running && payload
            notifications[payload[:relname]] << payload[:extra]
          end
          notifications['skiplock::jobs'].each do |n|
            op, id, worker_id, namespace, job_class, queue_name, running, expired_at, finished_at, scheduled_at = n.split(',')
            next if op == 'DELETE' || running == 'true' || namespace != Skiplock.namespace || expired_at.to_f > 0 || finished_at.to_f > 0
            next_schedule_at = scheduled_at.to_f if scheduled_at.to_f < next_schedule_at
          end
        end
        if Process.clock_gettime(Process::CLOCK_MONOTONIC) - timestamp > 60
          @connection.exec("UPDATE skiplock.workers SET updated_at = NOW() WHERE id = '#{self.id}'").clear
          timestamp = Process.clock_gettime(Process::CLOCK_MONOTONIC)
        end
      rescue Exception => ex
        report_exception = true
        # if error is with database connection then only report if it persists longer than 1 minute
        if @connection.nil? || @connection.status != ::PG::CONNECTION_OK
          report_exception = false if pg_exception_timestamp.nil? || Process.clock_gettime(Process::CLOCK_MONOTONIC) - pg_exception_timestamp <= 60
          pg_exception_timestamp ||= Process.clock_gettime(Process::CLOCK_MONOTONIC)
        end
        if report_exception
          Skiplock.logger.error(ex.to_s)
          Skiplock.logger.error(ex.backtrace.join("\n"))
          Skiplock.on_errors.each { |p| p.call(ex) }
        end
        wait(5)
      end
      sleep(0.3)
    end
  end
ensure
  @connection.close if @connection && !@connection.finished?
end
wait(timeout = 1) click to toggle source
# File lib/skiplock/worker.rb, line 128
def wait(timeout = 1)
  t = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  while @running
    sleep(0.5)
    break if Process.clock_gettime(Process::CLOCK_MONOTONIC) - t > timeout
  end
end