class SimplePubSub::Broker

Public Class Methods

start(host: '0.0.0.0', port: 59000, brokers: [], debug: false) click to toggle source
# File lib/simplepubsub.rb, line 17
def self.start(host: '0.0.0.0', port: 59000, brokers: [], debug: false)

        
  EM.run do

    subscribers = {}

    WebSocket::EventMachine::Server.start(host: host, port: port) do |ws|

      ws.onopen do
        #puts "Client connected"
      end

      ws.onmessage do |msg, type|
        
        puts 'msg: ' + msg.inspect if debug
        next if msg.empty?
        
        msg = '' if not msg[0][/[\w:]/]
        a = msg.lstrip.split(/\s*:\s/,2)

        def ws.subscriber?() 
          false
        end
        
       
        if a.first == 'subscribe to topic' then

          topic = a.last.rstrip.gsub('+','*')\
                                          .gsub('#','*//').gsub(/\bor/,'|')
          subscribers[topic] ||= []
          subscribers[topic] << ws

          # affix the topic to the subscriber's websocket
          def ws.subscriber_topic=(topic)  @topic = topic     end
          def ws.subscriber_topic()  @topic                   end

          ws.subscriber_topic = topic

        elsif a.first == 'shutdown' then
          
          puts 'shutting down ...'
          EM.stop
          
        elsif a.length > 1 and a.first != ''

          current_topic, message = a
          
          # is the message from another SPS broker?
          
          if current_topic[0] == ':' then
            
            # strip off the broker ID
            current_topic.sub!(/:\w+\//,'')
            
          elsif brokers.any?                                
            
            brokers.each do |broker|
            
              hostx, portx = broker.split(':',2)
              portx ||= port

              
              #puts 'address: ' + address.inspect
              fqm = ":%s/%s: %s" % [Socket.gethostname, current_topic, message]

              begin
                SPSPub.notice fqm, host: hostx, port: portx
              rescue
                puts "warning couldn\'t send to %s:%s" % [hostx, portx]
              end
              #sleep 0.5
            end
          end
          
          if not current_topic[0] == '/' and \
                            not current_topic =~ /[^a-zA-Z0-9\/_ ]$/ then                
            begin
              
              reg = XMLRegistry.new
              reg[current_topic] = message
            
            rescue
              puts 'simplepubsub.rb warning: ' + ($!).inspect
            end                

            subscribers.each do |topic,conns|
              
              xpath = topic.split('/')\
                  .map {|x| x.to_i.to_s == x ? x.prepend('x') : x}\
                  .join('/')

              node = reg.doc.root.xpath xpath.sub(/\S\b$/,'\0/text()')

              if node.any? then                  
                conns.each {|x| x.send current_topic + ': ' + message}
              end

            end
            
            reg = nil
            
          end

        end

      end

      ws.onclose do

        if ws.respond_to? :subscriber_topic then
          subscribers[ws.subscriber_topic].delete ws 
        end
      end
    end

  end
end