class PushyDaemon::Proxy

Attributes

table[RW]

Class options

Public Class Methods

new() click to toggle source
# File lib/pushyd/proxy.rb, line 17
# Build the proxy and run it.
#
# Prepares the rules summary table and the logger, connects to the broker
# configured under Conf[:broker], creates the shouter and the consumers,
# prints the summary table, then hands control to the shouter loop.
#
# Any error raised along the way is logged and the process is terminated
# with `abort`.
def initialize
  # Nothing has been created yet
  @shouter = nil
  @consumers = []

  # ASCII table summarizing how each rule got bound
  @table = Terminal::Table.new
  @table.title = "Rules summary"
  @table.headings = [
    "rule", "topic", "> queue", "> relay", "routing key", "bind status"
  ]
  @table.align_column(5, :right)

  # Logger shared with the broker connection
  @logger = BmcDaemonLib::LoggerPool.instance.get

  # Broker connection first, then the shouter, then one consumer per rule
  connect_to(BmcDaemonLib::Conf[:broker])
  create_shouter
  create_consumers

  # Publish the summary both to the logs and to stdout
  log_info("Proxy initialized", @table.to_s)
  puts(@table.to_s)

  # Hand control over to the shouter
  @shouter.start_loop
rescue BmcDaemonLib::MqConsumerException => error
  log_error("Proxy exception: #{error.message}")
  abort("EXITING #{error.class}: #{error.message}")
rescue ShouterInterrupted, ProxyConnectionError, Errno::EACCES => error
  log_error("Proxy error: #{error.message}")
  abort("EXITING #{error.class}: #{error.message}")
rescue StandardError => error
  # Anything unexpected: include the backtrace in both the log and the exit message
  log_error("Proxy unexpected: #{error.message}", error.backtrace)
  abort("EXITING #{error.class}: #{error.message} \n #{error.backtrace.to_yaml}")
end

Protected Instance Methods

connect_to(busconf) click to toggle source

Start connection to RabbitMQ

# File lib/pushyd/proxy.rb, line 147
# Open and start the Bunny connection to the broker, storing it in @conn.
#
# @param busconf [#to_s] broker address; it is passed to Bunny as a string
# @return [nil] the started session is only exposed through @conn
# @raise [ProxyConnectionError] when busconf is missing, or when building or
#   starting the connection fails for any reason
def connect_to(busconf)
  fail ProxyConnectionError, "connect_to/busconf" unless busconf

  log_info "connect_to: connecting to broker", {
    broker: busconf,
    recover: AMQP_RECOVERY_INTERVAL,
    heartbeat: AMQP_HEARTBEAT_INTERVAL,
    prefetch: AMQP_PREFETCH
  }

  @conn = Bunny.new busconf.to_s,
    logger: @logger,
    # heartbeat: :server,
    automatically_recover: true,
    network_recovery_interval: AMQP_RECOVERY_INTERVAL,
    heartbeat_interval: AMQP_HEARTBEAT_INTERVAL,
    read_write_timeout: AMQP_HEARTBEAT_INTERVAL * 2

  # Start the connection
  @conn.start

  # The return value stays nil, as it always was; callers use @conn.
  nil
rescue ProxyConnectionError
  # Already the error type this method promises (e.g. the missing-busconf
  # guard above): let it through untouched instead of having the generic
  # handler below re-wrap it and bury the original message.
  # NOTE(review): assumes ProxyConnectionError derives from StandardError.
  raise
rescue Bunny::TCPConnectionFailedForAllHosts, Bunny::AuthenticationFailureError, AMQ::Protocol::EmptyResponseError => e
  fail ProxyConnectionError, "error connecting (#{e.class})"
rescue StandardError => e
  fail ProxyConnectionError, "unknown (#{e.inspect})"
end
consumer_cancelled(all={}) click to toggle source
# File lib/pushyd/proxy.rb, line 142
# Log that a consumer was cancelled remotely.
#
# @param all [Object] whatever details were supplied with the cancellation;
#   only its `inspect` output is used
def consumer_cancelled(all = {})
  details = all.inspect
  log_error "consumer_cancelled remotely: #{details}"
