class FastlyNsq::Manager

Interface for tracking listeners and managing the processing pool.

Constants

DEADLINE

Attributes

done[R]

@return [Boolean] Set true when all listeners are stopped

logger[R]

@return [Logger]

pool[R]

@return [FastlyNsq::PriorityThreadPool]

Public Class Methods

new(logger: FastlyNsq.logger, max_threads: FastlyNsq.max_processing_pool_threads, **pool_options)

Create a FastlyNsq::Manager

@param logger [Logger]

@param max_threads [Integer] Maximum number of threads to be used by {FastlyNsq::PriorityThreadPool}

@param pool_options [Hash] Options forwarded to the {FastlyNsq::PriorityThreadPool} constructor.

# File lib/fastly_nsq/manager.rb, line 23
def initialize(logger: FastlyNsq.logger, max_threads: FastlyNsq.max_processing_pool_threads, **pool_options)
  @done      = false
  @logger    = logger
  @pool      = FastlyNsq::PriorityThreadPool.new(
    { fallback_policy: :caller_runs, max_threads: max_threads }.merge(pool_options),
  )
end
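
The constructor builds the pool options by merging `pool_options` over two defaults, so a caller-supplied `fallback_policy` replaces `:caller_runs`. A minimal sketch of just that merge, assuming a hypothetical helper `pool_settings` (not part of the library) and illustrative option values (`:abort`, `min_threads`) that are not confirmed by this page:

```ruby
# Hypothetical helper reproducing only the option merge from the constructor.
# It does not create a pool; it returns the hash that would be passed to one.
def pool_settings(max_threads:, **pool_options)
  { fallback_policy: :caller_runs, max_threads: max_threads }.merge(pool_options)
end

# No extra options: the two defaults are used as-is.
defaults = pool_settings(max_threads: 5)

# Extra options win over the defaults and unknown keys are passed through.
# :abort and min_threads are illustrative values only.
overrides = pool_settings(max_threads: 5, fallback_policy: :abort, min_threads: 1)
```

Because `max_threads` is a named keyword, it never ends up inside `pool_options`, so it can only be set through its own argument.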

Public Instance Methods

add_listener(listener)

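Add a {FastlyNsq::Listener} to @topic_listeners, keyed by its topic. If a listener is already registered for that topic, a warning is logged and the existing entry is replaced.

@param listener [FastlyNsq::Listener]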

# File lib/fastly_nsq/manager.rb, line 78
def add_listener(listener)
  logger.info { "topic #{listener.topic}, channel #{listener.channel}: listening" }

  if topic_listeners[listener.topic]
    logger.warn { "topic #{listener.topic}: duplicate listener" }
  end

  topic_listeners[listener.topic] = listener
end
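
Since listeners are stored in a hash keyed by topic, a second listener for the same topic overwrites the first after the warning is logged. A minimal sketch of that behavior, using hypothetical stand-ins (`FakeListener`, `ListenerRegistrySketch`) that copy the logic shown above so it runs without the gem:

```ruby
require 'logger'
require 'set'
require 'stringio'

# Hypothetical stand-in for FastlyNsq::Listener; only the fields used here.
FakeListener = Struct.new(:topic, :channel)

# Hypothetical stand-in that copies the add_listener logic shown above.
class ListenerRegistrySketch
  attr_reader :logger

  def initialize(logger:)
    @logger = logger
  end

  def topic_listeners
    @topic_listeners ||= {}
  end

  def listeners
    topic_listeners.values.to_set
  end

  def add_listener(listener)
    logger.info { "topic #{listener.topic}, channel #{listener.channel}: listening" }
    logger.warn { "topic #{listener.topic}: duplicate listener" } if topic_listeners[listener.topic]
    topic_listeners[listener.topic] = listener
  end
end

# Capture log output in memory so the warning can be inspected.
log_output = StringIO.new
manager = ListenerRegistrySketch.new(logger: Logger.new(log_output))

first  = FakeListener.new('orders', 'billing')
second = FakeListener.new('orders', 'audit')

manager.add_listener(first)
manager.add_listener(second)

# Only the most recently added listener for 'orders' remains registered.
remaining = manager.listeners
duplicate_warned = log_output.string.include?('duplicate listener')
```

Note that in the source above the replaced listener is not terminated by `add_listener`; it is simply no longer tracked.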

listeners()

Set of {FastlyNsq::Listener} objects

@return [Set]

# File lib/fastly_nsq/manager.rb, line 48
def listeners
  topic_listeners.values.to_set
end

stop_listeners()

Terminate all listeners

# File lib/fastly_nsq/manager.rb, line 101
def stop_listeners
  logger.info { 'Stopping listeners' }
  listeners.each(&:terminate)
  topic_listeners.clear
end

stopped?()

Manager state; returns the value of done.

@return [Boolean]

# File lib/fastly_nsq/manager.rb, line 71
def stopped?
  done
end

terminate(deadline = DEADLINE)

Stop the manager. Terminates the listeners and stops all processing in the pool.

@param deadline [Integer] Number of seconds to wait for the pool to stop processing

# File lib/fastly_nsq/manager.rb, line 56
def terminate(deadline = DEADLINE)
  return if done

  stop_listeners

  return if pool.shutdown?

  stop_processing(deadline)

  @done = true
end
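
One consequence of the control flow above, as the source is written: when the pool already reports `shutdown?`, `terminate` returns after stopping the listeners but before setting `@done`, so `stopped?` keeps returning false. A minimal sketch of that path, using hypothetical stand-ins (`AlreadyShutdownPool`, `TerminateSketch`) that mirror only the branching, not the real listener or pool work:

```ruby
# Hypothetical stand-in pool that only reports that it was already shut down.
class AlreadyShutdownPool
  def shutdown?
    true
  end
end

# Hypothetical stand-in that mirrors the branching of terminate shown above.
class TerminateSketch
  attr_reader :done, :pool

  def initialize(pool)
    @done = false
    @pool = pool
    @listeners_stopped = false
  end

  def listeners_stopped?
    @listeners_stopped
  end

  def terminate
    return if done

    @listeners_stopped = true # stands in for stop_listeners

    return if pool.shutdown?

    # The real method calls stop_processing(deadline) before this line.
    @done = true
  end
end

manager = TerminateSketch.new(AlreadyShutdownPool.new)
manager.terminate
```

In this sketch the listeners are stopped while `done` remains false, which is worth keeping in mind when polling `stopped?` after a pool has been shut down by other means.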

topic_listeners()

Hash of listeners. Keys are topics, values are {FastlyNsq::Listener} instances.

@return [Hash]

# File lib/fastly_nsq/manager.rb, line 34
def topic_listeners
  @topic_listeners ||= {}
end

topics()

Array of topic names that currently have a listener

@return [Array]

# File lib/fastly_nsq/manager.rb, line 41
def topics
  topic_listeners.keys
end

transfer(new_manager, deadline: DEADLINE)

Transfer listeners to a new manager and stop processing from the existing pool.

@param new_manager [FastlyNsq::Manager] New manager to which listeners will be added

@param deadline [Integer] Number of seconds to wait for the existing pool to stop processing

# File lib/fastly_nsq/manager.rb, line 92
def transfer(new_manager, deadline: DEADLINE)
  new_manager.topic_listeners.merge!(topic_listeners)
  stop_processing(deadline)
  topic_listeners.clear
  @done = true
end
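
Because the hand-off uses `merge!`, a listener from the old manager replaces any listener the new manager already holds for the same topic, and the listeners themselves are moved rather than terminated. A minimal sketch of that bookkeeping, using a hypothetical stand-in (`TransferSketch`) and symbols in place of real listener objects:

```ruby
# Hypothetical stand-in that mirrors only the bookkeeping of transfer.
class TransferSketch
  attr_reader :done

  def initialize
    @done = false
  end

  def topic_listeners
    @topic_listeners ||= {}
  end

  def transfer(new_manager)
    new_manager.topic_listeners.merge!(topic_listeners)
    # The real method calls stop_processing(deadline) here; omitted in this sketch.
    topic_listeners.clear
    @done = true
  end
end

old_manager = TransferSketch.new
new_manager = TransferSketch.new

# Symbols stand in for listener objects.
old_manager.topic_listeners['orders'] = :orders_listener
old_manager.topic_listeners['users']  = :old_users_listener
new_manager.topic_listeners['users']  = :new_users_listener

old_manager.transfer(new_manager)

# The old manager's 'users' listener has replaced the new manager's one.
transferred = new_manager.topic_listeners
```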

Protected Instance Methods

stop_processing(deadline)

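Shut down the pool, wait up to the deadline for in-flight work to finish, and kill the pool if it has not terminated by then.

@param deadline [Integer] Number of seconds to wait for the pool to stop processing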

# File lib/fastly_nsq/manager.rb, line 112
def stop_processing(deadline)
  logger.info { 'Stopping processors' }
  pool.shutdown

  logger.info { 'Waiting for processors to finish...' }
  return if pool.wait_for_termination(deadline)

  logger.info { 'Killing processors...' }
  pool.kill
end
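
The method above follows a three-step pattern: stop accepting work, wait up to a deadline, then force termination. A minimal sketch of that pattern with plain Ruby threads, assuming a hypothetical `SketchPool` that exposes only `shutdown`, `wait_for_termination` and `kill`; it is not the real {FastlyNsq::PriorityThreadPool}:

```ruby
# Hypothetical stand-in pool exposing the three calls used above.
class SketchPool
  def initialize
    @threads = []
    @accepting = true
  end

  # Run the block on a new thread unless the pool has been shut down.
  def post(&block)
    raise 'pool is shut down' unless @accepting

    @threads << Thread.new(&block)
  end

  # Stop accepting new work; running threads are left alone.
  def shutdown
    @accepting = false
  end

  # Returns true if every thread finished within the deadline (in seconds).
  def wait_for_termination(deadline)
    finish_by = Process.clock_gettime(Process::CLOCK_MONOTONIC) + deadline
    @threads.all? do |thread|
      remaining = finish_by - Process.clock_gettime(Process::CLOCK_MONOTONIC)
      thread.join([remaining, 0].max) # nil when the thread is still running
    end
  end

  # Forcefully stop any threads that are still running.
  def kill
    @threads.each(&:kill)
  end
end

# Same control flow as stop_processing, returning which path was taken.
def stop_processing_sketch(pool, deadline)
  pool.shutdown
  return :finished if pool.wait_for_termination(deadline)

  pool.kill
  :killed
end

quick_pool = SketchPool.new
quick_pool.post { sleep 0.01 }
quick_result = stop_processing_sketch(quick_pool, 1)

slow_pool = SketchPool.new
slow_pool.post { sleep 5 }
slow_result = stop_processing_sketch(slow_pool, 0.05)
```

Work that finishes inside the deadline takes the early-return path, while work that outlasts it is killed, so any task that must not be interrupted needs a deadline longer than its worst-case run time.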