class ActiveConcurrency::Base::Worker
Attributes
mutex[RW]
name[R]
Public Class Methods
new(name: nil)
click to toggle source
# File lib/active_concurrency/base/worker.rb, line 12 def initialize(name: nil) @name = "#{prefix}_worker_#{name || SecureRandom.uuid}" @queue = Queue.new end
Public Instance Methods
clear()
click to toggle source
# File lib/active_concurrency/base/worker.rb, line 17 def clear @queue.clear end
close()
click to toggle source
# File lib/active_concurrency/base/worker.rb, line 21 def close @queue.close end
closed?()
click to toggle source
# File lib/active_concurrency/base/worker.rb, line 25 def closed? @queue.closed? end
empty?()
click to toggle source
# File lib/active_concurrency/base/worker.rb, line 29 def empty? @queue.empty? end
exit()
click to toggle source
# File lib/active_concurrency/base/worker.rb, line 33 def exit schedule { throw :exit } end
lock()
click to toggle source
# File lib/active_concurrency/base/worker.rb, line 37 def lock return true if process? || mutex.nil? || mutex.locked? mutex.lock end
schedule(*args, &block)
click to toggle source
# File lib/active_concurrency/base/worker.rb, line 43 def schedule(*args, &block) @queue << [block, args] end
shutdown()
click to toggle source
rubocop:disable Lint/UnreachableCode
# File lib/active_concurrency/base/worker.rb, line 48 def shutdown exit lock join close exit! end
size()
click to toggle source
rubocop:enable Lint/UnreachableCode
# File lib/active_concurrency/base/worker.rb, line 57 def size @queue.size end
Private Instance Methods
execute()
click to toggle source
# File lib/active_concurrency/base/worker.rb, line 63 def execute job, args = @queue.pop if mutex.nil? || process? job.call(*args) else mutex.synchronize { job.call(*args) } end end
perform()
click to toggle source
# File lib/active_concurrency/base/worker.rb, line 80 def perform catch(:exit) do loop do break if closed? || empty? begin execute rescue Exception => e puts "#{e.class.name}: #{e.message}\n#{e.backtrace.join("\n")}" end end end end
prefix()
click to toggle source
# File lib/active_concurrency/base/worker.rb, line 73 def prefix @prefix ||= begin klass = self.class.name.split('::')[1] klass.downcase end end
process?()
click to toggle source
# File lib/active_concurrency/base/worker.rb, line 94 def process? prefix == 'processes' end
thread?()
click to toggle source
# File lib/active_concurrency/base/worker.rb, line 98 def thread? prefix == 'threads' end