class Sinapse::Server

Public Instance Methods

keep_alive() click to toggle source
# File lib/sinapse/server.rb, line 15
def keep_alive
  @keep_alive ||= KeepAlive.new
end
on_close(env) click to toggle source
# File lib/sinapse/server.rb, line 19
def on_close(env)
  close_redis(env['redis']) if env['redis']
  keep_alive.delete(env)
end
response(env) click to toggle source
# File lib/sinapse/server.rb, line 24
def response(env)
  env['redis'] = Redis.new(:driver => :synchrony, :url => Sinapse.config[:url])

  user, channels = authenticate(env)
  return [401, {}, []] if user.nil? || channels.empty?

  EM.next_tick do
    sse(env, :ok, :authentication, retry: Config.retry)
    subscribe(env, user, channels)
    keep_alive << env
  end

  chunked_streaming_response(200, response_headers(env))
end

Private Instance Methods

authenticate(env) click to toggle source
# File lib/sinapse/server.rb, line 41
def authenticate(env)
  user = env['redis'].get("sinapse:tokens:#{params['access_token']}")
  if user
    channels = env['redis'].smembers("sinapse:channels:#{user}")
    [user, channels]
  end
end
close_redis(redis) click to toggle source
# File lib/sinapse/server.rb, line 93
def close_redis(redis)
  if redis.subscribed?
    redis.unsubscribe
  else
    redis.quit
  end
end
response_headers(env) click to toggle source
# File lib/sinapse/server.rb, line 101
def response_headers(env)
  headers = {
    'Connection' => 'close',
    'Content-Type' => 'text/event-stream'
  }
  if env['cors.headers']
    headers.merge(env['cors.headers'])
  else
    headers
  end
end
sse(env, data, event = nil, options = {}) click to toggle source
# File lib/sinapse/server.rb, line 84
def sse(env, data, event = nil, options = {})
  message = []
  message << "retry: %d" % options[:retry] if options[:retry]
  message << "id: %d" % options[:id] if options[:id]
  message << "event: %s" % event if event
  message << "data: %s" % data.to_s.gsub(/\n/, "\ndata: ")
  env.chunked_stream_send message.join("\n") + "\n\n"
end
subscribe(env, user, channels) click to toggle source
# File lib/sinapse/server.rb, line 49
def subscribe(env, user, channels)
  EM.synchrony do
    env['redis'].psubscribe("sinapse:channels:#{user}:*") do |on|
      on.psubscribe do
        env['redis'].subscribe(*channels)
      end

      on.pmessage do |_, channel, message|
        update_subscriptions(env, message, channel)
      end

      on.message do |channel, data|
        event, message = unpack(channel, data)
        sse(env, message, event)
      end
    end
    env['redis'].quit
  end
end
unpack(channel, data) click to toggle source
# File lib/sinapse/server.rb, line 74
def unpack(channel, data)
  message = MessagePack.unpack(data)
  if message.is_a?(Array)
    message
  else
    event = Config.channel_event ? channel : nil
    [event, message]
  end
end
update_subscriptions(env, message, channel) click to toggle source
# File lib/sinapse/server.rb, line 69
def update_subscriptions(env, message, channel)
  return env['redis'].subscribe(message)   if channel.end_with?(':add')
  return env['redis'].unsubscribe(message) if channel.end_with?(':remove')
end