class PushyDaemon::Proxy
Attributes
table[RW]
Class options
Public Class Methods
new()
click to toggle source
# File lib/pushyd/proxy.rb, line 17
# Build the proxy and run it.
#
# Steps, in order: reset internal state, prepare the "Rules summary" ASCII
# table, fetch a logger, connect to the broker configured under
# BmcDaemonLib::Conf[:broker], create the shouter, create the consumers,
# log and print the table, then start the shouter loop.
#
# Any StandardError raised along the way is logged through log_error and the
# process is terminated with Kernel#abort.
def initialize
  # Nothing exists yet: no shouter, no consumers
  @shouter = nil
  @consumers = []

  # ASCII table filled with one row per rule/key while consumers are created
  @table = Terminal::Table.new
  @table.title = "Rules summary"
  @table.headings = [
    "rule",
    "topic",
    "> queue",
    "> relay",
    "routing key",
    "bind status"
  ]
  # Column index 5 is the last heading ("bind status")
  @table.align_column(5, :right)

  # Logger instance, also handed to the broker connection
  @logger = BmcDaemonLib::LoggerPool.instance.get

  # Broker connection first, then everything that needs a channel on it
  connect_to(BmcDaemonLib::Conf[:broker])
  create_shouter
  create_consumers

  # Report the resulting configuration both to the logs and to stdout
  log_info("Proxy initialized", @table.to_s)
  puts(@table.to_s)

  # Hand control over to the shouter loop
  @shouter.start_loop
rescue BmcDaemonLib::MqConsumerException => error
  log_error("Proxy exception: #{error.message}")
  abort("EXITING #{error.class}: #{error.message}")
rescue ShouterInterrupted, ProxyConnectionError, Errno::EACCES => error
  log_error("Proxy error: #{error.message}")
  abort("EXITING #{error.class}: #{error.message}")
rescue StandardError => error
  # Unexpected failure: include the backtrace in both the log and the exit message
  log_error("Proxy unexpected: #{error.message}", error.backtrace)
  abort("EXITING #{error.class}: #{error.message} \n #{error.backtrace.to_yaml}")
end
Protected Instance Methods
connect_to(busconf)
click to toggle source
Start the connection to RabbitMQ
# File lib/pushyd/proxy.rb, line 147
# Open the connection to the RabbitMQ broker and keep it in @conn.
#
# busconf - broker configuration; it is logged as-is and passed to Bunny.new
#           after conversion with #to_s.
#
# Returns nil.
# Raises ProxyConnectionError when busconf is missing, or when creating or
# starting the connection fails for any reason.
def connect_to busconf
  fail ProxyConnectionError, "connect_to/busconf" unless busconf

  log_info "connect_to: connecting to broker", {
    broker: busconf,
    recover: AMQP_RECOVERY_INTERVAL,
    heartbeat: AMQP_HEARTBEAT_INTERVAL,
    prefetch: AMQP_PREFETCH
  }

  @conn = Bunny.new(
    busconf.to_s,
    logger: @logger,
    automatically_recover: true,
    network_recovery_interval: AMQP_RECOVERY_INTERVAL,
    heartbeat_interval: AMQP_HEARTBEAT_INTERVAL,
    # Read/write timeout is twice the heartbeat interval
    read_write_timeout: AMQP_HEARTBEAT_INTERVAL * 2
  )

  # Start the connection
  @conn.start

  # Callers get nothing back; the connection lives in @conn
  nil
rescue ProxyConnectionError
  # Raised by the guard at the top of this method: let it through untouched,
  # otherwise the generic clause below would re-wrap it and hide the real
  # message behind "unknow (...)".
  raise
rescue Bunny::TCPConnectionFailedForAllHosts, Bunny::AuthenticationFailureError, AMQ::Protocol::EmptyResponseError => e
  fail ProxyConnectionError, "error connecting (#{e.class})"
rescue StandardError => e
  fail ProxyConnectionError, "unknow (#{e.inspect})"
end
consumer_cancelled(all={})
click to toggle source
# File lib/pushyd/proxy.rb, line 142
# Log an error reporting that a consumer was cancelled remotely.
#
# all - any object (defaults to an empty Hash); its #inspect output is
#       appended to the log message.
def consumer_cancelled all={}
  details = all.inspect
  log_error "consumer_cancelled remotely: #{details}"
