class Myxi::Session

Attributes

auth_object[RW]

attr_accessor :queue

id[R]
tag[RW]

Public Class Methods

new(event_loop, client_socket) click to toggle source
Calls superclass method Myxi::EventableSocket::new
# File lib/myxi/session.rb, line 9
# Builds a session for a newly accepted client socket.
#
# The session starts in the :handshake state with a fresh server-side
# WebSocket handshake, an empty per-session data store, an empty list of
# close callbacks and a random 16-character hex id. The zero-argument
# `super` forwards event_loop and client_socket to the superclass
# constructor unchanged; afterwards the session adds itself to the event
# loop's session list.
#
# NOTE(review): @event_loop is not assigned here, so it is presumably set by
# the superclass constructor - confirm against Myxi::EventableSocket.
#
# @param event_loop the event loop that owns this session
# @param client_socket the accepted client socket
def initialize(event_loop, client_socket)
  @state = :handshake
  @handshake = WebSocket::Handshake::Server.new
  @data = {}
  @closure_callbacks = []
  @id = SecureRandom.hex(8)
  super
  @event_loop.sessions << self
end

Public Instance Methods

[](name) click to toggle source
# File lib/myxi/session.rb, line 83
# Reads a value from this session's data store.
#
# @param name [#to_sym] the key; it is converted to a Symbol before lookup
# @return the stored value, or nil when nothing is stored under that key
def [](name)
  key = name.to_sym
  @data[key]
end
[]=(name, value) click to toggle source
# File lib/myxi/session.rb, line 87
# Stores a value in this session's data store and logs the write at debug
# level.
#
# @param name [#to_sym] the key; it is converted to a Symbol before storing
# @param value the value to keep for this session
# @return the stored value
def []=(name, value)
  message = "[#{id}] Stored '#{name}' with '#{value}'"
  Myxi.logger.debug(message)
  @data.store(name.to_sym, value)
end
close() click to toggle source

Called when the connection for this session is closed

Calls superclass method Myxi::EventableSocket#close
# File lib/myxi/session.rb, line 183
# Called when the connection for this session is closed.
#
# Removes the session from the event loop's session list, deletes the
# session's queue when one exists, and runs the registered close callbacks in
# registration order (each callback is removed from the list as it runs).
#
# The superclass `close` runs from an `ensure` so that it still happens when
# deleting the queue or one of the callbacks raises; in that case the
# exception propagates to the caller after the superclass has run.
#
# @return whatever the superclass `close` returns
def close
  begin
    Myxi.logger.debug "[#{id}] Session closed"
    @event_loop.sessions.delete(self)
    @queue.delete if @queue
    while (callback = @closure_callbacks.shift)
      callback.call
    end
  ensure
    # Zero-argument super: must run on both the success and the failure path.
    result = super
  end
  result
end
handle_r() click to toggle source
# File lib/myxi/session.rb, line 46
# Reads pending data from the client socket and processes it according to the
# session state.
#
# :handshake   - feeds the data to the WebSocket handshake. Once the
#                handshake is finished the response is written; a valid
#                handshake triggers on_connect and switches the session to
#                :established, an invalid one closes the socket after the
#                response has been written.
# :established - feeds the data to the frame handler and treats every
#                complete frame as a JSON action message of the form
#                {"action": ..., "tag": ..., "payload": ...}.
#
# The message's tag is stored on the session (self.tag) before the action
# runs, so replies built by #send carry the tag of the request that caused
# them. The tag stays set until the next JSON message replaces it.
#
# EOFError, Errno::ECONNRESET and IOError raised anywhere in the method close
# the session.
def handle_r
  case @state
  when :handshake
    @handshake << @socket.readpartial(1_048_576)
    if @handshake.finished?
      # The response is written for valid and invalid handshakes alike.
      write(@handshake.to_s)
      if @handshake.valid?
        on_connect
        @state = :established
        @frame_handler = WebSocket::Frame::Incoming::Server.new(version: @handshake.version)
      else
        close_after_write
      end
    end
  when :established
    @frame_handler << @socket.readpartial(1_048_576)
    while (frame = @frame_handler.next)
      msg = frame.data
      # Anything that does not parse is reported to the client as InvalidJSON.
      json = JSON.parse(msg) rescue nil
      if json.is_a?(Hash)
        # Assign through the accessor: a bare `tag = ...` would only create a
        # local variable and the session's tag would never change.
        self.tag = json['tag'] || nil
        payload = json['payload'] || {}
        Myxi.logger.debug "[#{id}] \e[43;37mACTION\e[0m \e[33m#{json}\e[0m"
        if (action = Myxi::Action::ACTIONS[json['action'].to_s.to_sym])
          action.execute(self, payload)
        else
          send_text_data({:event => 'Error', :tag => tag, :payload => {:error => 'InvalidAction'}}.to_json)
        end
      else
        send_text_data({:event => 'Error', :payload => {:error => 'InvalidJSON'}}.to_json)
      end
    end
  end
