class Firehose::Server::Publisher

Constants

PAYLOAD_DELIMITER

Delimited used to frame different parts of a message that’s published over Firehose.

REDIS_PUBLISH_SCRIPT
TTL

Seconds that the message buffer should live before Redis expires it.

Private Class Methods

from_payload(payload) click to toggle source

Deserialize components of a message back into Ruby.

# File lib/firehose/server/publisher.rb, line 64
def self.from_payload(payload)
  @payload_size ||= method(:to_payload).arity
  payload.split(PAYLOAD_DELIMITER, @payload_size)
end
to_payload(channel_key, sequence, message) click to toggle source

Serialize components of a message into something that can be dropped into Redis.

# File lib/firehose/server/publisher.rb, line 59
def self.to_payload(channel_key, sequence, message)
  [channel_key, sequence, message].join(PAYLOAD_DELIMITER)
end

Public Instance Methods

publish(channel_key, message, opts={}) click to toggle source

Publish a message to a Firehose channel via Redis.

# File lib/firehose/server/publisher.rb, line 12
def publish(channel_key, message, opts={})
  # How long should we hang on to the resource once is published?
  ttl = (opts[:ttl] || TTL).to_i
  buffer_size = (opts[:buffer_size] || MessageBuffer::DEFAULT_SIZE).to_i

  # TODO hi-redis isn't that awesome... we have to setup an errback per even for wrong
  # commands because of the lack of a method_missing whitelist. Perhaps implement a whitelist in
  # em-hiredis or us a diff lib?
  if (deferrable = opts[:deferrable]).nil?
    deferrable = EM::DefaultDeferrable.new
    deferrable.errback do |e|
      # Handle missing Lua publishing script in cache
      # (such as Redis restarting or someone executing SCRIPT FLUSH)
      if e.message =~ /NOSCRIPT/
        deferrable.succeed
        EM.next_tick do
          @publish_script_digest = nil
          combined_opts = opts.merge :deferrable => deferrable
          self.publish channel_key, message, combined_opts
        end
      else
        EM.next_tick { raise e }
      end
    end
  end

  if @publish_script_digest.nil?
    register_publish_script.errback do |e|
      deferrable.fail e
    end.callback do |digest|
      @publish_script_digest = digest
      Firehose.logger.debug "Registered Lua publishing script with Redis => #{digest}"
      eval_publish_script channel_key, message, ttl, buffer_size, deferrable
    end
  else
    eval_publish_script channel_key, message, ttl, buffer_size, deferrable
  end

  deferrable
end

Private Instance Methods

eval_publish_script(channel_key, message, ttl, buffer_size, deferrable) click to toggle source
# File lib/firehose/server/publisher.rb, line 80
def eval_publish_script(channel_key, message, ttl, buffer_size, deferrable)
  list_key = Server::Redis.key(channel_key, :list)
  script_args = [
    Server::Redis.key(channel_key, :sequence),
    list_key,
    Server::Redis.key(:channel_updates),
    ttl,
    message,
    buffer_size,
    PAYLOAD_DELIMITER,
    channel_key
  ]

  redis.evalsha(
    @publish_script_digest, script_args.length, *script_args
  ).errback do |e|
    deferrable.fail e
  end.callback do |sequence|
    Firehose.logger.debug "Redis stored/published `#{message}` to list `#{list_key}` with sequence `#{sequence}`"
    deferrable.succeed
  end
end
lua_escape(str) click to toggle source

TODO: Make this FAR more robust. Ideally we’d whitelist the permitted

characters and then escape or remove everything else.
See: http://en.wikibooks.org/wiki/Lua_Programming/How_to_Lua/escape_sequence
# File lib/firehose/server/publisher.rb, line 72
def lua_escape(str)
  str.gsub(/\\/,'\\\\\\').gsub(/"/,'\"').gsub(/\n/,'\n').gsub(/\r/,'\r')
end
redis() click to toggle source
# File lib/firehose/server/publisher.rb, line 54
def redis
  @redis ||= Firehose::Server.redis.connection
end
register_publish_script() click to toggle source
# File lib/firehose/server/publisher.rb, line 76
def register_publish_script
  redis.script 'LOAD', REDIS_PUBLISH_SCRIPT
end