end
create_consumer(rule) click to toggle source

Subscribe to the relevant topic/key and bind a listener

# File lib/pushyd/proxy.rb, line 95
# Create a consumer for one rule, subscribe it to its own queue and bind that
# queue to the rule's topic once per routing key. One row per key is added to
# the @table summary with the outcome of the binding.
#
# @param rule [Hash] rule settings; reads :name, :topic, :keys (an Array, or
#   a comma-separated String) and :relay
# @return [Consumer] the consumer that was created
# @raise [BmcDaemonLib::MqConsumerError] when the rule has no topic or no
#   usable routing key
def create_consumer(rule)
  # Check information
  rule_name  = rule[:name].to_s
  rule_topic = rule[:topic].to_s
  rule_queue = "#{BmcDaemonLib::Conf.app_name}-#{rule_name.tr('_', '-')}"

  # Extract routing keys
  rule_keys =
    if rule[:keys].is_a?(Array)
      rule[:keys].map(&:to_s)
    else
      rule[:keys].to_s.split(',').map(&:strip)
    end

  # Blank entries ("a,,b", " ", [nil]) are not routing keys: drop them so they
  # neither satisfy the "at least one key" check nor get bound.
  rule_keys = rule_keys.reject(&:empty?)

  # Check we have a topic and at least one routing key.
  # rule_topic is always a String here, so it must be tested for emptiness.
  fail BmcDaemonLib::MqConsumerError, "rule [#{rule_name}]: missing topic" if rule_topic.empty?
  fail BmcDaemonLib::MqConsumerError, "rule [#{rule_name}]: missing keys" if rule_keys.empty?

  # Build a new consumer on its own channel
  channel = @conn.create_channel
  consumer = Consumer.new(channel, rule_name, rule)

  # Subscribe to its own queue
  consumer.subscribe_to_queue rule_queue, "rule:#{rule_name}"

  # Bind each key to exchange; a failing key is reported in the table and the
  # log, and does not prevent the remaining keys from being bound.
  rule_keys.each do |key|
    begin
      q = consumer.listen_to rule_topic, key
    rescue BmcDaemonLib::MqConsumerTopicNotFound,
           BmcDaemonLib::MqConsumerException => e
      status = "FAILED: #{e.class} \n#{e.message}"
      log_error "create_consumer [#{e.class}] #{e.message}"
    rescue StandardError => e
      status = "FAILED: #{e.message}"
      log_error "create_consumer: #{e.message}"
    else
      status = "QUEUE #{q.object_id}"
    end

    # Add row to config table
    @table.add_row [rule_name, rule_topic, rule_queue, rule[:relay].to_s, key, status]
  end

  # Return consumer
  consumer
end
create_consumers() click to toggle source
# File lib/pushyd/proxy.rb, line 78
# Create one consumer per rule found under the :rules configuration key and
# collect them in @consumers. Logs an error and does nothing else when the
# configuration holds no rules Hash.
def create_consumers
  rules = BmcDaemonLib::Conf[:rules]

  # Anything that is not a Hash (nil included) means there is nothing to set up
  unless rules.is_a?(Hash)
    log_error "create_consumers: no rules"
    return
  end

  rule_names = rules.keys.join(', ')
  log_info "create_consumers: #{rule_names}"

  # Each rule learns its own name before being turned into a consumer
  rules.each_pair do |rule_name, rule_conf|
    rule_conf[:name] = rule_name
    @consumers << create_consumer(rule_conf)
  end
end
create_shouter() click to toggle source
# File lib/pushyd/proxy.rb, line 70
# Create the shouter on a channel of its own, configured from Conf[:shout],
# and keep it in @shouter.
def create_shouter
  @shouter = Shouter.new(@conn.create_channel, BmcDaemonLib::Conf[:shout])
end
log_context() click to toggle source
# File lib/pushyd/proxy.rb, line 64
# Context hash identifying this object as the proxy.
# NOTE(review): presumably consumed by the logging helpers — confirm.
def log_context
  { me: :proxy }
end