module Octokiq::Server

Public Instance Methods

configuration() click to toggle source
# File lib/octokiq/server.rb, line 23
def configuration
  @configuration ||= Configuration.new
end
configure() { |configuration| ... } click to toggle source
# File lib/octokiq/server.rb, line 19
def configure
  yield(configuration)
end
start() click to toggle source
# File lib/octokiq/server.rb, line 27
def start
  configuration.thread_mode ? handle_thread_jobs : ractor_workers

  loop do
    job = connection.fetch(queues)
    pipe << job
  end
end

Private Instance Methods

concurrency() click to toggle source
# File lib/octokiq/server.rb, line 76
def concurrency
  @concurrency ||= configuration.concurrency
end
connection() click to toggle source
# File lib/octokiq/server.rb, line 38
def connection
  @connection ||= Connection.new
end
handle_thread_jobs() click to toggle source
# File lib/octokiq/server.rb, line 52
def handle_thread_jobs
  semaphore = Mutex.new
  counter = 0
  Thread.new do
    while counter < concurrency && job = pipe.take
      semaphore.synchronize { counter += 1 }
      Thread.new do
        Processor.new(job).run
        semaphore.synchronize { counter -= 1 }
      end
    end
  end
end
pipe() click to toggle source
# File lib/octokiq/server.rb, line 84
def pipe
  @pipe ||= Ractor.new do
    loop do
      Ractor.yield Ractor.receive
    end
  end
end
queues() click to toggle source
# File lib/octokiq/server.rb, line 80
def queues
  configuration.queues
end
ractor_workers() click to toggle source
# File lib/octokiq/server.rb, line 42
def ractor_workers
  @ractor_workers ||= (1..concurrency).map do
    Ractor.new(pipe) do |pipe|
      while job = pipe.take
        Ractor.yield Processor.new(job).run
      end
    end
  end
end
thread_workers() click to toggle source
# File lib/octokiq/server.rb, line 66
def thread_workers
  @thread_workers = []
  Thread.new do
    while job = pipe.take
      Processor.new(job).run
    end
  end
  @thread_workers
end