module Qs::Client::InstanceMethods

Attributes

redis[R]
redis_connect_hash[R]

Public Class Methods

new(redis_connect_hash) click to toggle source
# File lib/qs/client.rb, line 30
# Stores the redis connection settings on the client.
#
# Only `@redis_connect_hash` is assigned here; the `redis` reader is not
# populated by this method (presumably the including class sets `@redis`
# from these settings -- verify against the includer).
#
# redis_connect_hash - the connection settings object, kept as given.
def initialize(redis_connect_hash)
  @redis_connect_hash = redis_connect_hash
end

Public Instance Methods

append(queue_redis_key, encoded_payload) click to toggle source
# File lib/qs/client.rb, line 59
# Adds an encoded payload to the queue's redis list using LPUSH.
#
# `block_dequeue` pops with BRPOP (the opposite end of the list), so a payload
# pushed here is taken after the payloads already waiting in the list.
#
# queue_redis_key - redis key of the queue's list.
# encoded_payload - the already-encoded payload to store.
#
# Returns whatever the connection's `lpush` returns.
def append(queue_redis_key, encoded_payload)
  redis.connection do |conn|
    conn.lpush(queue_redis_key, encoded_payload)
  end
end
block_dequeue(*args) click to toggle source
# File lib/qs/client.rb, line 55
# Issues BRPOP on a connection, forwarding every argument untouched.
#
# args - the arguments for `brpop` (passed straight through).
#
# Returns whatever the connection's `brpop` returns.
def block_dequeue(*args)
  redis.connection do |conn|
    conn.brpop(*args)
  end
end
clear(redis_key) click to toggle source
# File lib/qs/client.rb, line 67
# Deletes the given key from redis (DEL).
#
# redis_key - the key to remove.
#
# Returns whatever the connection's `del` returns.
def clear(redis_key)
  redis.connection do |conn|
    conn.del(redis_key)
  end
end
clear_subscriptions(queue) click to toggle source
# File lib/qs/client.rb, line 88
# Removes the queue's name from every event subscribers set found in redis.
#
# The subscriber keys are looked up first with KEYS, using the pattern built
# by `Qs::Event::SubscribersRedisKey.new('*')`; the removals (SREM) are then
# all issued inside a single `redis_transaction`.
#
# NOTE(review): KEYS walks the whole keyspace; consider SCAN if the redis
# dataset can be large.
#
# queue - object responding to `name`.
#
# Returns whatever `redis_transaction` returns.
def clear_subscriptions(queue)
  all_subs_pattern = Qs::Event::SubscribersRedisKey.new('*')
  subs_keys = redis.connection { |conn| conn.keys(all_subs_pattern) }

  redis_transaction do |conn|
    subs_keys.each do |subs_key|
      conn.srem(subs_key, queue.name)
    end
  end
end
enqueue(queue, job_name, job_params = nil) click to toggle source
# File lib/qs/client.rb, line 34
# Builds a job and enqueues it on the given queue.
#
# queue      - the queue handed to `enqueue!` (defined outside this view).
# job_name   - name for the new `Qs::Job`.
# job_params - params for the job, passed as the `:params` option
#              (default: nil).
#
# Returns the `Qs::Job` that was built, regardless of what `enqueue!` returns.
def enqueue(queue, job_name, job_params = nil)
  Qs::Job.new(job_name, :params => job_params).tap do |job|
    enqueue!(queue, job)
  end
end
event_subscribers(event) click to toggle source
# File lib/qs/client.rb, line 97
# Reads the members of the event's subscribers set (SMEMBERS).
#
# event - object responding to `subscribers_redis_key`.
#
# Returns whatever the connection's `smembers` returns.
def event_subscribers(event)
  redis.connection do |conn|
    conn.smembers(event.subscribers_redis_key)
  end
end
ping() click to toggle source
# File lib/qs/client.rb, line 71
# Sends PING over a connection yielded by `redis.connection`.
#
# Returns whatever the connection's `ping` returns.
def ping
  redis.connection(&:ping)