end
create_consumer(rule)
click to toggle source
Subscribe to the topic/routing keys of interest and bind a listener
# File lib/pushyd/proxy.rb, line 95
# Create one consumer for a rule, subscribe it to its own queue and bind
# every routing key of the rule to the rule's topic.
#
# rule - Hash-like rule definition read through :name, :topic, :keys and
#        :relay. :keys may be an Array or a comma-separated String.
#
# One row per routing key is appended to @table with the binding status.
# A failure while binding a key is logged and reported in that row; it does
# not stop the remaining keys from being processed.
#
# Returns the consumer.
# Raises BmcDaemonLib::MqConsumerError when the rule has no topic or no keys.
def create_consumer rule
  rule_name = rule[:name].to_s
  rule_topic = rule[:topic].to_s

  # Routing keys: either already a list, or a comma-separated string
  rule_keys =
    if rule[:keys].is_a? Array
      rule[:keys].map(&:to_s)
    else
      rule[:keys].to_s.split(',').map(&:strip)
    end

  # rule_topic is always a String at this point, so it must be tested for
  # emptiness: a truthiness test can never detect a missing topic.
  fail BmcDaemonLib::MqConsumerError, "rule [#{rule_name}]: missing topic" if rule_topic.empty?
  fail BmcDaemonLib::MqConsumerError, "rule [#{rule_name}]: missing keys" if rule_keys.empty?

  # Queue name: "<app name>-<rule name>", underscores turned into dashes
  rule_queue = format('%s-%s', BmcDaemonLib::Conf.app_name, rule_name.tr('_', '-'))

  # Build a new consumer on its own channel
  channel = @conn.create_channel
  consumer = Consumer.new(channel, rule_name, rule)

  # Subscribe to its own queue
  consumer.subscribe_to_queue rule_queue, "rule:#{rule_name}"

  # Bind each key to the exchange
  rule_keys.each do |key|
    begin
      q = consumer.listen_to rule_topic, key
    rescue BmcDaemonLib::MqConsumerTopicNotFound, BmcDaemonLib::MqConsumerException => e
      status = "FAILED: #{e.class} \n#{e.message}"
      log_error "create_consumer [#{e.class}] #{e.message}"
    rescue StandardError => e
      status = "FAILED: #{e.message}"
      log_error "create_consumer: #{e.message}"
    else
      status = "QUEUE #{q.object_id}"
    end

    # Add row to config table
    @table.add_row [rule_name, rule_topic, rule_queue, rule[:relay].to_s, key, status]
  end

  consumer
end
create_consumers()
click to toggle source
# File lib/pushyd/proxy.rb, line 78
# Create one consumer per rule found under BmcDaemonLib::Conf[:rules] and
# collect them in @consumers.
#
# When the rules entry is not a Hash (nil included), an error is logged and
# nothing is created. Each rule is tagged with its own name under :name
# before being handed to create_consumer.
def create_consumers
  rules = BmcDaemonLib::Conf[:rules]

  # nil is not a Hash either, so a single type test covers both cases
  unless rules.is_a?(Hash)
    log_error "create_consumers: no rules"
    return
  end

  log_info "create_consumers: #{rules.keys.join(', ')}"

  # One consumer per rule
  rules.each_pair do |name, rule|
    rule[:name] = name
    @consumers << create_consumer(rule)
  end
end
create_shouter()
click to toggle source
# File lib/pushyd/proxy.rb, line 70
# Create the shouter on a channel of its own and keep it in @shouter.
#
# The shouter receives a fresh channel from @conn and the configuration
# stored under BmcDaemonLib::Conf[:shout].
def create_shouter
  shouter_channel = @conn.create_channel
  shout_config = BmcDaemonLib::Conf[:shout]

  @shouter = Shouter.new(shouter_channel, shout_config)
end
log_context()
click to toggle source
# File lib/pushyd/proxy.rb, line 64
# Returns a new Hash identifying this component: { me: :proxy }.
# NOTE(review): presumably consumed by the logging helpers (log_info /
# log_error) to tag messages; confirm against their definition.
def log_context
  context = {}
  context[:me] = :proxy
  context
end