class WebsocketRails::Synchronization

Public Class Methods

all_users() click to toggle source
# File lib/websocket_rails/synchronization.rb, line 7
# Class-level convenience wrapper: forwards to +all_users+ on the shared
# instance returned by +singleton+ and returns its result unchanged.
def self.all_users
  singleton.all_users
end
destroy_user(connection) click to toggle source
# File lib/websocket_rails/synchronization.rb, line 19
# Class-level convenience wrapper around the instance method of the same
# name.
#
# connection - forwarded untouched to +singleton.destroy_user+. NOTE(review):
#              the instance method names this argument +identifier+ and uses
#              it as a hash field, so callers presumably pass a user
#              identifier rather than a connection object - confirm.
#
# Returns whatever the instance method returns.
def self.destroy_user(connection)
  singleton.destroy_user(connection)
end
find_user(connection) click to toggle source
# File lib/websocket_rails/synchronization.rb, line 11
# Class-level convenience wrapper around the instance method of the same
# name.
#
# connection - forwarded untouched to +singleton.find_user+. NOTE(review):
#              the instance method names this argument +identifier+ and uses
#              it as a hash field - confirm what callers actually pass.
#
# Returns whatever the instance method returns.
def self.find_user(connection)
  singleton.find_user(connection)
end
publish(event) click to toggle source
# File lib/websocket_rails/synchronization.rb, line 23
# Class-level convenience wrapper: hands +event+ to +publish+ on the shared
# instance returned by +singleton+.
#
# event - forwarded untouched.
#
# Returns whatever the instance method returns.
def self.publish(event)
  singleton.publish(event)
end
redis() click to toggle source
# File lib/websocket_rails/synchronization.rb, line 35
# Class-level convenience wrapper: returns the Redis client exposed by the
# shared instance (+singleton.redis+).
def self.redis
  singleton.redis
end
register_user(connection) click to toggle source
# File lib/websocket_rails/synchronization.rb, line 15
# Class-level convenience wrapper: registers +connection+ through the shared
# instance returned by +singleton+.
#
# connection - forwarded untouched to +singleton.register_user+.
#
# Returns whatever the instance method returns.
def self.register_user(connection)
  singleton.register_user(connection)
end
shutdown!() click to toggle source
# File lib/websocket_rails/synchronization.rb, line 31
# Class-level convenience wrapper: forwards to +shutdown!+ on the shared
# instance returned by +singleton+.
def self.shutdown!
  singleton.shutdown!
end
singleton() click to toggle source
# File lib/websocket_rails/synchronization.rb, line 39
# Returns the process-wide instance that all class-level helpers delegate
# to. The instance is built with +new+ on first use and cached in the
# class-level +@singleton+ variable; later calls return the cached object.
def self.singleton
  return @singleton if @singleton

  @singleton = new
end
synchronize!() click to toggle source
# File lib/websocket_rails/synchronization.rb, line 27
# Class-level convenience wrapper: forwards to +synchronize!+ on the shared
# instance returned by +singleton+.
def self.synchronize!
  singleton.synchronize!
end

Public Instance Methods

all_users() click to toggle source
# File lib/websocket_rails/synchronization.rb, line 170
# Reads every field of the 'websocket_rails.users' Redis hash (HGETALL).
# +register_user+ writes that hash as user identifier => JSON string.
#
# The call is made inside a freshly created Fiber - presumably because the
# Redis driver in use needs a Fiber context; confirm against the configured
# driver.
#
# Returns the value produced by the fiber's first +resume+.
def all_users
  lookup = Fiber.new { redis.hgetall('websocket_rails.users') }
  lookup.resume
end
destroy_user(identifier) click to toggle source
# File lib/websocket_rails/synchronization.rb, line 157
# Removes one entry from the 'websocket_rails.users' Redis hash (HDEL).
#
# identifier - hash field to delete.
#
# The command runs inside a freshly created Fiber; the value produced by the
# fiber's first +resume+ is returned.
def destroy_user(identifier)
  removal = Fiber.new { redis.hdel('websocket_rails.users', identifier) }
  removal.resume