end
prepend(queue_redis_key, encoded_payload) click to toggle source
# File lib/qs/client.rb, line 63
# Adds an encoded payload to the queue's redis list using RPUSH.
#
# `block_dequeue` pops with BRPOP (the same end of the list), so a payload
# pushed here is taken before the payloads already waiting in the list.
#
# queue_redis_key - redis key of the queue's list.
# encoded_payload - the already-encoded payload to store.
#
# Returns whatever the connection's `rpush` returns.
def prepend(queue_redis_key, encoded_payload)
  redis.connection do |conn|
    conn.rpush(queue_redis_key, encoded_payload)
  end
end
publish(channel, name, params = nil) click to toggle source
# File lib/qs/client.rb, line 40
# Publishes an event with no explicit publisher.
#
# Delegates to `publish!`, passing `params` as the `:event_params` option.
#
# channel - the event's channel.
# name    - the event's name.
# params  - the event's params (default: nil).
#
# Returns whatever `publish!` returns.
def publish(channel, name, params = nil)
  publish!(
    channel,
    name,
    :event_params => params
  )
end
publish_as(publisher, channel, name, params = nil) click to toggle source
# File lib/qs/client.rb, line 44
# Publishes an event on behalf of the given publisher.
#
# Delegates to `publish!` with both the `:event_params` and
# `:event_publisher` options set.
#
# publisher - value passed as the `:event_publisher` option.
# channel   - the event's channel.
# name      - the event's name.
# params    - the event's params (default: nil).
#
# Returns whatever `publish!` returns.
def publish_as(publisher, channel, name, params = nil)
  options = {
    :event_params    => params,
    :event_publisher => publisher,
  }
  publish!(channel, name, options)
end
push(queue_name, payload_hash) click to toggle source
# File lib/qs/client.rb, line 51
# Placeholder for pushing a payload hash onto a named queue.
#
# This implementation always raises; presumably concrete clients that include
# this module override it -- verify against the includers.
#
# queue_name   - name of the target queue (unused here).
# payload_hash - the payload to push (unused here).
#
# Raises NotImplementedError unconditionally.
def push(queue_name, payload_hash)
  raise(NotImplementedError)
end
sync_subscriptions(queue) click to toggle source
# File lib/qs/client.rb, line 75
# Makes redis reflect the queue's current event routes.
#
# Steps, in order:
# 1. look up every existing subscribers key with KEYS, using the pattern built
#    by `Qs::Event::SubscribersRedisKey.new('*')`;
# 2. build the subscribers key for each of the queue's event route names;
# 3. inside one `redis_transaction`, first remove the queue's name from all of
#    the existing sets (SREM), then add it to the sets for its routes (SADD).
#
# NOTE(review): KEYS walks the whole keyspace; consider SCAN if the redis
# dataset can be large.
#
# queue - object responding to `name` and `event_route_names`.
#
# Returns whatever `redis_transaction` returns.
def sync_subscriptions(queue)
  all_subs_pattern = Qs::Event::SubscribersRedisKey.new('*')
  existing_subs_keys = redis.connection { |conn| conn.keys(all_subs_pattern) }

  routed_subs_keys = queue.event_route_names.map { |route_name|
    Qs::Event::SubscribersRedisKey.new(route_name)
  }

  redis_transaction do |conn|
    existing_subs_keys.each do |subs_key|
      conn.srem(subs_key, queue.name)
    end
    routed_subs_keys.each do |subs_key|
      conn.sadd(subs_key, queue.name)
    end
  end
end

Private Instance Methods

publish!(channel, name, options = nil) click to toggle source
# File lib/qs/client.rb, line 103
# Wraps the event in a dispatch job and enqueues it on the dispatcher queue.
#
# channel - the event's channel.
# name    - the event's name.
# options - options handed to `DispatchJob.new` as given (default: nil).
#
# Returns the dispatch job's `event`.
def publish!(channel, name, options = nil)
  job = DispatchJob.new(channel, name, options).tap do |dispatch_job|
    enqueue!(Qs.dispatcher_queue, dispatch_job)
  end
  job.event
end
redis_transaction() { |c| ... } click to toggle source
# File lib/qs/client.rb, line 109
# Yields a connection with the caller's block running inside `multi`, which
# itself runs inside `pipelined`, both called on that same connection.
#
# Yields the connection object, so callers issue their commands on it.
#
# Returns whatever the connection's `pipelined` call returns.
def redis_transaction
  redis.connection do |conn|
    conn.pipelined do
      conn.multi { yield conn }
    end
  end
end