class Broadcaster

Attributes

id[R]
logger[R]
logger_name[R]
mutex[R]
publisher[R]
redis_client[R]
redis_settings[R]
subscriber[R]
subscriptions[R]

Public Class Methods

new(options={}) click to toggle source
# File lib/broadcaster.rb, line 17
# Builds a broadcaster: resolves its settings, opens the publishing
# connection and calls #listen to start receiving notifications.
#
# Keys read from +options+ (each falls back to its default only when the
# key is absent from the hash):
# id::             prefix used by #scoped for every channel name
#                  (default: a fresh SecureRandom.uuid)
# logger::         default: Broadcaster.logger
# redis_client::   class instantiated by #establish_connection
#                  (default: Broadcaster.redis_client)
# redis_settings:: argument handed to redis_client.new
#                  (default: Broadcaster.redis_settings)
#
# Defaults are supplied in block form so they are evaluated lazily: no UUID
# is generated and no class-level setting is read when the caller already
# provided the value.
def initialize(options={})
  @id = options.fetch(:id) { SecureRandom.uuid }
  @logger = options.fetch(:logger) { Broadcaster.logger }
  @logger_name = "Broadcaster (#{@id})"
  @redis_client = options.fetch(:redis_client) { Broadcaster.redis_client }
  @redis_settings = options.fetch(:redis_settings) { Broadcaster.redis_settings }
  @publisher = establish_connection
  @subscriptions = {}
  @mutex = Mutex.new
  listen
end

Public Instance Methods

publish(channel, message) click to toggle source
# File lib/broadcaster.rb, line 29
# Publishes +message+ on +channel+ within this broadcaster's scope.
#
# The message is serialised with Marshal.dump and sent through the
# publisher connection as a PUBLISH command on the scoped channel name.
# A debug line is logged afterwards; the method returns whatever that
# logger call returns.
def publish(channel, message)
  target = scoped(channel)
  payload = Marshal.dump(message)

  publisher.call!('PUBLISH', target, payload)

  logger.debug(logger_name) do
    "Published | #{target} | #{message}"
  end
end
subscribe(channel, callable=nil, &block) click to toggle source
# File lib/broadcaster.rb, line 34
# Registers a handler for +channel+ and returns the generated subscription
# id (a SecureRandom.uuid string) that #unsubscribe accepts.
#
# channel::  channel name; it is stored under its scoped form
# callable:: optional object responding to +call+; takes precedence over
#            the block when both are given
# block::    handler used when no +callable+ is passed
#
# Raises ArgumentError when neither +callable+ nor a block is supplied.
# Without this check a nil handler would be stored and every notification
# for the channel would fail later, when the listener tries to call it.
def subscribe(channel, callable=nil, &block)
  handler = callable || block
  raise ArgumentError, 'a callable or a block is required' unless handler

  mutex.synchronize do
    subscription_id = SecureRandom.uuid
    channel_subscriptions = subscriptions[scoped(channel)] ||= {}
    channel_subscriptions[subscription_id] = handler
    logger.debug(logger_name) { "Subscribed | #{scoped(channel)} | #{subscription_id}" }
    subscription_id
  end
end
unsubscribe(subscription_id) click to toggle source
# File lib/broadcaster.rb, line 44
# Removes the handler registered under +subscription_id+.
#
# The lookup scans every channel for the id. When found, the handler is
# deleted, channel entries left without handlers are dropped, and a debug
# line is logged. Everything runs while holding the mutex.
#
# Returns the removed handler, or nil when the id is not registered.
def unsubscribe(subscription_id)
  mutex.synchronize do
    channel, handlers = subscriptions.find do |_name, registered|
      registered.key?(subscription_id)
    end
    next unless channel

    removed = handlers.delete(subscription_id)
    subscriptions.delete_if { |_name, registered| registered.empty? }
    logger.debug(logger_name) { "Unsubscribed | #{channel} | #{subscription_id}" }
    removed
  end
end
unsubscribe_all() click to toggle source
# File lib/broadcaster.rb, line 56
# Drops every registered handler on every channel.
#
# While holding the mutex, a debug line is logged and the subscriptions
# hash is emptied in place. Returns that same (now empty) hash.
def unsubscribe_all
  mutex.synchronize do
    logger.debug(logger_name) { 'Unsubscribed all' }
    subscriptions.tap(&:clear)
  end
end

Private Instance Methods

establish_connection() click to toggle source
# File lib/broadcaster.rb, line 67
# Instantiates redis_client with redis_settings and sends a PING through
# the new connection before handing it back, so an error raised by that
# first command surfaces here rather than on later use.
#
# Returns the newly created connection object.
def establish_connection
  connection = redis_client.new(redis_settings)
  connection.call!('PING')
  connection
end
listen() click to toggle source
# File lib/broadcaster.rb, line 75
# Opens a dedicated connection, pattern-subscribes to every channel in this
# broadcaster's scope and spawns a thread that hands each incoming
# notification to the handlers registered for its channel.
#
# Failure handling, as implemented below:
# * an error raised by one handler is logged and the remaining handlers
#   still run;
# * any other error inside the read loop is logged, the loop is left and
#   #listen is called again from that thread, creating a new connection
#   and a new thread;
# * an error while connecting or subscribing is logged, followed by a sleep
#   of Broadcaster.reconnection_timeout and a retry with no attempt limit.
#
# Returns the Thread running the read loop.
def listen
  # Store the connection so the `subscriber` reader exposes it. The thread
  # keeps its own local reference, so a later reconnect cannot swap the
  # connection under a loop that is still running.
  @subscriber = connection = establish_connection
  connection.call! 'PSUBSCRIBE', scoped('*')

  logger.info(logger_name) { 'Listener started' }

  Thread.new do
    loop do
      begin
        notification = connection.client.read

        # Presumably a pattern-message reply where index 2 is the channel
        # and index 3 the payload -- TODO confirm against the client library.
        channel = notification[2]
        # NOTE(review): Marshal.load can instantiate arbitrary objects; this
        # is only safe while everything able to publish here is trusted.
        message = Marshal.load notification[3]

        # Snapshot the handlers under the lock so they run without holding
        # it and are unaffected by concurrent subscribe/unsubscribe calls.
        current_subscriptions = mutex.synchronize do
          subscriptions.key?(channel) ? subscriptions[channel].dup : nil
        end

        if current_subscriptions
          logger.debug(logger_name) { "Broadcasting (#{current_subscriptions.count}) | #{channel} | #{message}" }

          current_subscriptions.each do |subscription_id, block|
            begin
              block.call message
            rescue => ex
              logger.error(logger_name) { "Failed | #{channel} | #{subscription_id} | #{message}\n#{ex.class}: #{ex.message}\n#{ex.backtrace.join("\n")}" }
            end
          end
        end

      rescue => ex
        logger.error(logger_name) { ex }
        break
      end
    end

    logger.warn(logger_name) { 'Listener broken' }
    listen
  end

rescue => ex
  logger.error(logger_name) { ex }
  sleep Broadcaster.reconnection_timeout

  logger.info(logger_name) { 'Listener reconnectig' }
  retry
end
scoped(channel) click to toggle source
# File lib/broadcaster.rb, line 71
# Builds the channel name used with the Redis connection by prefixing
# +channel+ with this broadcaster's id and a colon.
def scoped(channel)
  format('%s:%s', id, channel)
end