class CelluloidPubsub::RedisReactor

reactor used for redis pubsub @!attribute connected

@return [Boolean] returns true if already connected to redis

@!attribute connection

@return [EM::Hiredis] The connection used for redis

Attributes

connected[RW]
connected?[RW]
connection[RW]

Public Instance Methods

add_subscriber_to_channel(channel, message) click to toggle source

method used to subscribe to a channel @see redis_action

@return [void]

@api public

Calls superclass method
# File lib/celluloid_pubsub_redis_adapter/redis_reactor.rb, line 42
def add_subscriber_to_channel(channel, message)
  super
  async.redis_action('subscribe', channel, message)
end
server_pusblish_event(topic, data) click to toggle source

method used to publish event using redis

@return [void]

@api public

# File lib/celluloid_pubsub_redis_adapter/redis_reactor.rb, line 87
def server_pusblish_event(topic, data)
  return if topic.blank? || data.blank?
  connect_to_redis do |connection|
    connection.publish(topic, data)
  end
rescue => exception
  log_debug("could not publish message #{message} into topic #{current_topic} because of #{exception.inspect}")
end
shutdown() click to toggle source

method used to shutdown the reactor and unsubscribe from all channels @see redis_action

@return [void]

@api public

Calls superclass method
# File lib/celluloid_pubsub_redis_adapter/redis_reactor.rb, line 75
def shutdown
  @channels.dup.each do |channel|
    redis_action('unsubscribe', channel) unless ENV['RACK_ENV'] == 'test'
  end if @channels.present?
  super
end
unsubscribe(channel, data) click to toggle source

method used to unsubscribe from a channel @see redis_action

@return [void]

@api public

Calls superclass method
# File lib/celluloid_pubsub_redis_adapter/redis_reactor.rb, line 31
def unsubscribe(channel, data)
  super
  async.redis_action('unsubscribe', channel)
end
unsubscribe_all(channel, data) click to toggle source

method used to unsubscribe from all channels @see redis_action

@return [void]

@api public

# File lib/celluloid_pubsub_redis_adapter/redis_reactor.rb, line 64
def unsubscribe_all(channel, data)
  info 'clearing connections'
  shutdown
end
unsubscribe_from_channel(channel) click to toggle source

method used to unsubscribe from a channel @see redis_action

@return [void]

@api public

Calls superclass method
# File lib/celluloid_pubsub_redis_adapter/redis_reactor.rb, line 53
def unsubscribe_from_channel(channel)
  super
  async.redis_action('unsubscribe', channel)
end

Private Instance Methods

action_success(action, channel, message) click to toggle source

method used to fetch the pubsub client from the connection and yield it @see action_subscribe

@param [string] action The action that will be checked @param [string] channel The channel that reactor has subscribed to @param [string] message The initial message used to subscribe

@return [void]

@api private

# File lib/celluloid_pubsub_redis_adapter/redis_reactor.rb, line 162
def action_success(action, channel, message)
  action_subscribe?(action) ? message.merge('client_action' => 'successful_subscription', 'channel' => channel) : nil
end
connect_to_redis(&block) click to toggle source

method used to run the enventmachine and setup the exception handler @see run_the_eventmachine @see setup_em_exception_handler

@param [Proc] block the block that will use the connection

@return [void]

@api private

# File lib/celluloid_pubsub_redis_adapter/redis_reactor.rb, line 107
def connect_to_redis(&block)
  require 'eventmachine'
  require 'em-hiredis'
  run_the_eventmachine(&block)
  setup_em_exception_handler
end
debug_enabled?() click to toggle source

the method will return true if debug is enabled

@return [Boolean] returns true if debug is enabled otherwise false

@api public

# File lib/celluloid_pubsub_redis_adapter/redis_reactor.rb, line 241
def debug_enabled?
  @server.debug_enabled?
end
fetch_pubsub() { |pubsub| ... } click to toggle source

method used to fetch the pubsub client from the connection and yield it

@return [void]

@api private

# File lib/celluloid_pubsub_redis_adapter/redis_reactor.rb, line 145
def fetch_pubsub
  connect_to_redis do |connection|
    @pubsub ||= connection.pubsub
    yield @pubsub if block_given?
  end
end
log_unsubscriptions(pubsub) click to toggle source

method used to listen to unsubscriptions and log them to log file @see register_redis_callback @see register_redis_error_callback

