class ClusteredRpc::Transport::RedisCluster
Public Class Methods
new()
click to toggle source
# File lib/clustered_rpc/transport/redis_cluster.rb, line 7
# Sets up an unconnected transport and immediately connects it.
#
# The pub/sub channel name is derived from ClusteredRpc.cluster_namespace.
# Both Redis client slots start out as nil so that #connect treats this
# instance as "not yet connected".
def initialize
  @redis_message_pubsub_key = "__#{ClusteredRpc.cluster_namespace}_messages"
  @redis_subscriber = @redis_publish = nil
  connect
end
Public Instance Methods
connect()
click to toggle source
# File lib/clustered_rpc/transport/redis_cluster.rb, line 45
# Opens the subscriber and publisher Redis clients and starts a background
# thread that subscribes to the cluster message channel.
#
# Does nothing when a subscriber client already exists; call #reconnect to
# force a fresh connection. After starting the thread this method blocks,
# polling #subscribed? once per second, until the subscription is confirmed.
#
# @return [nil]
# @raise [RuntimeError] if the subscription is not confirmed after more than
#   15 one-second polls
def connect
  return unless @redis_subscriber.nil? # already connected - call reconnect...

  @redis_subscriber = ::Redis.new(ClusteredRpc.options)
  @redis_publish = ::Redis.new(ClusteredRpc.options)
  @subscribed = false
  @subscriber_thread = Thread.new do
    begin
      @redis_subscriber.subscribe(@redis_message_pubsub_key) do |on|
        on.subscribe do |channel, subscriptions|
          # A successful subscription resets the reconnect back-off counter.
          @retry_count = 0
          ClusteredRpc.logger.info {"ClusteredRpc: Subscribed to ##{channel} (#{subscriptions} subscriptions)"}
          @subscribed = true
        end
        on.message do |channel, message|
          handle_cluster_message(channel, message)
        end
        on.unsubscribe do |channel, subscriptions|
          ClusteredRpc.logger.info {"ClusteredRpc: Unsubscribed from ##{channel} (#{subscriptions} subscriptions)"}
        end
      end
    rescue Redis::BaseConnectionError => e
      # The connection is gone, so stop reporting ourselves as subscribed
      # while we back off and retry.
      @subscribed = false
      ClusteredRpc.logger.error e.message
      @retry_count ||= 0
      @retry_count += 1
      # Back-off is the retry count clamped to the 5..10 second range.
      sleep_seconds = [[@retry_count, 10].min, 5].max
      ClusteredRpc.logger.warn "ClusteredRpc: Retrying redis connection in #{sleep_seconds} seconds: #{@retry_count}"
      # NOTE(review): @config is not assigned anywhere in the visible part of
      # this class, so this presumably logs nil - confirm or remove.
      ClusteredRpc.logger.info @config
      sleep sleep_seconds
      retry if @retry_count <= 300
      ClusteredRpc.logger.warn "ClusteredRpc: Could not reconnect to Redis"
    ensure
      ClusteredRpc.logger.info "ClusteredRpc: Subscription thread terminated..."
      @subscribed = false
    end
  end

  # Give the subscriber a chance to connect in background thread before returning
  attempts = 0
  until subscribed?
    sleep(1)
    attempts += 1
    ClusteredRpc.logger.info "ClusteredRpc: Waiting for subscription...#{attempts} times"
    raise "ClusteredRpc: Could not subscribe after #{attempts} attempts" if attempts > 15
  end
end

# Handles one raw pub/sub message: parses it as JSON, runs the requested
# method via run_method_from_message and, when the message carries a
# 'request_id', stores the result. Errors are logged, never raised, so a bad
# message cannot terminate the subscriber thread.
def handle_cluster_message(channel, message)
  ClusteredRpc.logger.debug {"ClusteredRpc: Handling message ##{channel}: #{message}"}
  message = begin
    JSON.parse(message)
  rescue JSON::ParserError, TypeError
    # Not JSON: keep the raw value so it is reported as an unknown type below.
    message
  end

  unless message.is_a?(Hash)
    ClusteredRpc.logger.warn "Unknown message type: #{message.class}"
    return
  end

  result = run_method_from_message(message)
  ClusteredRpc.logger.debug {"ClusteredRpc: Got result: #{result}"}
  request_id = message['request_id']
  store_cluster_result(request_id, result) if request_id
