module Insque

Constants

DEFAULT_INBOX_TTL
DEFAULT_PROCESSING_TTL
VERSION

Public Class Methods

broadcast(message, params = nil, recipient = :any)
# File lib/insque.rb, line 67
# Pushes one JSON-encoded message onto one or more inbox lists inside a single
# MULTI block.
#
# The stored value is a JSON object with the keys +message+
# ("<sender>_<message>"), +params+ and +broadcasted_at+ (current UTC time).
#
# message   - message name; it is prefixed with the configured sender.
# params    - payload stored under the +params+ key (default: nil).
# recipient - :any  -> every inbox key stored under a key matching
#                      '{insque}inbox_pointer_*',
#             :self -> this sender's inbox,
#             :slow -> this sender's slow inbox,
#             anything else is used verbatim as an inbox key (or, when it is
#             an Array, as a list of inbox keys).
#
# Returns whatever @redis.multi returns.
def self.broadcast(message, params = nil, recipient = :any)
  keys =
    case recipient
    when :any
      pointers = @redis.keys('{insque}inbox_pointer_*')
      # A pointer can expire between KEYS and MGET; MGET then yields nil for
      # it. Drop those entries so we never LPUSH onto a nil key.
      pointers.empty? ? [] : @redis.mget(*pointers).compact
    when :self
      [@inbox]
    when :slow
      [@slow_inbox]
    else
      recipient.is_a?(Array) ? recipient : [recipient]
    end
  value = { message: "#{@sender}_#{message}", params: params, broadcasted_at: Time.now.utc }.to_json
  logger.debug event: :sending, message: value, to: keys.to_json
  @redis.multi do |r|
    keys.each { |k| r.lpush k, value }
  end
end
inbox_ttl()
# File lib/insque.rb, line 16
# Returns the configured inbox TTL, or DEFAULT_INBOX_TTL when none has been
# set. The value is passed to SETEX as the expiry of the inbox pointer key.
def self.inbox_ttl
  return @inbox_ttl if @inbox_ttl

  DEFAULT_INBOX_TTL
end
inbox_ttl=(val)
# File lib/insque.rb, line 12
# Overrides the inbox TTL returned by inbox_ttl.
#
# val - the new TTL; a nil or false value makes inbox_ttl fall back to
#       DEFAULT_INBOX_TTL.
def self.inbox_ttl=(val)
  @inbox_ttl = val
end
janitor(redis=nil)
# File lib/insque.rb, line 96
# Runs the janitor loop for this sender's regular inbox and processing lists,
# also passing the inbox pointer key.
#
# redis - connection to use; a new one is created when nil.
def self.janitor(redis = nil)
  connection = redis || create_redis_connection
  real_janitor(@inbox, @processing, connection, @inbox_pointer)
end
listen(worker_name='', redis=nil)
# File lib/insque.rb, line 87
# Runs the listen loop on this sender's regular inbox, also passing the inbox
# pointer key.
#
# worker_name - label passed through to do_listen (default: '').
# redis       - connection to use; a new one is created when nil.
def self.listen(worker_name = '', redis = nil)
  connection = redis || create_redis_connection
  do_listen(@inbox, @processing, connection, worker_name, @inbox_pointer)
end
logger()
# File lib/insque.rb, line 40
# Returns the logger in use. When none has been assigned, a JsonLogger writing
# to STDOUT and tagged 'insque' is created and memoized.
def self.logger
  return @logger if @logger

  @logger = JsonLogger.new(STDOUT, additional_fields: { tag: 'insque' })
end
logger=(l)
# File lib/insque.rb, line 36
# Replaces the logger returned by logger.
#
# l - the logger object to use; the code in this module calls debug, info and
#     error on it.
def self.logger=(l)
  @logger = l
end
processing_ttl()
# File lib/insque.rb, line 24
# Returns the configured processing TTL, or DEFAULT_PROCESSING_TTL when none
# has been set. The janitor compares it against message ages in seconds.
def self.processing_ttl
  return @processing_ttl if @processing_ttl

  DEFAULT_PROCESSING_TTL