end
find_user(identifier) click to toggle source
# File lib/websocket_rails/synchronization.rb, line 163
# Looks up a single entry of the 'websocket_rails.users' Redis hash (HGET)
# and decodes it from JSON.
#
# identifier - hash field to read.
#
# The lookup runs inside a freshly created Fiber. Returns the parsed JSON
# value, or nil when Redis has nothing stored under +identifier+.
def find_user(identifier)
  lookup = Fiber.new do
    serialized = redis.hget('websocket_rails.users', identifier)
    JSON.parse(serialized) if serialized
  end
  lookup.resume
end
generate_server_token() click to toggle source
# File lib/websocket_rails/synchronization.rb, line 128
# Produces a random token that is not currently a member of the
# "websocket_rails.active_servers" Redis set.
#
# A candidate is drawn with SecureRandom.urlsafe_base64 and checked with
# SISMEMBER; on a collision another candidate is drawn. Note that the token
# is only checked here, not reserved - +register_server+ adds it to the set.
#
# Returns the token String.
def generate_server_token
  loop do
    candidate = SecureRandom.urlsafe_base64
    return candidate unless redis.sismember("websocket_rails.active_servers", candidate)
  end
end
publish(event) click to toggle source
# File lib/websocket_rails/synchronization.rb, line 59
# Broadcasts +event+ on the "websocket_rails.events" Redis channel.
#
# event - must respond to +server_token=+ and +serialize+. It is stamped with
#         this instance's +server_token+ before being serialized, which is
#         what lets +synchronize!+ skip events this server sent itself.
#
# The work runs inside a freshly created Fiber; the value produced by the
# fiber's first +resume+ is returned.
def publish(event)
  broadcast = Fiber.new do
    event.server_token = server_token
    redis.publish("websocket_rails.events", event.serialize)
  end
  broadcast.resume
end
redis() click to toggle source
# File lib/websocket_rails/synchronization.rb, line 45
# Returns the memoized Redis client for this instance.
#
# On first use: when the EventMachine reactor is running, a client is built
# from +WebsocketRails.config.redis_options+; otherwise the client from
# +ruby_redis+ is used. The choice is cached in +@redis+ and is not
# re-evaluated if the reactor state changes later.
def redis
  return @redis if @redis

  options = WebsocketRails.config.redis_options
  @redis = if EM.reactor_running?
             Redis.new(options)
           else
             ruby_redis
           end
end
register_server(token) click to toggle source
# File lib/websocket_rails/synchronization.rb, line 136
# Adds +token+ to the "websocket_rails.active_servers" Redis set (SADD) and
# logs the registration through +info+.
#
# token - the server token to record.
#
# The work runs inside a freshly created Fiber; the value produced by the
# fiber's first +resume+ (the result of +info+) is returned.
def register_server(token)
  registration = Fiber.new do
    redis.sadd("websocket_rails.active_servers", token)
    info("Server Registered: #{token}")
  end
  registration.resume
end
register_user(connection) click to toggle source
# File lib/websocket_rails/synchronization.rb, line 149
# Stores the user behind +connection+ in the 'websocket_rails.users' Redis
# hash (HSET).
#
# connection - must respond to +user_identifier+ (used as the hash field)
#              and +user+ (serialized with +as_json(root: false).to_json+
#              and used as the value).
#
# The work runs inside a freshly created Fiber; the value produced by the
# fiber's first +resume+ is returned.
def register_user(connection)
  registration = Fiber.new do
    field = connection.user_identifier
    owner = connection.user
    redis.hset('websocket_rails.users', field, owner.as_json(root: false).to_json)
  end
  registration.resume
end
remove_server(token) click to toggle source
# File lib/websocket_rails/synchronization.rb, line 143
# Removes +token+ from the "websocket_rails.active_servers" Redis set (SREM),
# logs the removal through +info+, then stops the EventMachine reactor.
#
# token - the server token to remove.
#
# Unlike the other Redis helpers this one uses the +ruby_redis+ client and no
# Fiber - presumably because it runs from a plain Thread started by the
# signal handlers in +synchronize!+; confirm.
#
# Returns the result of +EM.stop+.
def remove_server(token)
  ruby_redis.srem("websocket_rails.active_servers", token)
  info("Server Removed: #{token}")
  EM.stop
