class FastlyNsq::Manager
Interface for tracking listeners and managing the processing pool.
Constants
- DEADLINE
Attributes
- done: @return [Boolean] Set true when all listeners are stopped
- logger: @return [Logger]
- pool: @return [FastlyNsq::PriorityThreadPool]
Public Class Methods
Create a FastlyNsq::Manager
@param logger [Logger]
@param max_threads [Integer] Maximum number of threads to be used by {FastlyNsq::PriorityThreadPool}
@param pool_options [Hash] Options forwarded to the {FastlyNsq::PriorityThreadPool} constructor. These are merged over the defaults, so they can override them.
# File lib/fastly_nsq/manager.rb, line 23
def initialize(logger: FastlyNsq.logger, max_threads: FastlyNsq.max_processing_pool_threads, **pool_options)
  @done = false
  @logger = logger
  @pool = FastlyNsq::PriorityThreadPool.new(
    { fallback_policy: :caller_runs, max_threads: max_threads }.merge(pool_options),
  )
end
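The constructor merges `pool_options` over its defaults, so a caller-supplied key replaces the default of the same name. The following is a minimal sketch of that merge in plain Ruby. `build_pool_options` is a hypothetical helper written for illustration, not part of FastlyNsq, and the option values passed to it are illustrative only.

```ruby
# Hypothetical helper that reproduces only the hash-merging step of
# Manager#initialize. It does not create a thread pool.
def build_pool_options(max_threads:, **pool_options)
  { fallback_policy: :caller_runs, max_threads: max_threads }.merge(pool_options)
end

# With no extra options, the defaults are used as-is.
p build_pool_options(max_threads: 5)

# Extra options are added, and a repeated key replaces the default.
p build_pool_options(max_threads: 5, fallback_policy: :abort, min_threads: 1)
```

Because the merge runs after the defaults are built, anything passed in `pool_options` takes precedence over `fallback_policy: :caller_runs`.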
Public Instance Methods
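Add a {FastlyNsq::Listener} to @topic_listeners. If a listener is already registered for the same topic, a warning is logged and the existing listener is replaced.
@param listener [FastlyNsq::Listener]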
# File lib/fastly_nsq/manager.rb, line 78
def add_listener(listener)
  logger.info { "topic #{listener.topic}, channel #{listener.channel}: listening" }
  if topic_listeners[listener.topic]
    logger.warn { "topic #{listener.topic}: duplicate listener" }
  end
  topic_listeners[listener.topic] = listener
end
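Since listeners are stored in a hash keyed by topic, adding a second listener for the same topic replaces the first. The sketch below models that with stand-ins: `FakeListener` and `ListenerRegistry` are hypothetical names, not FastlyNsq classes, and the return value of `add_listener` here is an addition for illustration (the real method logs a warning instead).

```ruby
# Hypothetical stand-in for FastlyNsq::Listener; only topic and channel are modeled.
FakeListener = Struct.new(:topic, :channel)

# Minimal registry that stores one listener per topic, as the manager does.
class ListenerRegistry
  def topic_listeners
    @topic_listeners ||= {}
  end

  # Stores the listener under its topic.
  # Returns true when an existing listener for that topic was replaced.
  def add_listener(listener)
    replaced = topic_listeners.key?(listener.topic)
    topic_listeners[listener.topic] = listener
    replaced
  end

  def topics
    topic_listeners.keys
  end
end

registry = ListenerRegistry.new
registry.add_listener(FakeListener.new("orders", "billing"))
registry.add_listener(FakeListener.new("orders", "audit")) # replaces the first
puts registry.topic_listeners["orders"].channel
```

Only one listener per topic survives, regardless of channel.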
Set of {FastlyNsq::Listener} objects
@return [Set]
# File lib/fastly_nsq/manager.rb, line 48
def listeners
  topic_listeners.values.to_set
end
Terminate all listeners
# File lib/fastly_nsq/manager.rb, line 101
def stop_listeners
  logger.info { 'Stopping listeners' }
  listeners.each(&:terminate)
  topic_listeners.clear
end
Manager state. True once {#terminate} or {#transfer} has completed.
@return [Boolean]
# File lib/fastly_nsq/manager.rb, line 71
def stopped?
  done
end
Stop the manager. Terminates the listeners and stops all processing in the pool.
@param deadline [Integer] Number of seconds to wait for pool to stop processing
# File lib/fastly_nsq/manager.rb, line 56
def terminate(deadline = DEADLINE)
  return if done
  stop_listeners
  return if pool.shutdown?
  stop_processing(deadline)
  @done = true
end
Hash of listeners. Keys are topics, values are {FastlyNsq::Listener} instances.
@return [Hash]
# File lib/fastly_nsq/manager.rb, line 34
def topic_listeners
  @topic_listeners ||= {}
end
Array of listening topic names
@return [Array]
# File lib/fastly_nsq/manager.rb, line 41
def topics
  topic_listeners.keys
end
Transfer listeners to a new manager and stop processing from the existing pool.
@param new_manager [FastlyNsq::Manager] New manager to which listeners will be added
@param deadline [Integer] Number of seconds to wait for the existing pool to stop processing
# File lib/fastly_nsq/manager.rb, line 92
def transfer(new_manager, deadline: DEADLINE)
  new_manager.topic_listeners.merge!(topic_listeners)
  stop_processing(deadline)
  topic_listeners.clear
  @done = true
end
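The transfer relies on `Hash#merge!`, which means a listener from the old manager replaces any listener the new manager already holds for the same topic. The sketch below shows only that hash behavior, using plain hashes and symbols in place of managers and listeners. `transfer_listeners` is a hypothetical helper, not part of FastlyNsq, and it leaves out the pool shutdown step.

```ruby
# Hypothetical helper modeling only the listener bookkeeping of Manager#transfer.
# `from` and `to` stand in for each manager's topic_listeners hash.
def transfer_listeners(from:, to:)
  to.merge!(from) # entries in `from` win when both hashes share a topic
  from.clear      # the old manager keeps no listeners afterwards
  to
end

old_listeners = { "orders" => :old_orders, "emails" => :old_emails }
new_listeners = { "orders" => :new_orders, "metrics" => :new_metrics }

transfer_listeners(from: old_listeners, to: new_listeners)
p new_listeners
p old_listeners
```

After the call, the "orders" entry in the new hash is the one that came from the old hash, and the old hash is empty.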
Protected Instance Methods
Shut down the pool
@param deadline [Integer] Number of seconds to wait for pool to stop processing
# File lib/fastly_nsq/manager.rb, line 112
def stop_processing(deadline)
  logger.info { 'Stopping processors' }
  pool.shutdown
  logger.info { 'Waiting for processors to finish...' }
  return if pool.wait_for_termination(deadline)
  logger.info { 'Killing processors...' }
  pool.kill
end
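The shutdown sequence escalates: request a graceful shutdown, wait up to the deadline, and kill the pool only if the wait times out. The sketch below traces that control flow against a recording stand-in. `RecordingPool` is a hypothetical class, not FastlyNsq::PriorityThreadPool, and the deadline of 5 seconds is an arbitrary value, not the value of DEADLINE.

```ruby
# Hypothetical stand-in pool that records which methods were called.
# `finishes_in_time` decides what wait_for_termination returns.
class RecordingPool
  attr_reader :calls

  def initialize(finishes_in_time:)
    @finishes_in_time = finishes_in_time
    @calls = []
  end

  def shutdown
    @calls << :shutdown
  end

  def wait_for_termination(deadline)
    @calls << [:wait_for_termination, deadline]
    @finishes_in_time
  end

  def kill
    @calls << :kill
  end
end

# Same control flow as stop_processing above, without logging,
# and with the pool passed in explicitly.
def stop_processing(pool, deadline)
  pool.shutdown
  return if pool.wait_for_termination(deadline)

  pool.kill
end

graceful = RecordingPool.new(finishes_in_time: true)
stop_processing(graceful, 5)
p graceful.calls

forced = RecordingPool.new(finishes_in_time: false)
stop_processing(forced, 5)
p forced.calls
```

In the graceful case `kill` is never reached. It appears in the recorded calls only when the wait reports that the pool did not finish within the deadline.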