end
processing_ttl=(val)
# File lib/insque.rb, line 20
# Overrides the processing TTL returned by processing_ttl.
#
# val - the new TTL; a nil or false value makes processing_ttl fall back to
#       DEFAULT_PROCESSING_TTL.
def self.processing_ttl=(val)
  @processing_ttl = val
end
redis()
# File lib/insque.rb, line 32
# Returns the connection used by broadcast: whatever was last assigned through
# redis= or created by redis_config= (nil if neither has been called).
def self.redis
  @redis
end
redis=(redis)
# File lib/insque.rb, line 28
# Assigns the connection returned by redis and used by broadcast.
#
# redis - a ready-to-use client object.
def self.redis=(redis)
  @redis = redis
end
redis_class=(klass)
# File lib/insque.rb, line 44
# Sets the class that create_redis_connection instantiates instead of Redis.
#
# klass - a class whose .new accepts the stored redis config.
def self.redis_class=(klass)
  @redis_class = klass
end
redis_config()
# File lib/insque.rb, line 48
# Returns the connection settings last assigned through redis_config=
# (nil if none have been assigned).
def self.redis_config
  @redis_config
end
redis_config=(redis)
# File lib/insque.rb, line 52
# Stores the connection settings and immediately replaces the shared
# connection with a new one built from them.
#
# redis - settings handed to the client class constructor by
#         create_redis_connection.
def self.redis_config=(redis)
  @redis_config = redis
  # Implicit receiver, matching every other call site of this helper; an
  # explicit `self.` receiver cannot call a private method on Ruby < 2.7.
  @redis = create_redis_connection
end
sender=(sender)
# File lib/insque.rb, line 57
# Sets the sender name and derives every Redis key built from it:
# "{insque}<kind>_<sender>" for the inbox, inbox pointer, processing,
# slow inbox and slow processing keys. Finally (re)defines the
# "<sender>_send_later" handler.
#
# sender - name used as message prefix and key suffix.
def self.sender=(sender)
  @sender = sender
  %w[inbox inbox_pointer processing slow_inbox slow_processing].each do |kind|
    instance_variable_set("@#{kind}", "{insque}#{kind}_#{sender}")
  end
  create_send_later_handler
end
slow_janitor(redis=nil)
# File lib/insque.rb, line 100
# Runs the janitor loop for this sender's slow inbox and slow processing
# lists. No pointer key is passed for the slow inbox.
#
# redis - connection to use; a new one is created when nil.
def self.slow_janitor(redis = nil)
  connection = redis || create_redis_connection
  real_janitor(@slow_inbox, @slow_processing, connection)
end
slow_listen(worker_name='', redis=nil)
# File lib/insque.rb, line 92
# Runs the listen loop on this sender's slow inbox. No pointer key is passed
# for the slow inbox.
#
# worker_name - label passed through to do_listen (default: '').
# redis       - connection to use; a new one is created when nil.
def self.slow_listen(worker_name = '', redis = nil)
  connection = redis || create_redis_connection
  do_listen(@slow_inbox, @slow_processing, connection, worker_name)
end

Private Class Methods

create_redis_connection()
# File lib/insque.rb, line 159
# Builds a new connection from the stored config, using the class set through
# redis_class= or Redis when none was set.
def self.create_redis_connection
  client_class = @redis_class || Redis
  client_class.new(@redis_config)
end
create_send_later_handler()
# File lib/insque.rb, line 163
# Defines the singleton method "<sender>_send_later". Given a parsed message,
# it resolves the constant named in params['class'], calls unscoped.find with
# params['id'] on it, and invokes params['method'] on the result with
# params['args'] splatted.
#
# NOTE(review): the class name, method name and arguments all come straight
# from the message, so anything able to write to the inbox can invoke
# arbitrary methods. Confirm the inbox is only written by trusted senders.
def self.create_send_later_handler
  handler_name = "#{@sender}_send_later"
  define_singleton_method(handler_name) do |msg|
    params = msg['params']
    record = Kernel.const_get(params['class']).unscoped.find(params['id'])
    record.send(params['method'], *params['args'])
  end
