class ActiveConcurrency::Schedulers::Topic
Public Class Methods
new(pool, **options)
click to toggle source
# File lib/active_concurrency/schedulers/topic.rb, line 7
#
# Builds the topic => workers table used by #schedule.
#
# Workers are assigned to topics round-robin, in pool order: the first
# worker gets the first topic, the second worker the second topic, and so
# on, wrapping around when the topics run out. Every worker assigned to the
# same topic receives the same Mutex instance through its +mutex=+ writer.
#
# @param pool [Enumerable] workers; each must respond to +mutex=+
# @param options [Hash] must contain +:topics+, a non-empty list of topics
# @raise [ArgumentError] when +:topics+ is missing, nil or empty
def initialize(pool, **options)
  topics = options[:topics].to_a
  # Fail early with a clear message. Previously a nil list surfaced as a
  # NoMethodError on nil, and an empty list leaked StopIteration (which an
  # enclosing `loop` would silently swallow).
  raise ArgumentError, 'options[:topics] must contain at least one topic' if topics.empty?

  # One lock per topic, shared by all workers of that topic.
  mutexes = topics.each_with_object({}) do |topic, locks|
    locks[topic] = Mutex.new
  end

  @pool = pool.each_with_index.each_with_object({}) do |(worker, index), by_topic|
    topic = topics[index % topics.size] # round-robin assignment
    worker.mutex = mutexes[topic]
    (by_topic[topic] ||= []) << worker
  end
end
Public Instance Methods
schedule(*args, &block)
click to toggle source
# File lib/active_concurrency/schedulers/topic.rb, line 21
#
# Hands a job to the least-loaded worker of a topic.
#
# The LAST positional argument is taken as the topic; the remaining
# arguments and the block are forwarded unchanged to the chosen worker's
# +schedule+ method. "Least loaded" means the worker with the smallest
# +size+ among those registered for the topic.
#
# @param args [Array] job arguments followed by the topic
# @return whatever the selected worker's +schedule+ returns
# @raise [ArgumentError] when the topic has no registered workers
def schedule(*args, &block)
  topic = args.pop
  # An unknown topic used to fail with NoMethodError on nil; report the
  # offending topic explicitly instead.
  workers = @pool.fetch(topic) do
    raise ArgumentError, "unknown topic: #{topic.inspect}"
  end

  workers.min_by(&:size).schedule(*args, &block)
end
Private Instance Methods
topics_pool()
click to toggle source
# File lib/active_concurrency/schedulers/topic.rb, line 29
#
# Groups the workers yielded by +pool+ under successive values of
# +topics.next+, returning a Hash of topic => Array of workers.
#
# NOTE(review): +pool+ and +topics+ are not defined in the visible part of
# this class; presumably they are accessors/enumerators provided elsewhere.
# Confirm before relying on this helper.
def topics_pool
  pool.group_by { |_worker| topics.next }
end