class Backburner::Workers::Threading
Attributes
shutdown_timeout[RW]
threads_number[RW]
exit_on_shutdown[RW]
self_read[RW]
self_write[RW]
Public Class Methods
new(*args)
click to toggle source
Custom initializer just to set @tubes_data
Calls superclass method
Backburner::Worker::new
# File lib/backburner/workers/threading.rb, line 16 def initialize(*args) @tubes_data = {} super self.process_tube_options @exit_on_shutdown = true end
Public Instance Methods
kill()
click to toggle source
# File lib/backburner/workers/threading.rb, line 141 def kill @thread_pools.each { |_name, pool| pool.kill unless pool.shutdown? } end
prepare()
click to toggle source
Used to prepare job queues before processing jobs. Setup beanstalk tube_names and watch all specified tubes for jobs.
@raise [Beaneater::NotConnected] If beanstalk fails to connect. @example
@worker.prepare
# File lib/backburner/workers/threading.rb, line 30 def prepare self.tube_names.map! { |name| expand_tube_name(name) }.uniq! log_info "Working #{tube_names.size} queues: [ #{tube_names.join(', ')} ]" @thread_pools = {} @tubes_data.each do |name, config| max_threads = (config[:threads] || self.class.threads_number || ::Concurrent.processor_count).to_i @thread_pools[name] = (::Concurrent::ThreadPoolExecutor.new(min_threads: 1, max_threads: max_threads)) end end
process_tube_names(tube_names)
click to toggle source
Process the special tube_names of Threading
worker:
The format is tube_name:custom_threads_limit
@example
process_tube_names(['foo:10', 'lol']) => ['foo', lol']
# File lib/backburner/workers/threading.rb, line 88 def process_tube_names(tube_names) names = compact_tube_names(tube_names) if names.nil? nil else names.map do |name| data = name.split(":") tube_name = data.first threads_number = data[1].empty? ? nil : data[1].to_i rescue nil @tubes_data[expand_tube_name(tube_name)] = { :threads => threads_number } tube_name end end end
process_tube_options()
click to toggle source
Process the tube settings This overrides @tubes_data set by process_tube_names
method. So a tube has name 'super_job:5' and the tube class has setting queue_jobs_limit 10, the result limit will be 10 If the tube is known by existing allq queue, but not by class - skip it
# File lib/backburner/workers/threading.rb, line 110 def process_tube_options Backburner::Worker.known_queue_classes.each do |queue| next if @tubes_data[expand_tube_name(queue)].nil? queue_settings = { :threads => queue.queue_jobs_limit } @tubes_data[expand_tube_name(queue)].merge!(queue_settings){|k, v1, v2| v2.nil? ? v1 : v2 } end end
register_signal_handlers!()
click to toggle source
Registers signal handlers TERM and INT to trigger
# File lib/backburner/workers/threading.rb, line 152 def register_signal_handlers! @self_read, @self_write = IO.pipe %w[TERM INT].each do |sig| trap(sig) do raise Interrupt if @in_shutdown self_write.puts(sig) end end end
shutdown()
click to toggle source
Calls superclass method
Backburner::Worker#shutdown
# File lib/backburner/workers/threading.rb, line 145 def shutdown log_info "beginning graceful worker shutdown" shutdown_threadpools super if @exit_on_shutdown end
shutdown_threadpools()
click to toggle source
# File lib/backburner/workers/threading.rb, line 127 def shutdown_threadpools @thread_pools.each { |_name, pool| pool.shutdown } shutdown_time = Time.now @in_shutdown = true all_shutdown = @thread_pools.all? do |_name, pool| time_to_wait = self.class.shutdown_timeout - (Time.now - shutdown_time).to_i pool.wait_for_termination(time_to_wait) if time_to_wait > 0 end rescue Interrupt log_info "graceful shutdown aborted, shutting down immediately" ensure kill unless all_shutdown end
start(wait=true)
click to toggle source
Starts processing new jobs indefinitely. Primary way to consume and process jobs in specified tubes.
@example
@worker.start
# File lib/backburner/workers/threading.rb, line 46 def start(wait=true) prepare @thread_pools.each do |tube_name, pool| pool.max_length.times do # Create a new connection and set it up to listen on this tube name # connection = new_connection.tap{ |conn| conn.tubes.watch!(tube_name) } # connection.on_reconnect = lambda { |conn| conn.tubes.watch!(tube_name) } # Make it work jobs using its own connection per thread pool.post(connection) do |memo_connection| # TODO: use read-write lock? loop do begin break if @in_shutdown work_one_job(memo_connection, tube_name) rescue => e log_error("Exception caught in thread pool loop. Continuing. -> #{e.message}\nBacktrace: #{e.backtrace}") end end connection.close end end end wait_for_shutdown! if wait end
wait_for_shutdown!()
click to toggle source
Wait for the shutdown signel
# File lib/backburner/workers/threading.rb, line 121 def wait_for_shutdown! raise Interrupt while IO.select([self_read]) rescue Interrupt shutdown end