end
do_listen(inbox, processing, redis, worker_name, pointer=nil)
# File lib/insque.rb, line 105
# Worker loop: moves one message at a time from +inbox+ to +processing+,
# dispatches it to the method named by its 'message' field, and removes it
# from +processing+ afterwards regardless of the outcome.
#
# inbox       - key of the list to consume.
# processing  - key of the list holding in-flight messages.
# redis       - connection used for all commands in the loop.
# worker_name - label included in the start-up log entry.
# pointer     - optional key; when given it is (re)written on every iteration
#               with the inbox key as value and inbox_ttl as expiry.
#
# Does not return by itself; the loop only ends when something raises out of
# the Redis calls.
def self.do_listen(inbox, processing, redis, worker_name, pointer = nil)
  logger.info event: :starting, worker_name: worker_name, inbox: inbox
  loop do
    redis.setex(pointer, inbox_ttl, inbox) if pointer
    # Timeout 0: per the Redis command reference this blocks until a message
    # arrives.
    message = redis.brpoplpush(inbox, processing, 0)
    begin
      logger.debug event: :receiving, message: message, inbox: inbox
      parsed_message = JSON.parse message
      handler = parsed_message['message']
      # Messages nobody handles here are skipped silently. Checking up front
      # (instead of rescuing NoMethodError) keeps a NoMethodError raised
      # *inside* a handler from being swallowed; it is logged below instead.
      # NOTE(review): the handler name comes from the message, so any method
      # reachable through send can be triggered by whoever writes to the inbox.
      send(handler, parsed_message) if respond_to?(handler, true)
    rescue => e
      logger.error e
    ensure
      redis.lrem processing, 0, message
    end
  end
end
real_janitor(inbox, processing, redis, pointer=nil)
# File lib/insque.rb, line 123
# Janitor loop: periodically scans +processing+ for messages that have been
# in flight longer than processing_ttl seconds.
#
# * A message without 'restarted_at' whose 'broadcasted_at' is older than
#   processing_ttl is pushed back onto +inbox+ with a 'restarted_at' stamp and
#   removed from +processing+.
# * A message that already carries 'restarted_at' is judged by that stamp
#   alone: once it is older than processing_ttl the message is removed from
#   +processing+ for good; until then it is left untouched.
#
# inbox      - key of the list restarted messages are pushed to.
# processing - key of the list being scanned.
# redis      - connection used for all commands in the loop.
# pointer    - optional key; when given it is (re)written on every iteration
#              with the inbox key as value and inbox_ttl as expiry.
#
# Does not return by itself; between passes it sleeps
# 1 + Random.rand(ceil(inbox_ttl / 10)) seconds.
def self.real_janitor(inbox, processing, redis, pointer = nil)
  loop do
    redis.setex(pointer, inbox_ttl, inbox) if pointer
    # Per Redis transaction semantics, WATCH makes the MULTI below abort if
    # +processing+ changes in the meantime; the 'failed' branch covers that.
    redis.watch processing
    errors = []
    restart = []
    delete = []
    redis.lrange(processing, 0, -1).each do |m|
      begin
        parsed_message = JSON.parse(m)
        now = Time.now.to_i
        if parsed_message['restarted_at']
          # Already restarted once. Only the restart stamp counts from here
          # on; falling back to 'broadcasted_at' (which is necessarily old)
          # would re-queue the message on every pass and never drop it.
          if now - Time.parse(parsed_message['restarted_at']).to_i > processing_ttl
            errors << m
            delete << m
          end
        elsif now - Time.parse(parsed_message['broadcasted_at']).to_i > processing_ttl
          # String key, like every other key of the parsed message, so the
          # JSON never ends up with two 'restarted_at' entries.
          restart << parsed_message.merge('restarted_at' => Time.now.utc).to_json
          delete << m
        end
      rescue => e
        logger.error e
      end
    end
    result = redis.multi do |r|
      restart.each { |m| r.lpush inbox, m }
      delete.each { |m| r.lrem processing, 0, m }
    end
    if result
      errors.each { |m| logger.debug event: :deleting, message: m }
      restart.each { |m| logger.debug event: :restarting, message: m }
      logger.info event: :cleaning, status: 'success', inbox: inbox
    else
      logger.info event: :cleaning, status: 'failed', inbox: inbox
    end
    sleep(Random.rand((inbox_ttl.to_f / 10).ceil) + 1)
  end
end