class ActiveConcurrency::Schedulers::Topic

Public Class Methods

new(pool, **options) click to toggle source
# File lib/active_concurrency/schedulers/topic.rb, line 7
def initialize(pool, **options)
  topics = options[:topics]
  mutexes = topics.each_with_object({}) do |t, h|
    h[t] = Mutex.new
  end

  topics = topics.cycle
  @pool = pool.each_with_object({}) do |w, h|
    topic = topics.next
    w.mutex = mutexes[topic]
    h.key?(topic) ? (h[topic] << w) : (h[topic] = [w])
  end
end

Public Instance Methods

schedule(*args, &block) click to toggle source
# File lib/active_concurrency/schedulers/topic.rb, line 21
def schedule(*args, &block)
  topic = args.pop
  worker = @pool[topic].min_by(&:size)
  worker.schedule(*args, &block)
end

Private Instance Methods

topics_pool() click to toggle source
# File lib/active_concurrency/schedulers/topic.rb, line 29
def topics_pool
  pool.each_with_object({}) do |w, h|
    topic = topics.next
    h.key?(topic) ? (h[topic] << w) : (h[topic] = [w])
  end
end