class Myxi::Session
Attributes
auth_object[RW]
attr_accessor :queue
id[R]
tag[RW]
Public Class Methods
new(event_loop, client_socket)
click to toggle source
Calls superclass method
Myxi::EventableSocket::new
# File lib/myxi/session.rb, line 9 def initialize(event_loop, client_socket) @id = SecureRandom.hex(8) @closure_callbacks = [] @data = {} @handshake = WebSocket::Handshake::Server.new @state = :handshake super @event_loop.sessions << self end
Public Instance Methods
[](name)
click to toggle source
# File lib/myxi/session.rb, line 83 def [](name) @data[name.to_sym] end
[]=(name, value)
click to toggle source
# File lib/myxi/session.rb, line 87 def []=(name, value) Myxi.logger.debug "[#{id}] Stored '#{name}' with '#{value}'" @data[name.to_sym] = value end
close()
click to toggle source
Called when the connection for this session is closed
Calls superclass method
Myxi::EventableSocket#close
# File lib/myxi/session.rb, line 183 def close Myxi.logger.debug "[#{id}] Session closed" @event_loop.sessions.delete(self) @queue.delete if @queue while callback = @closure_callbacks.shift callback.call end super end
handle_r()
click to toggle source
# File lib/myxi/session.rb, line 46 def handle_r case @state when :handshake @handshake << @socket.readpartial(1048576) if @handshake.finished? write(@handshake.to_s) if @handshake.valid? on_connect @state = :established @frame_handler = WebSocket::Frame::Incoming::Server.new(version: @handshake.version) else close_after_write end end when :established @frame_handler << @socket.readpartial(1048576) while frame = @frame_handler.next msg = frame.data json = JSON.parse(msg) rescue nil if json.is_a?(Hash) tag = json['tag'] || nil payload = json['payload'] || {} Myxi.logger.debug "[#{id}] \e[43;37mACTION\e[0m \e[33m#{json}\e[0m" if action = Myxi::Action::ACTIONS[json['action'].to_s.to_sym] action.execute(self, payload) else send_text_data({:event => 'Error', :tag => tag, :payload => {:error => 'InvalidAction'}}.to_json) end else send_text_data({:event => 'Error', :payload => {:error => 'InvalidJSON'}}.to_json) end end end rescue EOFError, Errno::ECONNRESET, IOError close end
on_close(&block)
click to toggle source
Adds a callback to be executed when this session closes
# File lib/myxi/session.rb, line 196 def on_close(&block) @closure_callbacks << block end
on_connect()
click to toggle source
# File lib/myxi/session.rb, line 26 def on_connect Myxi.logger.debug "[#{id}] Connection opened" send_text_data({:event => 'Welcome', :payload => {:id => id}}.to_json) begin @queue = Myxi.channel.queue("", :exclusive => true) rescue NoMethodError # This exception may be raised when something goes very wrong with the RabbitMQ connection # Unfortunately the only practical solution is to restart the client Process.exit(1) end @queue.subscribe do |delivery_info, properties, body| if hash = JSON.parse(body) rescue nil hash['mq'] = {'e' => delivery_info.exchange, 'rk' => delivery_info.routing_key} payload = hash.to_json.force_encoding('UTF-8') Myxi.logger.debug "[#{id}] \e[45;37mEVENT\e[0m \e[35m#{payload}\e[0m (to #{delivery_info.exchange}/#{delivery_info.routing_key})" send_text_data(payload) end end end
send(name, payload = {})
click to toggle source
Send an event back to the client on this session
# File lib/myxi/session.rb, line 102 def send(name, payload = {}) payload = {:event => name, :tag => tag, :payload => payload}.to_json.force_encoding('UTF-8') send_text_data(payload) Myxi.logger.debug "[#{id}] \e[46;37mMESSAGE\e[0m \e[36m#{payload}\e[0m" end
send_text_data(data)
click to toggle source
# File lib/myxi/session.rb, line 200 def send_text_data(data) sender = WebSocket::Frame::Outgoing::Server.new(version: @handshake.version, data: data, type: :text) write(sender.to_s) end
subscribe(exchange_name, routing_key)
click to toggle source
Subscribe this session to receive items for the given exchange & routing key
# File lib/myxi/session.rb, line 111 def subscribe(exchange_name, routing_key) if exchange = Myxi::Exchange::EXCHANGES[exchange_name.to_sym] if exchange.can_subscribe?(routing_key, self.auth_object) subscriptions[exchange_name.to_s] ||= [] if subscriptions[exchange_name.to_s].include?(routing_key.to_s) send('Error', :error => 'AlreadySubscribed', :exchange => exchange_name, :routing_key => routing_key) else @queue.bind(exchange.exchange_name.to_s, :routing_key => routing_key.to_s) subscriptions[exchange_name.to_s] << routing_key.to_s Myxi.logger.debug "[#{id}] \e[42;37mSUBSCRIBED\e[0m \e[32m#{exchange_name} / #{routing_key}\e[0m" send('Subscribed', :exchange => exchange_name, :routing_key => routing_key) end else send('Error', :error => 'SubscriptionDenied', :exchange => exchange_name, :routing_key => routing_key) end else send('Error', :error => 'InvalidExchange', :exchange => exchange_name) end end
subscriptions()
click to toggle source
Keep track of all subscriptions
# File lib/myxi/session.rb, line 95 def subscriptions @subscriptions ||= {} end
touch()
click to toggle source
Called by the server every so often whenever this session is active. This should verify that subscriptions are still valid etc…
# File lib/myxi/session.rb, line 167 def touch subscriptions.each do |exchange_name, routing_keys| if exchange = Myxi::Exchange::EXCHANGES[exchange_name.to_sym] routing_keys.each do |routing_key| unless exchange.can_subscribe?(routing_key, self.auth_object) Myxi.logger.info "[#{id}] Session is not longer allowed to subscibe to #{exchange_name}/#{routing_key}" unsubscribe(exchange_name, routing_key, true) end end end end end
unsubscribe(exchange_name, routing_key, auto = false)
click to toggle source
Unsubscribe this session from the given exchange name and routing key
# File lib/myxi/session.rb, line 134 def unsubscribe(exchange_name, routing_key, auto = false) @queue.unbind(exchange_name.to_s, :routing_key => routing_key.to_s) if subscriptions[exchange_name.to_s] subscriptions[exchange_name.to_s].delete(routing_key.to_s) end Myxi.logger.debug "[#{id}] \e[42;37mUNSUBSCRIBED\e[0m \e[32m#{exchange_name} / #{routing_key}\e[0m" send('Unsubscribed', :exchange_name => exchange_name, :routing_key => routing_key, :auto => auto) end
unsubscribe_all()
click to toggle source
Unsubscribe all
# File lib/myxi/session.rb, line 157 def unsubscribe_all self.subscriptions.keys.each do |exchange_name| self.unsubscribe_all_for_exchange(exchange_name) end end
unsubscribe_all_for_exchange(exchange_name)
click to toggle source
Unscubribe all for an exchange
# File lib/myxi/session.rb, line 146 def unsubscribe_all_for_exchange(exchange_name) if array = self.subscriptions[exchange_name.to_s] array.dup.each do |routing_key| self.unsubscribe(exchange_name.to_s, routing_key) end end end