rescue => e
  ClusteredRpc.logger.error Array(e.backtrace).join("\n")
  ClusteredRpc.logger.error "Error[#{e.message}] Handling message ##{channel}: #{message}"
end

# Stores the result for 10 minutes in the hash identified by the request_id,
# under the field named by ClusteredRpc.instance_id.
def store_cluster_result(request_id, result)
  result_key = "request:#{request_id}"
  ClusteredRpc.logger.debug {"Setting ClusterSend result: request:#{request_id}[#{Process.pid}]"}
  # Issue the commands on the yielded pipeline object rather than on the
  # client itself; newer redis-rb versions only pipeline the former.
  @redis_publish.pipelined do |pipeline|
    pipeline.hmset result_key, ClusteredRpc.instance_id, result.to_json
    pipeline.expire result_key, 600
  end
end

private :handle_cluster_message, :store_cluster_result
get_result(request_id)
click to toggle source
# File lib/clustered_rpc/transport/redis_cluster.rb, line 21
# Looks up everything stored under the "request:<request_id>" key.
#
# @param request_id [#to_s] identifier interpolated into the key name
# @return whatever #hgetall returns for that key
def get_result(request_id)
  result_key = "request:#{request_id}"
  hgetall(result_key)
end
hgetall(key_name)
click to toggle source
# File lib/clustered_rpc/transport/redis_cluster.rb, line 37
# Delegates an HGETALL for +key_name+ to the publishing Redis client.
#
# @param key_name [String] name of the Redis hash to read
# @return the client's HGETALL response, passed through unchanged
def hgetall(key_name)
  @redis_publish.hgetall(key_name)
end
publish(payload={})
click to toggle source
# File lib/clustered_rpc/transport/redis_cluster.rb, line 14
# Serializes +payload+ to JSON and publishes it on the cluster message
# channel through the publishing Redis client.
#
# Any StandardError is logged and then re-raised to the caller.
#
# @param payload [#to_json] message to broadcast (defaults to an empty Hash)
# @return the publishing client's response to PUBLISH
def publish(payload={})
  begin
    serialized = payload.to_json
    @redis_publish.publish(@redis_message_pubsub_key, serialized)
  rescue => error
    ClusteredRpc.logger.error("ClusteredRpc.publish encountered error: #{error.message}")
    raise error
  end
end
reconnect()
click to toggle source
# File lib/clustered_rpc/transport/redis_cluster.rb, line 25
# Discards the current connection state and calls #connect to build a new one.
#
# If a subscriber thread exists it is killed and waited on. Both Redis
# clients are then closed (best effort) and cleared. The subscriber client
# is always cleared, even when no thread exists, because #connect returns
# early whenever @redis_subscriber is still set.
#
# @return whatever #connect returns
def reconnect
  if @subscriber_thread
    ClusteredRpc.logger.warn "ClusteredRpc: killing subscriber thread"
    @subscriber_thread.kill
    # https://stackoverflow.com/questions/49490278/wait-for-a-thread-to-die-in-ruby
    sleep 0.01 while @subscriber_thread.alive?
  end

  stale_clients = [@redis_publish, @redis_subscriber].compact
  @redis_publish = nil
  @redis_subscriber = nil

  # Release the old sockets explicitly instead of leaving them to the GC.
  # A failure to close must not prevent the reconnect itself.
  stale_clients.each do |client|
    begin
      client.close if client.respond_to?(:close)
    rescue StandardError => e
      ClusteredRpc.logger.warn "ClusteredRpc: error closing redis connection: #{e.message}"
    end
  end

  connect
end
subscribed?()
click to toggle source
# File lib/clustered_rpc/transport/redis_cluster.rb, line 41
# Reports the current value of the subscription flag.
#
# The flag is returned as stored, so the result is nil if it has never been
# assigned.
#
# @return [Boolean, nil] the raw @subscribed flag
def subscribed?
  @subscribed
end