class Roundhousekiq::Runner
Attributes
channel[RW]
connection[RW]
consumer[RW]
error_exchange[RW]
exchange[RW]
queue[RW]
queue_worker_map[RW]
queues[RW]
shutdown_runner[RW]
Public Class Methods
client_settings()
click to toggle source
# File lib/roundhousekiq/runner.rb, line 57
# Client properties for the Bunny session: Bunny's default client
# properties with the :product entry set to 'Roundhousekiq'.
#
# Returns the result of the non-bang +merge+, so the Bunny constant itself
# is left as it was.
def self.client_settings
  product_override = { product: 'Roundhousekiq' }
  Bunny::Session::DEFAULT_CLIENT_PROPERTIES.merge(product_override)
end
connection_settings()
click to toggle source
# File lib/roundhousekiq/runner.rb, line 52
# Extracts the connection-related entries from the Roundhousekiq
# configuration.
#
# Returns the configuration (converted with +to_h+) restricted to the
# :host, :port, :vhost, :username and :password keys. Keys are compared as
# symbols, so entries stored under other key types are not selected.
def self.connection_settings
  connection_keys = %i[host port vhost username password]
  settings = Roundhousekiq.config.to_h
  settings.select { |key, _value| connection_keys.include?(key) }
end
new()
click to toggle source
Public API
# File lib/roundhousekiq/runner.rb, line 11
# Sets up an idle runner: no queues declared yet and an empty
# queue-to-worker lookup table. No connection is opened here.
def initialize
  # Filled in later by create_exchanges_and_queues.
  self.queues = []
  self.queue_worker_map = {}
end
Public Instance Methods
create_channel()
click to toggle source
# File lib/roundhousekiq/runner.rb, line 61
# Opens a channel on the current connection, stores it in +channel+ and
# applies the prefetch value taken from Roundhousekiq.config.prefetch.
#
# Requires +connection+ to be set already (see establish_connection).
# Returns the newly created channel.
def create_channel
  opened_channel = connection.create_channel
  self.channel = opened_channel
  opened_channel.prefetch(Roundhousekiq.config.prefetch)
  opened_channel
end
create_exchanges_and_queues()
click to toggle source
# File lib/roundhousekiq/runner.rb, line 67
# Declares, for every entry in Workers.definitions, the exchange and the
# queue described by that definition, binds the queue to the exchange, and
# records the result.
#
# For each (worker, definition) pair:
# * the exchange is declared with the definition's name and type and is
#   always durable;
# * the queue is declared with the definition's name, auto_delete and
#   durable flags, then bound to the exchange with the definition's
#   routing key;
# * the value returned by +bind+ is appended to +queues+ and used as the
#   key that maps to the worker in +queue_worker_map+.
#
# Requires +channel+ to be set already (see create_channel).
def create_exchanges_and_queues
  Workers.definitions.each do |worker, definition|
    exchange_spec = definition.exchange
    declared_exchange = channel.exchange(
      exchange_spec[:name],
      type: exchange_spec[:type],
      durable: true
    )

    queue_spec = definition.queue
    declared_queue = channel.queue(
      queue_spec[:name],
      auto_delete: queue_spec[:auto_delete],
      durable: queue_spec[:durable]
    )
    bound_queue = declared_queue.bind(
      declared_exchange,
      routing_key: queue_spec[:routing_key]
    )

    queues << bound_queue
    queue_worker_map[bound_queue] = worker
  end
end
establish_connection()
click to toggle source
Connection
# File lib/roundhousekiq/runner.rb, line 45
# Builds a Bunny session from the class-level connection settings and
# client properties, stores it in +connection+ and starts it.
#
# Returns whatever the session's +start+ call returns.
def establish_connection
  # Client properties travel in a second hash, under the :properties key.
  session_options = { properties: self.class.client_settings }
  session = Bunny.new(self.class.connection_settings, session_options)
  self.connection = session
  session.start
end
process_message(queue, payload)
click to toggle source
# File lib/roundhousekiq/runner.rb, line 99
# Hands a received message over to the worker registered for +queue+.
#
# queue   - key into +queue_worker_map+ identifying the target worker.
# payload - message body; parsed as JSON before being passed on.
#
# Returns the result of the worker's +perform_async+ call. A payload that
# is not valid JSON makes JSON.parse raise; nothing here rescues it.
def process_message(queue, payload)
  worker = queue_worker_map[queue]
  arguments = JSON.parse(payload)
  worker.perform_async(arguments)
end
run()
click to toggle source
# File lib/roundhousekiq/runner.rb, line 16
# Entry point of the runner. Performs the start-up steps in order:
# connect, open a channel, declare exchanges/queues, then subscribe.
#
# The last step (setup_subscribers) loops until shutdown? is truthy, so
# this method does not return before then.
def run
  establish_connection
  create_channel
  create_exchanges_and_queues
  setup_subscribers
end
setup_subscribers()
click to toggle source
# File lib/roundhousekiq/runner.rb, line 86
# Subscribes to every queue in +queues+ with manual acknowledgements and
# then keeps the calling thread parked until shutdown? becomes truthy,
# re-checking every 5 seconds.
#
# Each delivery is acknowledged first and only then passed to
# process_message.
# NOTE(review): because the ack is sent before processing, an error raised
# by process_message happens after the broker has already been told the
# message was handled - confirm this ordering is intended.
def setup_subscribers
  queues.each do |subscribed_queue|
    subscribed_queue.subscribe(manual_ack: true) do |delivery_info, _metadata, payload|
      channel.ack(delivery_info.delivery_tag)
      process_message(subscribed_queue, payload)
    end
  end

  # Condition is tested before the first sleep, like the plain loop form.
  sleep(5) until shutdown?
end
shutdown()
click to toggle source
# File lib/roundhousekiq/runner.rb, line 23
# Requests a stop and closes the broker connection.
#
# Sequence: raise the shutdown flag, wait 10 seconds, close the connection
# from a separate thread, wait another 10 seconds. The closer thread is
# not joined; the second wait is what gives it time to finish.
def shutdown
  # Flag first, then pause so the runner can finish what it is doing.
  self.shutdown_runner = true
  sleep(10)

  # The close is delegated to a fresh thread: this method is expected to
  # run in TRAP context, where the connection cannot be closed directly.
  # +try+ makes the call a no-op when no connection was ever established.
  Thread.new { connection.try(:close) }

  # Pause again while the connection closes.
  sleep(10)
end
shutdown?()
click to toggle source
# File lib/roundhousekiq/runner.rb, line 36
# Tells whether a shutdown has been requested.
#
# Returns the current value of +shutdown_runner+ as-is (it is set to true
# by shutdown; before that it holds whatever it was last assigned, if
# anything).
def shutdown?
  shutdown_runner
end