class Osbourne::Launcher

Attributes

threads[RW]

Public Class Methods

new() click to toggle source
# File lib/osbourne/launcher.rb, line 8
def initialize; end

Public Instance Methods

global_polling_threads() click to toggle source
# File lib/osbourne/launcher.rb, line 30
def global_polling_threads
  Osbourne::WorkerBase.descendants.map do |worker|
    Osbourne.logger.info("[Osbourne] Spawning thread for #{worker.name}")
    Thread.new do
      loop do
        poll(worker)
        break if @stop
      end
    end
  end
end
poll(worker) click to toggle source
# File lib/osbourne/launcher.rb, line 50
def poll(worker)
  worker.polling_queue.poll(wait_time_seconds:      worker.config[:max_wait],
                            max_number_of_messages: worker.config[:max_batch_size],
                            idle_timeout:           worker.config[:idle_timeout],
                            skip_delete:            true) do |messages|
    Osbourne.logger.info("[Osbourne] Recieved #{messages.count} on #{worker.name}")
    Array(messages).each {|msg| process(worker, Osbourne::Message.new(msg)) }
    throw :stop_polling if @stop
    Osbourne.logger.info("[Osbourne] Waiting for more messages on #{worker.name} for max of #{worker.config[:max_wait]} seconds")
  end
  Osbourne.logger.info("[Osbourne] Idle timeout on #{worker.name}")
end
start!() click to toggle source
# File lib/osbourne/launcher.rb, line 10
def start!
  Osbourne.logger.info("[Osbourne] Launching Osbourne workers")
  @stop = false
  @threads = global_polling_threads
end
stop() click to toggle source
# File lib/osbourne/launcher.rb, line 20
def stop
  puts "Signal caught. Terminating workers..."
  @stop = true
end
stop!() click to toggle source
# File lib/osbourne/launcher.rb, line 25
def stop!
  puts "Signal caught. Terminating workers..."
  @threads.each {|thr| Thread.kill(thr) }
end
wait!() click to toggle source
# File lib/osbourne/launcher.rb, line 16
def wait!
  threads.map(&:join)
end
worker_polling_threads(worker) click to toggle source
# File lib/osbourne/launcher.rb, line 42
def worker_polling_threads(worker)
  my_threads = []
  worker.config[:threads].times do
    my_threads << Thread.new { poll(worker) }
  end
  my_threads.each(&:join)
end

Private Instance Methods

process(worker, message) click to toggle source
# File lib/osbourne/launcher.rb, line 65
def process(worker, message) # rubocop:disable Metrics/AbcSize
  Osbourne.logger.info("[Osbourne] [MSG] Worker: #{worker.name} Valid: #{message.valid?} ID: #{message.id}")
  return false unless message.valid? && Osbourne.lock.soft_lock(message.id)

  Osbourne.cache.fetch(message.id, ex: 24.hours) do
    worker.new.process(message).tap {|_| Osbourne.lock.unlock(message.id) }
  end
  worker.polling_queue.delete_message(message.message)
rescue Exception => ex # rubocop:disable Lint/RescueException
  Osbourne.logger.error("[Osbourne] [MSG ID: #{message.id}] [#{ex.message}]\n #{ex.backtrace_locations.join("\n")}")
ensure
  return # rubocop:disable Lint/EnsureReturn
end