rescue EOFError, Errno::ECONNRESET, IOError
  close
end
on_close(&block) click to toggle source

Adds a callback to be executed when this session closes

# File lib/myxi/session.rb, line 196
# Registers a callback to be executed when this session closes.
#
# Callbacks are appended to the session's callback list, so they are kept in
# registration order.
#
# @yield invoked without arguments when the session is closed
# @return [Array] the session's list of close callbacks
def on_close(&block)
  @closure_callbacks.push(block)
end
on_connect() click to toggle source
# File lib/myxi/session.rb, line 26
# Runs once the WebSocket handshake has succeeded.
#
# Sends the 'Welcome' event containing the session id, creates an exclusive
# queue with a broker-assigned name (empty name string) on Myxi.channel and
# subscribes to it. Every delivery whose body is a JSON object is annotated
# with the exchange ('e') and routing key ('rk') it arrived on, under the
# 'mq' key, and forwarded to the client as a text frame.
#
# Bodies that are not valid JSON, or that parse to something other than a
# JSON object (arrays, strings, numbers), are ignored: they cannot carry the
# 'mq' annotation and would otherwise raise inside the subscriber block.
#
# @return whatever the queue's subscribe call returns
def on_connect
  Myxi.logger.debug "[#{id}] Connection opened"
  send_text_data({:event => 'Welcome', :payload => {:id => id}}.to_json)
  begin
    @queue = Myxi.channel.queue("", :exclusive => true)
  rescue NoMethodError
    # This exception may be raised when something goes very wrong with the RabbitMQ connection
    # Unfortunately the only practical solution is to restart the client
    Process.exit(1)
  end
  @queue.subscribe do |delivery_info, _properties, body|
    hash = JSON.parse(body) rescue nil
    next unless hash.is_a?(Hash)

    hash['mq'] = {'e' => delivery_info.exchange, 'rk' => delivery_info.routing_key}
    payload = hash.to_json.force_encoding('UTF-8')
    Myxi.logger.debug "[#{id}] \e[45;37mEVENT\e[0m \e[35m#{payload}\e[0m (to #{delivery_info.exchange}/#{delivery_info.routing_key})"
    send_text_data(payload)
  end
end
send(name, payload = {}) click to toggle source

 Send an event back to the client on this session

# File lib/myxi/session.rb, line 102
# Sends an event back to the client on this session.
#
# The event is serialised as JSON with the keys event, tag (the session's
# current tag) and payload, forced to UTF-8, written as a text frame and then
# logged at debug level.
#
# Note: this method shadows Ruby's built-in Object#send for session objects;
# use __send__ when dynamic dispatch on a session is needed.
#
# @param name the event name
# @param payload [Hash] the event payload (defaults to an empty hash)
def send(name, payload = {})
  envelope = {:event => name, :tag => tag, :payload => payload}
  encoded = envelope.to_json.force_encoding('UTF-8')
  send_text_data(encoded)
  Myxi.logger.debug "[#{id}] \e[46;37mMESSAGE\e[0m \e[36m#{encoded}\e[0m"
end
send_text_data(data) click to toggle source
# File lib/myxi/session.rb, line 200
# Wraps the given string in an outgoing WebSocket text frame, using the
# protocol version negotiated by this session's handshake, and writes the
# encoded frame via `write`.
#
# @param data [String] the text to send to the client
# @return whatever `write` returns
def send_text_data(data)
  frame_options = { version: @handshake.version, data: data, type: :text }
  frame = WebSocket::Frame::Outgoing::Server.new(**frame_options)
  write(frame.to_s)
end
subscribe(exchange_name, routing_key) click to toggle source

Subscribe this session to receive items for the given exchange & routing key

# File lib/myxi/session.rb, line 111
# Subscribes this session to receive items for the given exchange and
# routing key.
#
# Exactly one event is sent back to the client:
#   'Error' / 'InvalidExchange'     - no exchange is registered under the name
#   'Error' / 'SubscriptionDenied'  - the exchange refuses this routing key
#                                     for the session's auth_object
#   'Error' / 'AlreadySubscribed'   - the routing key is already subscribed
#   'Subscribed'                    - the session's queue was bound
#
# The exchange name is converted with to_s before the lookup so that a missing
# (nil) name is answered with 'InvalidExchange' instead of raising
# NoMethodError.
#
# @param exchange_name [#to_s] name the exchange is registered under
# @param routing_key [#to_s] routing key to bind the session's queue with
# @return whatever #send returns for the event that was sent
def subscribe(exchange_name, routing_key)
  exchange = Myxi::Exchange::EXCHANGES[exchange_name.to_s.to_sym]
  unless exchange
    return send('Error', :error => 'InvalidExchange', :exchange => exchange_name)
  end

  unless exchange.can_subscribe?(routing_key, auth_object)
    return send('Error', :error => 'SubscriptionDenied', :exchange => exchange_name, :routing_key => routing_key)
  end

  # Subscriptions are tracked as strings, keyed by the exchange name string.
  routing_keys = (subscriptions[exchange_name.to_s] ||= [])
  if routing_keys.include?(routing_key.to_s)
    return send('Error', :error => 'AlreadySubscribed', :exchange => exchange_name, :routing_key => routing_key)
  end

  @queue.bind(exchange.exchange_name.to_s, :routing_key => routing_key.to_s)
  routing_keys << routing_key.to_s
  Myxi.logger.debug "[#{id}] \e[42;37mSUBSCRIBED\e[0m \e[32m#{exchange_name} / #{routing_key}\e[0m"
  send('Subscribed', :exchange => exchange_name, :routing_key => routing_key)