end
ruby_redis() click to toggle source
# File lib/websocket_rails/synchronization.rb, line 52
# Returns a memoized Redis client built from
# +WebsocketRails.config.redis_options+ with the +:driver+ option forced to
# +:ruby+. The client is created on first use and cached in +@ruby_redis+.
def ruby_redis
  return @ruby_redis if @ruby_redis

  base_options = WebsocketRails.config.redis_options
  @ruby_redis = Redis.new(base_options.merge(:driver => :ruby))
end
server_token() click to toggle source
# File lib/websocket_rails/synchronization.rb, line 66
# Returns the token identifying this server, as assigned by +synchronize!+.
# It is nil until +synchronize!+ has run.
def server_token
  @server_token
end
shutdown!() click to toggle source
# File lib/websocket_rails/synchronization.rb, line 124
# Deregisters this server by passing its own +server_token+ to
# +remove_server+, which also stops the EventMachine reactor.
#
# Returns whatever +remove_server+ returns.
def shutdown!
  token = server_token
  remove_server(token)
end
synchronize!() click to toggle source
# File lib/websocket_rails/synchronization.rb, line 70
# Starts cross-server event synchronization for this instance. Calls after
# the first one do nothing and return nil.
#
# Steps performed on the first call:
# 1. Generate a server token and register it via +register_server+.
# 2. Build a Fiber that opens its own Redis connection (separate from the
#    shared +redis+ client) and subscribes to "websocket_rails.events".
#    Each incoming message is decoded with +Event.new_from_json+ and handed
#    to +trigger_incoming+, unless it carries this server's own token.
# 3. Schedule that Fiber on the next EventMachine tick.
# 4. Install TERM/INT/QUIT handlers that run +shutdown!+ in a new Thread.
def synchronize!
  return if @synchronizing

  @server_token = generate_server_token
  register_server(@server_token)

  synchro = Fiber.new do
    # Redis.new replaces the deprecated Redis.connect (removed in redis-rb 4)
    # and matches how the other clients in this class are built.
    fiber_redis = Redis.new(WebsocketRails.config.redis_options)

    # Log before subscribing: redis-rb's subscribe does not return until the
    # client unsubscribes, so logging afterwards would announce the start of
    # synchronization only once it had ended.
    info "Beginning Synchronization"

    fiber_redis.subscribe "websocket_rails.events" do |on|
      on.message do |_, encoded_event|
        event = Event.new_from_json(encoded_event, nil)

        # Do nothing if this is the server that sent this event.
        next if event.server_token == server_token

        # Ensure an event never gets triggered twice. Events added to the
        # redis queue from other processes may not have a server token
        # attached.
        event.server_token = server_token if event.server_token.nil?

        trigger_incoming event
      end
    end
  end

  @synchronizing = true

  EM.next_tick { synchro.resume }

  # shutdown! runs in a separate Thread rather than inside the signal handler
  # itself, exactly as before.
  %w[TERM INT QUIT].each do |signal|
    trap(signal) { Thread.new { shutdown! } }
  end
end
trigger_incoming(event) click to toggle source
# File lib/websocket_rails/synchronization.rb, line 113
# Dispatches an event received from another server to its local target.
#
# event - must respond to +is_channel?+ and +is_user?+.
#
# * Channel events go to +WebsocketRails[event.channel].trigger_event+.
# * User events go to the connection stored in +WebsocketRails.users+ under
#   +event.user_id.to_s+; if there is no such connection nothing happens.
# * Any other event is ignored.
#
# Returns the result of the dispatch call, or nil when nothing was dispatched.
def trigger_incoming(event)
  if event.is_channel?
    WebsocketRails[event.channel].trigger_event(event)
  elsif event.is_user?
    recipient = WebsocketRails.users[event.user_id.to_s]
    recipient.trigger(event) unless recipient.nil?
  end
end