class Sinapse::Server
Public Instance Methods
keep_alive()
click to toggle source
# File lib/sinapse/server.rb, line 15 def keep_alive @keep_alive ||= KeepAlive.new end
on_close(env)
click to toggle source
# File lib/sinapse/server.rb, line 19 def on_close(env) close_redis(env['redis']) if env['redis'] keep_alive.delete(env) end
response(env)
click to toggle source
# File lib/sinapse/server.rb, line 24 def response(env) env['redis'] = Redis.new(:driver => :synchrony, :url => Sinapse.config[:url]) user, channels = authenticate(env) return [401, {}, []] if user.nil? || channels.empty? EM.next_tick do sse(env, :ok, :authentication, retry: Config.retry) subscribe(env, user, channels) keep_alive << env end chunked_streaming_response(200, response_headers(env)) end
Private Instance Methods
authenticate(env)
click to toggle source
# File lib/sinapse/server.rb, line 41 def authenticate(env) user = env['redis'].get("sinapse:tokens:#{params['access_token']}") if user channels = env['redis'].smembers("sinapse:channels:#{user}") [user, channels] end end
close_redis(redis)
click to toggle source
# File lib/sinapse/server.rb, line 93 def close_redis(redis) if redis.subscribed? redis.unsubscribe else redis.quit end end
response_headers(env)
click to toggle source
# File lib/sinapse/server.rb, line 101 def response_headers(env) headers = { 'Connection' => 'close', 'Content-Type' => 'text/event-stream' } if env['cors.headers'] headers.merge(env['cors.headers']) else headers end end
sse(env, data, event = nil, options = {})
click to toggle source
# File lib/sinapse/server.rb, line 84 def sse(env, data, event = nil, options = {}) message = [] message << "retry: %d" % options[:retry] if options[:retry] message << "id: %d" % options[:id] if options[:id] message << "event: %s" % event if event message << "data: %s" % data.to_s.gsub(/\n/, "\ndata: ") env.chunked_stream_send message.join("\n") + "\n\n" end
subscribe(env, user, channels)
click to toggle source
# File lib/sinapse/server.rb, line 49 def subscribe(env, user, channels) EM.synchrony do env['redis'].psubscribe("sinapse:channels:#{user}:*") do |on| on.psubscribe do env['redis'].subscribe(*channels) end on.pmessage do |_, channel, message| update_subscriptions(env, message, channel) end on.message do |channel, data| event, message = unpack(channel, data) sse(env, message, event) end end env['redis'].quit end end
unpack(channel, data)
click to toggle source
# File lib/sinapse/server.rb, line 74 def unpack(channel, data) message = MessagePack.unpack(data) if message.is_a?(Array) message else event = Config.channel_event ? channel : nil [event, message] end end
update_subscriptions(env, message, channel)
click to toggle source
# File lib/sinapse/server.rb, line 69 def update_subscriptions(env, message, channel) return env['redis'].subscribe(message) if channel.end_with?(':add') return env['redis'].unsubscribe(message) if channel.end_with?(':remove') end