class WebsocketRails::Synchronization

Public Class Methods

all_users() click to toggle source
# File lib/websocket_rails/synchronization.rb, line 8
# Class-level shortcut: forwards to #all_users on the shared singleton
# instance and returns whatever that call returns.
def self.all_users
  shared = singleton
  shared.all_users
end
destroy_user(connection) click to toggle source
# File lib/websocket_rails/synchronization.rb, line 20
# Class-level shortcut: hands +connection+ to #destroy_user on the shared
# singleton instance and returns that call's result.
def self.destroy_user(connection)
  singleton.destroy_user(connection)
end
find_user(connection) click to toggle source
# File lib/websocket_rails/synchronization.rb, line 12
# Class-level shortcut: hands +connection+ to #find_user on the shared
# singleton instance and returns that call's result.
def self.find_user(connection)
  singleton.find_user(connection)
end
publish(event) click to toggle source
# File lib/websocket_rails/synchronization.rb, line 24
# Class-level shortcut: hands +event+ to #publish on the shared singleton
# instance and returns that call's result.
def self.publish(event)
  singleton.publish(event)
end
redis() click to toggle source
# File lib/websocket_rails/synchronization.rb, line 36
# Class-level shortcut: returns the result of #redis on the shared
# singleton instance.
def self.redis
  shared = singleton
  shared.redis
end
register_user(connection) click to toggle source
# File lib/websocket_rails/synchronization.rb, line 16
# Class-level shortcut: hands +connection+ to #register_user on the shared
# singleton instance and returns that call's result.
def self.register_user(connection)
  singleton.register_user(connection)
end
shutdown!() click to toggle source
# File lib/websocket_rails/synchronization.rb, line 32
# Class-level shortcut: invokes #shutdown! on the shared singleton instance
# and returns that call's result.
def self.shutdown!
  shared = singleton
  shared.shutdown!
end
singleton() click to toggle source
# File lib/websocket_rails/synchronization.rb, line 40
# Returns the memoized shared instance, building it with +new+ the first
# time (or whenever the cached value is nil/false).
def self.singleton
  return @singleton if @singleton

  @singleton = new
end
synchronize!() click to toggle source
# File lib/websocket_rails/synchronization.rb, line 28
# Class-level shortcut: invokes #synchronize! on the shared singleton
# instance and returns that call's result.
def self.synchronize!
  shared = singleton
  shared.synchronize!
end

Public Instance Methods

all_users() click to toggle source
# File lib/websocket_rails/synchronization.rb, line 175
# Reads the whole 'websocket_rails.users' hash via HGETALL.
# The Redis call runs inside a Fiber that is resumed immediately; the
# method returns the value that resume hands back.
def all_users
  reader = Fiber.new { redis.hgetall('websocket_rails.users') }
  reader.resume
end
destroy_user(identifier) click to toggle source
# File lib/websocket_rails/synchronization.rb, line 162
# Deletes the +identifier+ field from the 'websocket_rails.users' hash via
# HDEL. The Redis call runs inside a Fiber that is resumed immediately; the
# method returns the value that resume hands back.
def destroy_user(identifier)
  remover = Fiber.new { redis.hdel('websocket_rails.users', identifier) }
  remover.resume
end
find_user(identifier) click to toggle source
# File lib/websocket_rails/synchronization.rb, line 168
# Looks up the +identifier+ field in the 'websocket_rails.users' hash via
# HGET and JSON-decodes it. Returns nil when Redis has no value for it.
# The lookup runs inside a Fiber that is resumed immediately.
def find_user(identifier)
  lookup = Fiber.new do
    serialized = redis.hget('websocket_rails.users', identifier)
    JSON.parse(serialized) if serialized
  end
  lookup.resume
end
generate_server_token() click to toggle source
# File lib/websocket_rails/synchronization.rb, line 133
# Produces a random URL-safe token that is not currently a member of the
# "websocket_rails.active_servers" set. A fresh token is drawn for as long
# as Redis reports the previous candidate as already present.
def generate_server_token
  candidate = SecureRandom.urlsafe_base64
  candidate = SecureRandom.urlsafe_base64 while redis.sismember('websocket_rails.active_servers', candidate)
  candidate
end
publish(event) click to toggle source
# File lib/websocket_rails/synchronization.rb, line 63
# Stamps +event+ with this instance's server token and publishes its
# serialized form on the "websocket_rails.events" channel.
# The work runs inside a Fiber that is resumed immediately; the method
# returns the value that resume hands back.
def publish(event)
  sender = Fiber.new do
    event.server_token = server_token
    redis.publish('websocket_rails.events', event.serialize)
  end
  sender.resume
end
redis() click to toggle source
# File lib/websocket_rails/synchronization.rb, line 46
# Returns the memoized Redis client for this instance.
#
# When the EventMachine reactor is running a new client is built from
# WebsocketRails.config.redis_options; otherwise the client returned by
# #ruby_redis is used. The choice is made once, on first use.
def redis
  @redis ||=
    # Ask the reactor once so the debug line always matches the client that
    # is actually selected, even if the reactor starts or stops meanwhile.
    if EM.reactor_running?
      debug "Reactor is running - engaging standard redis new"
      Redis.new(WebsocketRails.config.redis_options)
    else
      debug "Reactor is not running - engaging ruby redis"
      ruby_redis
    end
