class Toro::Listener
Public Class Methods
new(options={})
click to toggle source
# File lib/toro/listener.rb, line 5 def initialize(options={}) defaults = { queues: [Toro.options[:default_queue]] } options.reverse_merge!(defaults) @queues = options[:queues] @fetcher = options[:fetcher] @manager = options[:manager] @is_done = false raise 'No fetcher provided' if @fetcher.blank? raise 'No manager provided' if @manager.blank? @manager.register_actor(:listener, self) end
Public Instance Methods
start()
click to toggle source
# File lib/toro/listener.rb, line 19 def start Toro::Database.with_connection do Toro::Database.raw_connection.async_exec(channels.map { |channel| "LISTEN #{channel}" }.join('; ')) wait_for_notify end end
stop()
click to toggle source
# File lib/toro/listener.rb, line 26 def stop Toro::Database.raw_connection.async_exec(channels.map { |channel| "UNLISTEN #{channel}" }.join('; ')) @is_done = true end
Protected Instance Methods
channels()
click to toggle source
# File lib/toro/listener.rb, line 42 def channels @queues.map { |queue| "toro_#{queue}" } end
wait_for_notify()
click to toggle source
# File lib/toro/listener.rb, line 33 def wait_for_notify loop do Toro::Database.raw_connection.wait_for_notify(Toro.options[:listen_interval]) do |channel, pid, payload| @fetcher.notify end break if @is_done end end