class HonestPubsub::Server::SubscriberServer

Public Class Methods

new(subscribers) click to toggle source
# File lib/honest_pubsub/server/subscriber_server.rb, line 20
def initialize(subscribers)
  @workers = subscribers.map { |subscriber| create_queue_listeners(subscriber) }
end

Public Instance Methods

start() click to toggle source
# File lib/honest_pubsub/server/subscriber_server.rb, line 24
def start
  @workers.each do |worker|
    puts "Starting worker: #{worker.worker_class.name}"
    worker.start
  end

  thread     = Thread.current
  interrupts = ["HUP", "INT", "QUIT", "ABRT", "TERM"]
  interrupts.each do |signal_name|
    Signal.trap(signal_name) {
      puts "Processing #{signal_name}"
      thread.run
    }
  end

  Thread.stop

  ::HonestPubsub::Logger.new.log_service("all_services", :warn, "Starting shutdown of all services")

  @workers.each do |worker|
    ::HonestPubsub::Logger.new.log_service("all_services", :warn, "Tearing down worker: #{worker.worker_class.name}")
    begin
      STDOUT.puts "Tearing down subscriber for #{worker.worker_class.name}"
      worker.shutdown
    rescue => e
      ::HonestPubsub::Logger.new.log_service("all_services", :warn, "#{worker.worker_class.name} - did not tear down correctly.  Error - #{e.message}")
    end
  end
ensure
  HonestPubsub::CLI.instance.remove_pid
end

Private Instance Methods

create_queue_listeners(subscriber) click to toggle source
# File lib/honest_pubsub/server/subscriber_server.rb, line 66
def create_queue_listeners(subscriber)
  routing_key           = subscriber.subscribed_key
  subscribed_queue_name = subscriber.subscribed_queue

  if routing_key.blank?
    raise ArgumentError.new("Routing key must be provided in #{subscriber.name} using `subscribe_to routing_key`")
  end

  if subscribed_queue_name.blank?
    raise ArgumentError.new("Queue Name must be provided in #{subscriber.name} using `subscribe_to routing_key, on: queue_name`")
  end

  STDOUT.puts "Setting up listener for request_key: #{routing_key} and queue:#{subscribed_queue_name}"
  ClientQueueListener.new(subscriber, routing_key, subscribed_queue_name)
end
warn(message) click to toggle source
# File lib/honest_pubsub/server/subscriber_server.rb, line 58
def warn(message)
  old_behavior = ActiveSupport::Deprecation.behavior
  ActiveSupport::Deprecation.behavior = [:stderr, :log]
  ActiveSupport::Deprecation.warn(message)
ensure
  ActiveSupport::Deprecation.behavior = old_behavior
end