class Roundhousekiq::Runner

Attributes

channel[RW]
connection[RW]
consumer[RW]
error_exchange[RW]
exchange[RW]
queue[RW]
queue_worker_map[RW]
queues[RW]
shutdown_runner[RW]

Public Class Methods

client_settings() click to toggle source
# File lib/roundhousekiq/runner.rb, line 57
# Client properties advertised to the broker: Bunny's default client
# properties with the :product entry set to 'Roundhousekiq'.
#
# Returns whatever +merge+ returns on Bunny's defaults (a Hash, assuming
# DEFAULT_CLIENT_PROPERTIES is a Hash).
def self.client_settings
  defaults = Bunny::Session::DEFAULT_CLIENT_PROPERTIES
  defaults.merge(product: 'Roundhousekiq')
end
connection_settings() click to toggle source
# File lib/roundhousekiq/runner.rb, line 52
# Connection parameters taken from the Roundhousekiq configuration.
#
# Converts the configuration to a Hash and keeps only the :host, :port,
# :vhost, :username and :password entries; every other setting is dropped.
def self.connection_settings
  permitted = %i(host port vhost username password)
  Roundhousekiq.config.to_h.select { |key, _value| permitted.include?(key) }
end
new() click to toggle source

Public API

# File lib/roundhousekiq/runner.rb, line 11
# Starts the runner with no declared queues and an empty queue-to-worker
# lookup table; both are filled in by create_exchanges_and_queues.
def initialize
  self.queue_worker_map = {}
  self.queues = []
end

Public Instance Methods

create_channel() click to toggle source
# File lib/roundhousekiq/runner.rb, line 61
# Opens a channel on the current connection, stores it in +channel+ and
# applies the prefetch value from the Roundhousekiq configuration.
#
# Returns the newly created channel.
def create_channel
  opened = connection.create_channel
  # Store the channel before configuring it, so it is kept even if the
  # prefetch call raises.
  self.channel = opened
  opened.prefetch(Roundhousekiq.config.prefetch)
  opened
end
create_exchanges_and_queues() click to toggle source
# File lib/roundhousekiq/runner.rb, line 67
# Declares, for every worker definition, its exchange and its queue on the
# current channel, binds the queue to the exchange with the definition's
# routing key, and records the bound queue.
#
# Each bound queue is appended to +queues+ and mapped to its worker in
# +queue_worker_map+. Exchanges are always declared with durable: true; the
# queue's auto_delete and durable flags come from the definition.
def create_exchanges_and_queues
  Workers.definitions.each do |worker, definition|
    exchange_spec = definition.exchange
    queue_spec = definition.queue

    declared_exchange = channel.exchange(
      exchange_spec[:name],
      type: exchange_spec[:type],
      durable: true
    )

    declared_queue = channel.queue(
      queue_spec[:name],
      auto_delete: queue_spec[:auto_delete],
      durable: queue_spec[:durable]
    )
    # Whatever +bind+ returns is what gets stored and used as the map key.
    bound_queue = declared_queue.bind(declared_exchange, routing_key: queue_spec[:routing_key])

    queues << bound_queue
    queue_worker_map[bound_queue] = worker
  end
end
establish_connection() click to toggle source

Connection

# File lib/roundhousekiq/runner.rb, line 45
# Builds a Bunny connection from the class-level connection settings, passing
# the class-level client settings as the :properties option, stores it in
# +connection+ and starts it.
#
# Returns the result of starting the connection.
def establish_connection
  session_options = { properties: self.class.client_settings }
  broker_settings = self.class.connection_settings

  self.connection = Bunny.new(broker_settings, session_options)
  connection.start
end
process_message(queue, payload) click to toggle source
# File lib/roundhousekiq/runner.rb, line 99
# Hands one received message to the worker registered for +queue+.
#
# queue   - the queue the message arrived on; used as the key into
#           +queue_worker_map+.
# payload - the raw message body, parsed as JSON before being passed on.
#
# Returns whatever the worker's +perform_async+ returns. A payload that is
# not valid JSON raises from JSON.parse.
def process_message(queue, payload)
  worker = queue_worker_map[queue]
  message = JSON.parse(payload)
  worker.perform_async(message)
end
run() click to toggle source
# File lib/roundhousekiq/runner.rb, line 16
# Entry point of the runner: connects, opens the channel, declares exchanges
# and queues, then subscribes to the queues.
#
# Returns the result of setup_subscribers.
def run
  # Setup steps, in the order they must happen.
  %i[establish_connection create_channel create_exchanges_and_queues].each do |step|
    send(step)
  end

  setup_subscribers
end
setup_subscribers() click to toggle source
# File lib/roundhousekiq/runner.rb, line 86
# Subscribes to every declared queue with manual acknowledgements, then keeps
# the calling thread parked until shutdown is requested.
#
# For each delivery the handler acknowledges the delivery tag on the channel
# and then passes the payload to process_message. Note that the ack happens
# before process_message runs, so a message whose processing raises has
# already been acknowledged.
def setup_subscribers
  queues.each do |subscribed_queue|
    subscribed_queue.subscribe(manual_ack: true) do |delivery_info, _metadata, payload|
      channel.ack(delivery_info.delivery_tag)
      process_message(subscribed_queue, payload)
    end
  end

  # Poll the shutdown flag every 5 seconds.
  sleep 5 until shutdown?
end
shutdown() click to toggle source
# File lib/roundhousekiq/runner.rb, line 23
# Requests shutdown of the runner and closes the connection.
#
# Sets the shutdown flag, waits 10 seconds, closes the connection from a
# separate thread, then waits another 10 seconds. Returns the value of the
# final sleep.
def shutdown
  # Raise the flag first and give the runner time to finish its work.
  self.shutdown_runner = true
  sleep(10)

  # The connection cannot be closed from the current thread (TRAP context),
  # so a new thread is spawned for it.
  Thread.new do
    connection.try(:close)
  end

  # The closing thread is not joined; wait a fixed time for the close instead.
  sleep(10)
end
shutdown?() click to toggle source
# File lib/roundhousekiq/runner.rb, line 36
# Whether shutdown has been requested.
#
# Returns the current value of +shutdown_runner+ as-is (nil until shutdown
# sets it to true), so treat the result as truthy/falsy.
def shutdown?
  shutdown_runner
end