class Skiplock::Worker
Public Class Methods
cleanup(hostname = nil)
click to toggle source
# File lib/skiplock/worker.rb, line 6 def self.cleanup(hostname = nil) delete_ids = [] self.where(hostname: hostname || Socket.gethostname).each do |worker| sid = Process.getsid(worker.pid) rescue nil delete_ids << worker.id if worker.sid != sid || worker.updated_at < 10.minutes.ago end self.where(id: delete_ids).delete_all if delete_ids.count > 0 end
generate(capacity:, hostname:, master: true)
click to toggle source
# File lib/skiplock/worker.rb, line 15 def self.generate(capacity:, hostname:, master: true) self.create!(pid: Process.pid, sid: Process.getsid(), master: master, hostname: hostname, capacity: capacity) rescue self.create!(pid: Process.pid, sid: Process.getsid(), master: false, hostname: hostname, capacity: capacity) end
Public Instance Methods
shutdown()
click to toggle source
# File lib/skiplock/worker.rb, line 35 def shutdown @running = false @executor.shutdown @executor.kill unless @executor.wait_for_termination(@config[:graceful_shutdown]) self.delete Skiplock.logger.info "[Skiplock] Shutdown of #{self.master ? 'master' : 'cluster'} worker#{(' ' + @num.to_s) if @num > 0 && @config[:workers] > 2} (PID: #{self.pid}) was completed." end
start(worker_num: 0, **config)
click to toggle source
# File lib/skiplock/worker.rb, line 21 def start(worker_num: 0, **config) if self.master Job.flush Cron.setup end @num = worker_num @config = config @queues_order_query = @config[:queues].map { |q,v| "WHEN queue_name = '#{q}' THEN #{v}" }.join(' ') if @config[:queues].is_a?(Hash) && @config[:queues].count > 0 @running = true @executor = Concurrent::ThreadPoolExecutor.new(min_threads: @config[:min_threads] + 1, max_threads: @config[:max_threads] + 1, max_queue: @config[:max_threads], idletime: 60, auto_terminate: true, fallback_policy: :discard) @executor.post { run } Process.setproctitle("skiplock: #{self.master ? 'master' : 'cluster'} worker#{(' ' + @num.to_s) if @num > 0 && @config[:workers] > 2} [#{Rails.application.class.name.deconstantize.downcase}:#{Rails.env}]") if @config[:standalone] end
Private Instance Methods
run()
click to toggle source
# File lib/skiplock/worker.rb, line 45 def run sleep 3 ActiveRecord::Base.connection_pool.with_connection do |connection| Skiplock.logger.info "[Skiplock] Starting in #{@config[:standalone] ? 'standalone' : 'async'} mode (PID: #{self.pid}) with #{@config[:max_threads]} max threads as #{self.master ? 'master' : 'cluster'} worker#{(' ' + @num.to_s) if @num > 0 && @config[:workers] > 2}..." connection.exec_query('LISTEN "skiplock::jobs"') error = false next_schedule_at = Time.now.to_f timestamp = Process.clock_gettime(Process::CLOCK_MONOTONIC) while @running Rails.application.reloader.wrap do begin if error unless connection.active? connection.reconnect! sleep(0.5) connection.exec_query('LISTEN "skiplock::jobs"') next_schedule_at = Time.now.to_f end Job.flush if self.master error = false end if Time.now.to_f >= next_schedule_at && @executor.remaining_capacity > 0 job = nil connection.transaction do result = connection.select_all("SELECT id, running, scheduled_at FROM skiplock.jobs WHERE running = FALSE AND expired_at IS NULL AND finished_at IS NULL ORDER BY scheduled_at ASC NULLS FIRST,#{@queues_order_query ? ' CASE ' + @queues_order_query + ' ELSE NULL END ASC NULLS LAST,' : ''} priority ASC NULLS LAST, created_at ASC FOR UPDATE SKIP LOCKED LIMIT 1").first result = connection.select_all("UPDATE skiplock.jobs SET running = TRUE, worker_id = '#{self.id}', updated_at = NOW() WHERE id = '#{result['id']}' RETURNING *").first if result && result['scheduled_at'].to_f <= Time.now.to_f job = Job.instantiate(result) if result end if job.try(:running) @executor.post do Rails.application.executor.wrap { job.execute(purge_completion: @config[:purge_completion], max_retries: @config[:max_retries]) } end else next_schedule_at = (job ? job.scheduled_at.to_f : Float::INFINITY) end end job_notifications = [] connection.raw_connection.wait_for_notify(0.2) do |channel, pid, payload| job_notifications << payload if payload loop do payload = connection.raw_connection.notifies break unless @running && payload job_notifications << payload[:extra] end job_notifications.each do |n| op, id, worker_id, job_class, queue_name, running, expired_at, finished_at, scheduled_at = n.split(',') next if op == 'DELETE' || running == 'true' || expired_at.to_f > 0 || finished_at.to_f > 0 next_schedule_at = scheduled_at.to_f if scheduled_at.to_f < next_schedule_at end end if Process.clock_gettime(Process::CLOCK_MONOTONIC) - timestamp > 60 self.touch timestamp = Process.clock_gettime(Process::CLOCK_MONOTONIC) end rescue Exception => ex # most likely error with database connection Skiplock.logger.error(ex.to_s) Skiplock.logger.error(ex.backtrace.join("\n")) Skiplock.on_errors.each { |p| p.call(ex, @last_exception) } error = true wait(5) @last_exception = ex end sleep(0.3) end end connection.exec_query('UNLISTEN *') end end
wait(timeout)
click to toggle source
# File lib/skiplock/worker.rb, line 115 def wait(timeout) t = Process.clock_gettime(Process::CLOCK_MONOTONIC) while @running sleep(0.5) break if Process.clock_gettime(Process::CLOCK_MONOTONIC) - t > timeout end end