class Vines::Cluster::Sessions

Manages the cluster node list and user session routing table stored in Redis. All cluster nodes share this in-memory database to quickly discover the node hosting a particular user session. Once a session is located, stanzas can be routed to that node via the Publisher.
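
The key layout used by the methods on this page can be pictured with plain Ruby collections. This is only an in-memory sketch: the hashes and sets stand in for Redis structures, the JIDs and node IDs are made up, `hosting_node` is a hypothetical helper, and the `'NODES'` key is a placeholder because the value of the NODES constant is not shown here.

```ruby
require 'json'
require 'set'

# In-memory stand-ins for the Redis structures; every value is illustrative.
SAMPLE_STORE = {
  # One hash per bare JID: resource => JSON-encoded session attributes.
  'sessions:alice@wonderland.lit' => {
    'tea'     => {node: 'node-a'}.to_json,
    'croquet' => {node: 'node-b'}.to_json
  },
  # One set per node ID: the full JIDs hosted on that node.
  'cluster:nodes:node-a' => Set['alice@wonderland.lit/tea'],
  'cluster:nodes:node-b' => Set['alice@wonderland.lit/croquet'],
  # Placeholder for the NODES hash: node ID => last heartbeat (epoch seconds).
  'NODES' => {'node-a' => 1_700_000_000, 'node-b' => 1_700_000_001}
}

# Hypothetical helper: find the node hosting one connected stream.
def hosting_node(store, bare_jid, resource)
  json = store.fetch("sessions:#{bare_jid}", {})[resource]
  json && JSON.parse(json)['node']
end

hosting_node(SAMPLE_STORE, 'alice@wonderland.lit', 'tea')
```

Keeping a per-node set of full JIDs alongside the per-user hashes is what lets a whole node's sessions be removed without scanning every user.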

Constants

NODES

Public Class Methods

new(cluster)
# File lib/vines/cluster/sessions.rb, line 14
def initialize(cluster)
  @cluster, @nodes = cluster, {}
end

Public Instance Methods

delete(jid)

Remove this user from the cluster routing table so that no further stanzas may be routed to them. This must be called when the user's session is terminated, either by logout or stream disconnect.

# File lib/vines/cluster/sessions.rb, line 43
def delete(jid)
  jid = JID.new(jid)
  redis.hget("sessions:#{jid.bare}", jid.resource) do |response|
    if doc = JSON.parse(response) rescue nil
      redis.multi
      redis.hdel("sessions:#{jid.bare}", jid.resource)
      redis.srem("cluster:nodes:#{doc['node']}", jid.to_s)
      redis.exec
    end
  end
end
delete_all(node)

Remove all user sessions associated with the given node ID from the routing table. Cluster nodes call this themselves during normal shutdown. However, if a node dies without being properly shut down, the other nodes will clean up its sessions when they detect that the node is offline.

# File lib/vines/cluster/sessions.rb, line 59
def delete_all(node)
  @nodes.delete(node)
  redis.smembers("cluster:nodes:#{node}") do |jids|
    redis.multi
    redis.del("cluster:nodes:#{node}")
    redis.hdel(NODES, node)
    jids.each do |jid|
      jid = JID.new(jid)
      redis.hdel("sessions:#{jid.bare}", jid.resource)
    end
    redis.exec
  end
end
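
The cleanup above can be sketched against an in-memory store to show which keys are touched. This is an illustration only: `delete_all_sketch` is a hypothetical name, `store` is a plain Hash standing in for Redis, and the `'NODES'` key is a placeholder for the value of the NODES constant.

```ruby
require 'set'

# Hypothetical in-memory version of the cleanup; `store` mimics the Redis keys.
def delete_all_sketch(store, node)
  # Take the node's set of hosted full JIDs and drop the set itself.
  jids = store.delete("cluster:nodes:#{node}") || Set.new
  # Forget the node's heartbeat entry.
  (store['NODES'] || {}).delete(node)
  jids.each do |jid|
    # A bare JID never contains '/', so the first '/' starts the resource.
    bare, resource = jid.split('/', 2)
    sessions = store["sessions:#{bare}"]
    next unless sessions
    sessions.delete(resource)
    # Redis removes a hash key once its last field is deleted; mirror that.
    store.delete("sessions:#{bare}") if sessions.empty?
  end
  store
end
```

Sessions that the same user holds on other nodes are left in place, since only the resources listed in the dead node's set are removed.
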
expire()

Cluster nodes broadcast a heartbeat to other members every second. If we haven't heard from a node in five seconds, assume it's offline and clean up its session cache for it. Nodes may die abruptly, without a chance to clear their sessions, so other members clean up for them.

# File lib/vines/cluster/sessions.rb, line 77
def expire
  redis.hset(NODES, @cluster.id, Time.now.to_i)
  redis.hgetall(NODES) do |response|
    now = Time.now
    expired = Hash[*response].select do |node, active|
      offset = @nodes[node] || 0
      (now - offset) - Time.at(active.to_i) > 5
    end.keys
    expired.each {|node| delete_all(node) }
  end
end
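
The expiry test can be worked through with plain integers. The sketch below assumes a hypothetical `expired_nodes` helper and made-up timestamps; it mirrors the comparison in the source, including the fallback to an offset of 0 for a node that has not yet been poked.

```ruby
# heartbeats: node ID => last heartbeat (epoch seconds on the node's own clock;
#             strings, as a Redis reply would deliver them)
# offsets:    node ID => local clock minus node clock (missing means 0)
def expired_nodes(heartbeats, offsets, now, timeout = 5)
  heartbeats.select do |node, active|
    # Shift our clock into the node's clock before measuring the silence.
    (now - offsets.fetch(node, 0)) - active.to_i > timeout
  end.keys
end

heartbeats = {'node-a' => '998', 'node-b' => '990', 'node-c' => '899'}
offsets    = {'node-c' => 100}   # node-c's clock runs 100 seconds behind ours

expired_nodes(heartbeats, offsets, 1000)   # => ["node-b"]
```

Here node-b has been silent for 10 seconds and is expired. node-c's raw timestamp looks 101 seconds old, but after applying its offset the silence is only 1 second, so it survives.
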
find(*jids)

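Return the sessions for these JIDs. If a bare JID is used, all sessions for that user will be returned. If a full JID is used, the session for that single connected stream is returned. Only remote sessions are returned: sessions hosted on this node are filtered out by user_session and user_sessions.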

# File lib/vines/cluster/sessions.rb, line 21
def find(*jids)
  jids.flatten.map do |jid|
    jid = JID.new(jid)
    jid.bare? ? user_sessions(jid) : user_session(jid)
  end.compact.flatten
end
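
The difference between bare and full JID lookups can be sketched with an in-memory store. This is an assumption-laden illustration: `find_sketch` is a hypothetical name, `store` is a plain Hash in place of Redis, and JIDs are split on the first '/' rather than parsed with the JID class.

```ruby
require 'json'

# Hypothetical in-memory lookup; `store` mimics the `sessions:<bare jid>` hashes.
def find_sketch(store, local_node, *jids)
  jids.flatten.flat_map do |jid|
    bare, resource = jid.split('/', 2)
    entries = store.fetch("sessions:#{bare}", {})
    # A full JID narrows the lookup to one resource; a bare JID keeps them all.
    entries = entries.select { |res, _| res == resource } if resource
    entries.map { |res, json| JSON.parse(json).merge('jid' => "#{bare}/#{res}") }
  end.reject { |session| session['node'] == local_node }  # remote sessions only
end

store = {
  'sessions:alice@wonderland.lit' => {
    'tea'     => {node: 'node-a'}.to_json,
    'croquet' => {node: 'node-b'}.to_json
  }
}

# Called from node-a: the local 'tea' session is filtered out.
find_sketch(store, 'node-a', 'alice@wonderland.lit')
```
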
poke(node, time)

Notify the session store that this node is still alive. The node broadcasts its current time, so cluster members' clocks do not need to be synchronized: each member records the offset between its own clock and the node's.

# File lib/vines/cluster/sessions.rb, line 92
def poke(node, time)
  offset = Time.now.to_i - time
  @nodes[node] = offset
end
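
A small worked example of the offset arithmetic, using hypothetical helper names and made-up timestamps. The offset is the local clock minus the time the node reported, so subtracting it from a later local reading gives an estimate of the node's clock at that moment.

```ruby
# Offset recorded when a heartbeat arrives: our clock minus the node's clock.
def clock_offset(local_time, remote_time)
  local_time - remote_time
end

# Translate one of our timestamps into the remote node's clock.
def to_remote_clock(local_time, offset)
  local_time - offset
end

offset = clock_offset(1000, 900)   # the node's clock is 100 seconds behind ours
to_remote_clock(1003, offset)      # => 903
```

Any delay between the node sending its heartbeat and this member receiving it is folded into the offset, so the translation is an approximation rather than an exact clock reading.
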
save(jid, attrs)

Persist the user's session to the shared Redis cache so that other cluster nodes can locate the node hosting this user's connection and route messages to it.

# File lib/vines/cluster/sessions.rb, line 31
def save(jid, attrs)
  jid = JID.new(jid)
  session = {node: @cluster.id}.merge(attrs)
  redis.multi
  redis.hset("sessions:#{jid.bare}", jid.resource, session.to_json)
  redis.sadd("cluster:nodes:#{@cluster.id}", jid.to_s)
  redis.exec
end
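
To show what ends up under each key, here is a minimal in-memory sketch of the two writes. `save_sketch` is a hypothetical name, `store` is a plain Hash standing in for Redis, and the `available` attribute is made up for illustration.

```ruby
require 'json'
require 'set'

# Hypothetical in-memory version of the two writes performed by save.
def save_sketch(store, node_id, jid, attrs)
  bare, resource = jid.split('/', 2)
  # The hosting node's ID is stored with the caller's attributes.
  session = {node: node_id}.merge(attrs)
  # Routing entry: resource => JSON, under the user's bare JID.
  (store["sessions:#{bare}"] ||= {})[resource] = session.to_json
  # Reverse index: the full JID, under the hosting node's set.
  (store["cluster:nodes:#{node_id}"] ||= Set.new) << jid
  store
end

store = save_sketch({}, 'node-a', 'alice@wonderland.lit/tea', {available: true})
store['sessions:alice@wonderland.lit']['tea']
# => "{\"node\":\"node-a\",\"available\":true}"
```

Because `attrs` is merged last, a `:node` key supplied by the caller would replace the cluster ID in the stored session.
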

Private Instance Methods

redis()
# File lib/vines/cluster/sessions.rb, line 120
def redis
  @cluster.connection
end
user_session(jid)

Return the remote session for this full JID or nil if not found.

# File lib/vines/cluster/sessions.rb, line 111
def user_session(jid)
  response = @cluster.query(:hget, "sessions:#{jid.bare}", jid.resource)
  return unless response
  session = JSON.parse(response) rescue nil
  return if session.nil? || session['node'] == @cluster.id
  session['jid'] = jid.to_s
  session
end
user_sessions(jid)

Return all remote sessions for this user's bare JID.

# File lib/vines/cluster/sessions.rb, line 100
def user_sessions(jid)
  response = @cluster.query(:hgetall, "sessions:#{jid.bare}") || []
  Hash[*response].map do |resource, json|
    if session = JSON.parse(json) rescue nil
      session['jid'] = JID.new(jid.node, jid.domain, resource).to_s
    end
    session
  end.compact.reject {|session| session['node'] == @cluster.id }
end