end
subscriptions() click to toggle source

 Keep track of all subscriptions

# File lib/myxi/session.rb, line 95
# Returns the hash used to keep track of this session's subscriptions,
# creating it on first use. Other methods in this class store arrays of
# routing key strings in it, keyed by exchange name string.
#
# @return [Hash] the same hash instance on every call
def subscriptions
  @subscriptions = {} unless @subscriptions
  @subscriptions
end
touch() click to toggle source

Called by the server every so often whenever this session is active. This should verify that subscriptions are still valid etc…

# File lib/myxi/session.rb, line 167
# Called by the server every so often whenever this session is active.
#
# Re-checks every tracked subscription against its exchange and automatically
# unsubscribes (auto = true) from each routing key the session's auth_object
# is no longer allowed to subscribe to. Subscriptions whose exchange is no
# longer registered are left untouched.
#
# @return [Hash] the subscriptions hash
def touch
  subscriptions.each do |exchange_name, routing_keys|
    exchange = Myxi::Exchange::EXCHANGES[exchange_name.to_sym]
    next unless exchange

    # Iterate over a snapshot: #unsubscribe deletes the routing key from this
    # very array, and deleting while iterating the live array makes `each`
    # skip the element that follows every removed key.
    routing_keys.dup.each do |routing_key|
      next if exchange.can_subscribe?(routing_key, auth_object)

      Myxi.logger.info "[#{id}] Session is not longer allowed to subscibe to #{exchange_name}/#{routing_key}"
      unsubscribe(exchange_name, routing_key, true)
    end
  end
end
unsubscribe(exchange_name, routing_key, auto = false) click to toggle source

Unsubscribe this session from the given exchange name and routing key

# File lib/myxi/session.rb, line 134
# Unsubscribes this session from the given exchange name and routing key.
#
# Unbinds the session's queue, removes the routing key from the tracked
# subscriptions (when the exchange has any), logs the change and sends an
# 'Unsubscribed' event to the client. No check is made that the subscription
# existed beforehand.
#
# NOTE(review): the queue is unbound using exchange_name as given, whereas
# #subscribe binds using the registered exchange's own exchange_name - confirm
# the two are always the same string.
#
# @param exchange_name [#to_s] the exchange to unbind from
# @param routing_key [#to_s] the routing key to unbind
# @param auto [Boolean] passed through to the client in the event payload
# @return whatever #send returns
def unsubscribe(exchange_name, routing_key, auto = false)
  exchange_key = exchange_name.to_s
  routing = routing_key.to_s

  @queue.unbind(exchange_key, :routing_key => routing)

  tracked = subscriptions[exchange_key]
  tracked.delete(routing) if tracked

  Myxi.logger.debug "[#{id}] \e[42;37mUNSUBSCRIBED\e[0m \e[32m#{exchange_name} / #{routing_key}\e[0m"
  send('Unsubscribed', :exchange_name => exchange_name, :routing_key => routing_key, :auto => auto)
end
unsubscribe_all() click to toggle source

Unsubscribe this session from every routing key on every exchange

# File lib/myxi/session.rb, line 157
# Unsubscribes this session from every routing key on every exchange it is
# currently tracking.
#
# @return [Array] the exchange names that were processed
def unsubscribe_all
  subscriptions.keys.each { |name| unsubscribe_all_for_exchange(name) }
end
unsubscribe_all_for_exchange(exchange_name) click to toggle source

Unsubscribe from all routing keys for an exchange

# File lib/myxi/session.rb, line 146
# Unsubscribes this session from every routing key tracked for the given
# exchange. Does nothing when no subscriptions are tracked for it.
#
# @param exchange_name [#to_s] the exchange whose subscriptions are removed
# @return [Array, nil] the routing keys that were processed, or nil when the
#   exchange has no tracked subscriptions
def unsubscribe_all_for_exchange(exchange_name)
  exchange_key = exchange_name.to_s
  tracked = subscriptions[exchange_key]
  return unless tracked

  # Walk a copy because #unsubscribe removes entries from the tracked array.
  tracked.dup.each { |routing_key| unsubscribe(exchange_key, routing_key) }
end