class Firehose::Server::Publisher
Constants
Private Class Methods
from_payload(payload)
click to toggle source
Deserialize components of a message back into Ruby.
# File lib/firehose/server/publisher.rb, line 64
# Split a serialized payload string back into its components.
#
# The number of fields to split into is taken from the arity of
# +to_payload+ and cached in @payload_size. Passing it as the split limit
# means any delimiter occurring after the last expected field stays inside
# that final field.
#
# payload - the String to split on PAYLOAD_DELIMITER.
#
# Returns an Array of Strings.
def self.from_payload(payload)
  field_count = (@payload_size ||= method(:to_payload).arity)
  payload.split(PAYLOAD_DELIMITER, field_count)
end
to_payload(channel_key, sequence, message)
click to toggle source
Serialize components of a message into something that can be dropped into Redis.
# File lib/firehose/server/publisher.rb, line 59
# Join the components of a message into a single String, separated by
# PAYLOAD_DELIMITER.
#
# NOTE: from_payload reads this method's arity to decide how many fields to
# split a payload into, so the number of required parameters here matters.
#
# Returns the joined String.
def self.to_payload(channel_key, sequence, message)
  components = [channel_key, sequence, message]
  components.join(PAYLOAD_DELIMITER)
end
Public Instance Methods
publish(channel_key, message, opts={})
click to toggle source
Publish a message to a Firehose channel via Redis.
# File lib/firehose/server/publisher.rb, line 12
# Publish +message+ on +channel_key+ by running the Lua publishing script.
#
# channel_key - identifies the channel; passed through to eval_publish_script.
# message     - the message to publish.
# opts        - optional Hash:
#               :ttl         - coerced with to_i; falls back to TTL.
#               :buffer_size - coerced with to_i; falls back to
#                              MessageBuffer::DEFAULT_SIZE.
#               :deferrable  - deferrable to report the outcome on. When it
#                              is nil a new EM::DefaultDeferrable is created
#                              with a retry-on-NOSCRIPT errback attached.
#
# Returns the deferrable that is failed or succeeded with the outcome.
def publish(channel_key, message, opts={})
  # How long should we hang on to the resource once it is published?
  ttl = (opts[:ttl] || TTL).to_i
  buffer_size = (opts[:buffer_size] || MessageBuffer::DEFAULT_SIZE).to_i

  # TODO: em-hiredis isn't that awesome... we have to set up an errback even
  # for wrong commands because of the lack of a method_missing whitelist.
  # Perhaps implement a whitelist in em-hiredis or use a different lib?
  deferrable = opts[:deferrable]
  if deferrable.nil?
    deferrable = EM::DefaultDeferrable.new
    deferrable.errback do |error|
      # Anything other than a missing script is re-raised on the next tick.
      unless error.message =~ /NOSCRIPT/
        EM.next_tick { raise error }
        next
      end

      # The Lua publishing script is missing from the cache (such as Redis
      # restarting or someone executing SCRIPT FLUSH): drop the stored
      # digest and publish again, reusing this same deferrable.
      deferrable.succeed
      EM.next_tick do
        @publish_script_digest = nil
        retry_opts = opts.merge(:deferrable => deferrable)
        publish(channel_key, message, retry_opts)
      end
    end
  end

  # Script digest already known: evaluate it straight away.
  unless @publish_script_digest.nil?
    eval_publish_script(channel_key, message, ttl, buffer_size, deferrable)
    return deferrable
  end

  # Otherwise register the script first, remember its digest, then evaluate.
  register_publish_script.errback do |error|
    deferrable.fail(error)
  end.callback do |digest|
    @publish_script_digest = digest
    Firehose.logger.debug "Registered Lua publishing script with Redis => #{digest}"
    eval_publish_script(channel_key, message, ttl, buffer_size, deferrable)
  end

  deferrable
end
Private Instance Methods
eval_publish_script(channel_key, message, ttl, buffer_size, deferrable)
click to toggle source
# File lib/firehose/server/publisher.rb, line 80
# Run the already-registered publishing script (by its stored digest) via
# EVALSHA and relay the outcome to +deferrable+.
#
# Every script argument is handed over as a key: the key count given to
# evalsha is the full length of the argument list.
#
# On error the deferrable is failed with the error; on success a debug line
# is logged and the deferrable is succeeded (with no arguments).
def eval_publish_script(channel_key, message, ttl, buffer_size, deferrable)
  list_key     = Server::Redis.key(channel_key, :list)
  sequence_key = Server::Redis.key(channel_key, :sequence)
  updates_key  = Server::Redis.key(:channel_updates)

  script_args = [
    sequence_key,
    list_key,
    updates_key,
    ttl,
    message,
    buffer_size,
    PAYLOAD_DELIMITER,
    channel_key
  ]

  redis.evalsha(@publish_script_digest, script_args.length, *script_args).errback do |error|
    deferrable.fail(error)
  end.callback do |sequence|
    Firehose.logger.debug "Redis stored/published `#{message}` to list `#{list_key}` with sequence `#{sequence}`"
    deferrable.succeed
  end
end
lua_escape(str)
click to toggle source
TODO: Make this FAR more robust. Ideally we’d whitelist the permitted
characters and then escape or remove everything else. See: http://en.wikibooks.org/wiki/Lua_Programming/How_to_Lua/escape_sequence
# File lib/firehose/server/publisher.rb, line 72
# Escape +str+ so that backslashes, double quotes, newlines and carriage
# returns are replaced by their backslash escape sequences.
#
# The string is scanned once; each matched character is looked up in the
# replacement table, whose values are inserted literally. No other
# characters are touched.
#
# Returns a new String.
def lua_escape(str)
  escapes = {
    '\\' => '\\\\', # one backslash  -> two backslashes
    '"'  => '\"',   # double quote   -> backslash + quote
    "\n" => '\n',   # newline        -> backslash + n
    "\r" => '\r'    # carriage return -> backslash + r
  }
  str.gsub(/[\\"\n\r]/, escapes)
end
redis()
click to toggle source
# File lib/firehose/server/publisher.rb, line 54
# Memoized Redis connection, obtained from Firehose::Server.redis.connection
# the first time it is needed and reused afterwards.
def redis
  return @redis if @redis

  @redis = Firehose::Server.redis.connection
end
register_publish_script()
click to toggle source
# File lib/firehose/server/publisher.rb, line 76
# Issue SCRIPT LOAD for REDIS_PUBLISH_SCRIPT on the Redis connection.
#
# Returns whatever redis.script returns; publish attaches errback/callback
# handlers to it and treats the callback value as the script digest.
def register_publish_script
  redis.script('LOAD', REDIS_PUBLISH_SCRIPT)
end