@param [EM::Hiredis::PubsubClient] pubsub The pubsub client that will be used to listen to unsubscriptions

@return [void]

@api private

# File lib/celluloid_pubsub_redis_adapter/redis_reactor.rb, line 213
def log_unsubscriptions(pubsub)
  pubsub.on(:unsubscribe) do |subscribed_channel, remaining_subscriptions|
    log_debug [:unsubscribe_happened, subscribed_channel, remaining_subscriptions].inspect
  end
end
prepare_redis_action(pubsub, action) click to toggle source

method used check if the action is subscribe and write the incoming message to be websocket or log the message otherwise @see log_unsubscriptions @see action_subscribe

@param [String] action The action that will be checked if it is subscribed

@return [void]

@api private

# File lib/celluloid_pubsub_redis_adapter/redis_reactor.rb, line 197
def prepare_redis_action(pubsub, action)
  log_unsubscriptions(pubsub)
  proc do |subscribed_message|
    action_subscribe?(action) ? (@websocket << subscribed_message) : log_debug(message)
  end
end
redis_action(action, channel = nil, message = {}) click to toggle source

method used execute an action (subscribe or unsubscribe ) to redis @see prepare_redis_action @see action_success @see register_subscription_callbacks

@param [string] action The action that will be checked @param [string] channel The channel that reactor has subscribed to @param [string] message The initial message used to subscribe

@return [void]

@api private

# File lib/celluloid_pubsub_redis_adapter/redis_reactor.rb, line 178
def redis_action(action, channel = nil, message = {})
  fetch_pubsub do |pubsub|
    callback = prepare_redis_action(pubsub, action)
    success_message = action_success(action, channel, message)
    args = action_subscribe?(action) ? [channel, callback] : [channel]
    subscription = pubsub.send(action, *args)
    register_subscription_callbacks(subscription, action, success_message)
  end
end
register_redis_callback(subscription, action, sucess_message = nil) click to toggle source

method used to register a success callback and if action is subscribe will write back to the websocket a message that will say it is a successful_subscription If action is something else, will log the incoming message @see log_debug

@param [EM::DefaultDeferrable] subscription The subscription object @param [string] sucess_message The initial message used to subscribe

@return [void]

@api private

# File lib/celluloid_pubsub_redis_adapter/redis_reactor.rb, line 256
def register_redis_callback(subscription, action, sucess_message = nil)
  subscription.callback do |subscriptions_ids|
    if sucess_message.present?
      @websocket << sucess_message.merge('subscriptions' => subscriptions_ids).to_json
    else
      log_debug "#{action} success #{sucess_message.inspect}"
    end
  end
end
register_redis_error_callback(subscription, action) click to toggle source

Register an error callback on the deferrable object and logs to file the incoming message @see log_debug

@param [EM::DefaultDeferrable] subscription The subscription object @param [string] action The action that will be checked

@return [void]

@api private

# File lib/celluloid_pubsub_redis_adapter/redis_reactor.rb, line 275
def register_redis_error_callback(subscription, action)
  subscription.errback { |reply| log_debug "#{action} error #{reply.inspect}" }
end
register_subscription_callbacks(subscription, action, sucess_message = nil) click to toggle source

method used registers the sucess and error callabacks @see register_redis_callback @see register_redis_error_callback

@param [EM::DefaultDeferrable] subscription The subscription object @param [string] action The action that will be checked @param [string] sucess_message The initial message used to subscribe

@return [void]

@api private

# File lib/celluloid_pubsub_redis_adapter/redis_reactor.rb, line 230
def register_subscription_callbacks(subscription, action, sucess_message = nil)
  register_redis_callback(subscription, action, sucess_message)
  register_redis_error_callback(subscription, action)
end
run_the_eventmachine(&block) click to toggle source

method used to connect to redis and yield the connection

@param [Proc] block the block that will use the connection

@return [void]

@api private

# File lib/celluloid_pubsub_redis_adapter/redis_reactor.rb, line 121
def run_the_eventmachine(&block)
  EM.run do
    @connection ||= EM::Hiredis.connect
    @connected = true
    block.call @connection
  end
end
setup_em_exception_handler() click to toggle source

method used to setup the eventmachine exception handler

@return [void]

@api private

# File lib/celluloid_pubsub_redis_adapter/redis_reactor.rb, line 134
def setup_em_exception_handler
  EM.error_handler do |error|
    debug error unless filtered_error?(error)
  end
end