class ActiveConcurrency::Base::Worker

Attributes

mutex[RW]
name[R]

Public Class Methods

new(name: nil) click to toggle source
# File lib/active_concurrency/base/worker.rb, line 12
def initialize(name: nil)
  @name = "#{prefix}_worker_#{name || SecureRandom.uuid}"
  @queue = Queue.new
end

Public Instance Methods

clear() click to toggle source
# File lib/active_concurrency/base/worker.rb, line 17
def clear
  @queue.clear
end
close() click to toggle source
# File lib/active_concurrency/base/worker.rb, line 21
def close
  @queue.close
end
closed?() click to toggle source
# File lib/active_concurrency/base/worker.rb, line 25
def closed?
  @queue.closed?
end
empty?() click to toggle source
# File lib/active_concurrency/base/worker.rb, line 29
def empty?
  @queue.empty?
end
exit() click to toggle source
# File lib/active_concurrency/base/worker.rb, line 33
def exit
  schedule { throw :exit }
end
lock() click to toggle source
# File lib/active_concurrency/base/worker.rb, line 37
def lock
  return true if process? || mutex.nil? || mutex.locked?

  mutex.lock
end
schedule(*args, &block) click to toggle source
# File lib/active_concurrency/base/worker.rb, line 43
def schedule(*args, &block)
  @queue << [block, args]
end
shutdown() click to toggle source

rubocop:disable Lint/UnreachableCode

# File lib/active_concurrency/base/worker.rb, line 48
def shutdown
  exit
  lock
  join
  close
  exit!
end
size() click to toggle source

rubocop:enable Lint/UnreachableCode

# File lib/active_concurrency/base/worker.rb, line 57
def size
  @queue.size
end

Private Instance Methods

execute() click to toggle source
# File lib/active_concurrency/base/worker.rb, line 63
def execute
  job, args = @queue.pop

  if mutex.nil? || process?
    job.call(*args)
  else
    mutex.synchronize { job.call(*args) }
  end
end
perform() click to toggle source
# File lib/active_concurrency/base/worker.rb, line 80
def perform
  catch(:exit) do
    loop do
      break if closed? || empty?

      begin
        execute
      rescue Exception => e
        puts "#{e.class.name}: #{e.message}\n#{e.backtrace.join("\n")}"
      end
    end
  end
end
prefix() click to toggle source
# File lib/active_concurrency/base/worker.rb, line 73
def prefix
  @prefix ||= begin
    klass = self.class.name.split('::')[1]
    klass.downcase
  end
end
process?() click to toggle source
# File lib/active_concurrency/base/worker.rb, line 94
def process?
  prefix == 'processes'
end
thread?() click to toggle source
# File lib/active_concurrency/base/worker.rb, line 98
def thread?
  prefix == 'threads'
end