end
register_server(token) click to toggle source
# File lib/websocket_rails/synchronization.rb, line 141
# Adds +token+ to the "websocket_rails.active_servers" set and logs the
# registration at info level.
# The work runs inside a Fiber that is resumed immediately; the method
# returns the value that resume hands back.
def register_server(token)
  registrar = Fiber.new do
    redis.sadd('websocket_rails.active_servers', token)
    info("Server Registered: #{token}")
  end
  registrar.resume
end
register_user(connection) click to toggle source
# File lib/websocket_rails/synchronization.rb, line 154
# Stores the connection's user in the 'websocket_rails.users' hash, keyed by
# connection.user_identifier. The stored value is the JSON string of
# user.as_json(root: false).
# The work runs inside a Fiber that is resumed immediately; the method
# returns the value that resume hands back.
def register_user(connection)
  writer = Fiber.new do
    field = connection.user_identifier
    record = connection.user
    redis.hset('websocket_rails.users', field, record.as_json(root: false).to_json)
  end
  writer.resume
end
remove_server(token) click to toggle source
# File lib/websocket_rails/synchronization.rb, line 148
# Removes +token+ from the "websocket_rails.active_servers" set using the
# #ruby_redis client, logs the removal at info level, then stops the
# EventMachine reactor. Returns the result of EM.stop.
def remove_server(token)
  ruby_redis.srem('websocket_rails.active_servers', token)
  info("Server Removed: #{token}")
  EM.stop
end
ruby_redis() click to toggle source
# File lib/websocket_rails/synchronization.rb, line 55
# Returns the memoized Redis client used outside the reactor-driven path.
#
# The client is built from WebsocketRails.config.redis_options. When those
# options do not name a :driver, :driver => :ruby is added to the options
# passed to Redis.new; the configured hash itself is left untouched.
def ruby_redis
  @ruby_redis ||= begin
    redis_options = WebsocketRails.config.redis_options
    # Hash#merge returns a new hash, so its result has to be kept for the
    # default driver to take effect.
    redis_options = redis_options.merge(:driver => :ruby) unless redis_options.has_key?(:driver)
    Redis.new(redis_options)
  end
end
server_token() click to toggle source
# File lib/websocket_rails/synchronization.rb, line 70
# Reader for @server_token, which #synchronize! assigns from
# #generate_server_token. Evaluates to nil when the variable has not been set.
def server_token
  @server_token
end
shutdown!() click to toggle source
# File lib/websocket_rails/synchronization.rb, line 129
# Passes this instance's server token to #remove_server and returns that
# call's result.
def shutdown!
  token = server_token
  remove_server(token)
end
synchronize!() click to toggle source
# File lib/websocket_rails/synchronization.rb, line 74
# Starts event synchronization for this instance; does nothing (and returns
# nil) once @synchronizing has been set by an earlier call.
#
# Steps on the first call:
# 1. Generate a server token, store it in @server_token and register it.
# 2. Build a Fiber whose body hands a blocking Redis subscription on
#    "websocket_rails.events" to EM.defer, then logs the start message.
# 3. Mark the instance as synchronizing and schedule the Fiber with
#    EM.next_tick.
# 4. Install TERM, INT and QUIT handlers that call #shutdown! from a new
#    Thread.
#
# On the first call the return value is whatever trap('QUIT') returned.
def synchronize!
  return if @synchronizing

  @server_token = generate_server_token
  register_server(@server_token)

  listener = Fiber.new do
    EM.defer do
      subscriber = Redis.new(WebsocketRails.config.redis_options)
      subscriber.subscribe('websocket_rails.events') do |on|
        debug "Subscribed to websocket_rails events"

        on.message do |_channel, payload|
          incoming = Event.new_from_json(payload, nil)

          # Events that carry this server's own token are skipped.
          next if incoming.server_token == server_token

          # An event without a token is stamped with this server's token
          # before it is dispatched (per the original note: events queued by
          # other processes may arrive without one, and stamping keeps them
          # from being triggered twice).
          incoming.server_token = server_token if incoming.server_token.nil?

          trigger_incoming incoming
        end
      end
    end
    info "Beginning Synchronization"
  end
  @synchronizing = true

  EM.next_tick { listener.resume }

  # Handlers are installed in TERM, INT, QUIT order; `.last` keeps the
  # result of the QUIT trap as the method's value.
  %w[TERM INT QUIT].map { |signal| trap(signal) { Thread.new { shutdown! } } }.last
end
trigger_incoming(event) click to toggle source
# File lib/websocket_rails/synchronization.rb, line 118
# Dispatches +event+ to its local target.
#
# * Channel events (event.is_channel?) go to
#   WebsocketRails[event.channel].trigger_event.
# * User events (event.is_user?) go to the connection stored in
#   WebsocketRails.users under event.user_id.to_s; nothing happens when no
#   such connection exists.
#
# Returns the target's result, or nil when nothing was dispatched.
def trigger_incoming(event)
  if event.is_channel?
    WebsocketRails[event.channel].trigger_event(event)
  elsif event.is_user?
    target = WebsocketRails.users[event.user_id.to_s]
    target.trigger(event) unless target.nil?
  end
end