class Qless::Subscriber

Subscribes to a Redis pub/sub channel on a background thread and passes each received message to a callback.

Attributes

channel[R]
redis[R]

Public Class Methods

new(client, channel, options = {}, &message_received_callback) click to toggle source
# File lib/qless/subscriber.rb, line 14
def initialize(client, channel, options = {}, &message_received_callback)
  @channel = channel
  @message_received_callback = message_received_callback
  @log = options.fetch(:log) { ::Logger.new($stderr) }

  # pub/sub blocks the connection so we must use a different redis
  # connection
  @client_redis   = client.redis
  @listener_redis = client.new_redis_connection

  @my_channel = Qless.generate_jid
end
start(*args, &block) click to toggle source
# File lib/qless/subscriber.rb, line 8
def self.start(*args, &block)
  new(*args, &block).tap(&:start)
end

Public Instance Methods

start() click to toggle source

Starts a background thread that listens on the channel, and blocks until the subscription is confirmed.

# File lib/qless/subscriber.rb, line 28
def start
  queue = ::Queue.new

  @thread = Thread.start do
    @listener_redis.subscribe(@channel, @my_channel) do |on|
      on.subscribe do |channel|
        queue.push(:subscribed) if channel == @channel
      end

      on.message do |channel, message|
        handle_message(channel, message)
      end
    end
  end

  queue.pop
end
stop() click to toggle source
# File lib/qless/subscriber.rb, line 46
def stop
  @client_redis.publish(@my_channel, 'disconnect')
  @thread.join
end

Private Instance Methods

handle_message(channel, message) click to toggle source
# File lib/qless/subscriber.rb, line 53
def handle_message(channel, message)
  if channel == @my_channel
    @listener_redis.unsubscribe(@channel, @my_channel) if message == "disconnect"
  else
    @message_received_callback.call(self, JSON.parse(message))
  end
rescue Exception => error
  @log.error("Qless::Subscriber") { error }
end