module Octokiq::Server
Public Instance Methods
configuration()
click to toggle source
# File lib/octokiq/server.rb, line 23 def configuration @configuration ||= Configuration.new end
configure() { |configuration| ... }
click to toggle source
# File lib/octokiq/server.rb, line 19 def configure yield(configuration) end
start()
click to toggle source
# File lib/octokiq/server.rb, line 27 def start configuration.thread_mode ? handle_thread_jobs : ractor_workers loop do job = connection.fetch(queues) pipe << job end end
Private Instance Methods
concurrency()
click to toggle source
# File lib/octokiq/server.rb, line 76 def concurrency @concurrency ||= configuration.concurrency end
connection()
click to toggle source
# File lib/octokiq/server.rb, line 38 def connection @connection ||= Connection.new end
handle_thread_jobs()
click to toggle source
# File lib/octokiq/server.rb, line 52 def handle_thread_jobs semaphore = Mutex.new counter = 0 Thread.new do while counter < concurrency && job = pipe.take semaphore.synchronize { counter += 1 } Thread.new do Processor.new(job).run semaphore.synchronize { counter -= 1 } end end end end
pipe()
click to toggle source
# File lib/octokiq/server.rb, line 84 def pipe @pipe ||= Ractor.new do loop do Ractor.yield Ractor.receive end end end
queues()
click to toggle source
# File lib/octokiq/server.rb, line 80 def queues configuration.queues end
ractor_workers()
click to toggle source
# File lib/octokiq/server.rb, line 42 def ractor_workers @ractor_workers ||= (1..concurrency).map do Ractor.new(pipe) do |pipe| while job = pipe.take Ractor.yield Processor.new(job).run end end end end
thread_workers()
click to toggle source
# File lib/octokiq/server.rb, line 66 def thread_workers @thread_workers = [] Thread.new do while job = pipe.take Processor.new(job).run end end @thread_workers end