class Broadcaster
Attributes
id[R]
logger[R]
logger_name[R]
mutex[R]
publisher[R]
redis_client[R]
redis_settings[R]
subscriber[R]
subscriptions[R]
Public Class Methods
new(options={})
click to toggle source
# File lib/broadcaster.rb, line 17
# Builds a broadcaster, opens its publishing connection and starts listening.
#
# Recognised keys in +options+ (each falls back to a default when absent):
# :id::             identifier used to scope channel names (default: a fresh UUID)
# :logger::         logger instance (default: Broadcaster.logger)
# :redis_client::   class used to build connections (default: Broadcaster.redis_client)
# :redis_settings:: argument handed to the client constructor
#                   (default: Broadcaster.redis_settings)
def initialize(options={})
  # In-memory registry state, guarded by the mutex.
  @mutex = Mutex.new
  @subscriptions = {}

  @id = options.fetch(:id, SecureRandom.uuid)
  @logger_name = "Broadcaster (#{@id})"
  @logger = options.fetch(:logger, Broadcaster.logger)

  @redis_client = options.fetch(:redis_client, Broadcaster.redis_client)
  @redis_settings = options.fetch(:redis_settings, Broadcaster.redis_settings)

  # The connection settings must be in place before a connection is opened,
  # and everything must be in place before the listener starts.
  @publisher = establish_connection
  listen
end
Public Instance Methods
publish(channel, message)
click to toggle source
# File lib/broadcaster.rb, line 29
# Sends +message+ to +channel+.
#
# The channel name is scoped with this broadcaster's id and the message is
# serialized with Marshal before being handed to the publisher connection
# through a PUBLISH command. Returns whatever the logger's +debug+ returns.
def publish(channel, message)
  target = scoped(channel)
  payload = Marshal.dump(message)

  publisher.call!('PUBLISH', target, payload)
  logger.debug(logger_name) { "Published | #{target} | #{message}" }
end
subscribe(channel, callable=nil, &block)
click to toggle source
# File lib/broadcaster.rb, line 34
# Registers a handler for +channel+ and returns the new subscription id.
#
# channel::  channel name; it is scoped with this broadcaster's id before
#            being used as the registry key.
# callable:: optional object stored as the handler; takes precedence over
#            the block when both are given.
# block::    handler used when no +callable+ is supplied.
#
# Returns the subscription id (a UUID string) that +unsubscribe+ accepts.
# Raises ArgumentError when neither a callable nor a block is given, so a
# nil handler is never stored in the registry.
def subscribe(channel, callable=nil, &block)
  handler = callable || block
  raise ArgumentError, 'A callable or a block is required to subscribe' unless handler

  scoped_channel = scoped(channel)
  subscription_id = SecureRandom.uuid

  mutex.synchronize do
    channel_subscriptions = subscriptions[scoped_channel] ||= {}
    channel_subscriptions[subscription_id] = handler
    logger.debug(logger_name) { "Subscribed | #{scoped_channel} | #{subscription_id}" }
  end

  subscription_id
end
unsubscribe(subscription_id)
click to toggle source
# File lib/broadcaster.rb, line 44
# Removes the subscription identified by +subscription_id+.
#
# Returns the handler that was registered under that id, or nil when no
# channel holds such a subscription. Channels left without handlers are
# dropped from the registry. Runs under the mutex.
def unsubscribe(subscription_id)
  mutex.synchronize do
    owner = subscriptions.keys.find { |name| subscriptions[name].key?(subscription_id) }
    next unless owner

    removed = subscriptions[owner].delete(subscription_id)
    subscriptions.reject! { |_name, handlers| handlers.empty? }
    logger.debug(logger_name) { "Unsubscribed | #{owner} | #{subscription_id}" }
    removed
  end
end
unsubscribe_all()
click to toggle source
# File lib/broadcaster.rb, line 56
# Drops every registered subscription on every channel.
#
# Logs the operation and empties the registry while holding the mutex.
# Returns the (now empty) subscriptions hash.
def unsubscribe_all
  mutex.synchronize do
    logger.debug(logger_name) { 'Unsubscribed all' }
    subscriptions.clear
  end
end
Private Instance Methods
establish_connection()
click to toggle source
# File lib/broadcaster.rb, line 67
# Builds a new connection from the configured client class and settings.
#
# A PING is issued right away so that an unusable connection raises here
# instead of on first use. Returns the connection object.
def establish_connection
  connection = redis_client.new(redis_settings)
  connection.call!('PING')
  connection
end
listen()
click to toggle source
# File lib/broadcaster.rb, line 75
# Starts the background listener and returns its Thread.
#
# A dedicated connection is opened and pattern-subscribed to every channel
# scoped under this broadcaster's id. A new thread then reads notifications
# from that connection and hands each one to +dispatch+.
#
# Failure handling:
# * an error while reading or decoding a notification is logged, ends the
#   read loop, and the thread calls +listen+ again to start over;
# * an error while connecting or subscribing is logged, then the method
#   sleeps for Broadcaster.reconnection_timeout and retries, indefinitely.
def listen
  # Stored in @subscriber so the class's `subscriber` reader exposes the
  # listening connection; the local is what the thread closes over, so a
  # later reconnection cannot swap the connection under a running loop.
  @subscriber = connection = establish_connection
  connection.call! 'PSUBSCRIBE', scoped('*')
  logger.info(logger_name) { 'Listener started' }

  Thread.new do
    loop do
      begin
        notification = connection.client.read
        # Index 2 is used as the channel name and index 3 as the payload.
        # NOTE(review): Marshal.load can instantiate arbitrary objects; this
        # is only safe while every publisher on these channels is trusted.
        dispatch notification[2], Marshal.load(notification[3])
      rescue => ex
        logger.error(logger_name) { ex }
        break
      end
    end

    logger.warn(logger_name) { 'Listener broken' }
    listen
  end
rescue => ex
  logger.error(logger_name) { ex }
  sleep Broadcaster.reconnection_timeout
  logger.info(logger_name) { 'Listener reconnecting' }
  retry
end

# Delivers +message+ to every handler currently registered for +channel+.
#
# The handlers are copied under the mutex and invoked outside of it, so a
# handler may subscribe or unsubscribe without deadlocking. A handler that
# raises is logged and does not prevent the remaining handlers from running.
def dispatch(channel, message)
  current_subscriptions = mutex.synchronize do
    subscriptions.key?(channel) ? subscriptions[channel].dup : nil
  end
  return unless current_subscriptions

  logger.debug(logger_name) { "Broadcasting (#{current_subscriptions.count}) | #{channel} | #{message}" }
  current_subscriptions.each do |subscription_id, block|
    begin
      block.call message
    rescue => ex
      logger.error(logger_name) { "Failed | #{channel} | #{subscription_id} | #{message}\n#{ex.class}: #{ex.message}\n#{ex.backtrace.join("\n")}" }
    end
  end
end
scoped(channel)
click to toggle source
# File lib/broadcaster.rb, line 71
# Prefixes +channel+ with this broadcaster's id, separated by a colon.
#
# Returns the resulting channel name as a String.
def scoped(channel)
  format('%s:%s', id, channel)
end