class SimpleWorker::TaskQueue

TaskQueue.new(redis, hostname, jobid, opts)

where 'hostname' is the machine's hostname (or any other unique identifier) and 'opts' is a Hash of options:

:namespace    => String prefix to keys in redis used by SimpleWorker (default: simpleworker)
:task_timeout => Fixnum time after which a task expires; this should be greater than the timeout set in Runner (default: 10 seconds)

Constants

DEFAULT_OPTIONS

Public Class Methods

new(redis, hostname, jobid, opts = {}) click to toggle source
# File lib/simpleworker/task_queue.rb, line 18
# Sets up the queue state for one node and loads the Lua scripts.
#
# redis    - client object used for every redis call made by the queue.
# hostname - the machine's hostname or another unique identifier; it is
#            appended to the active tasks key to build @active_key_prefix.
# jobid    - job identifier, stored as @jobid.
# opts     - Hash of options, merged over DEFAULT_OPTIONS:
#            :namespace    - String prefix for the redis keys.
#            :task_timeout - task expiry used when the config stored at
#                            config_key is missing or has no 'task_timeout'.
#
# A 'task_timeout' found in the JSON stored at config_key takes precedence
# over the option.
def initialize(redis, hostname, jobid, opts = {})
  # Hash#merge already returns a new Hash, so DEFAULT_OPTIONS is not mutated.
  opts               = DEFAULT_OPTIONS.merge(opts)
  @redis             = redis
  @namespace         = opts[:namespace]
  @jobid             = jobid
  @hostname          = hostname
  @active_key_prefix = "#{active_tasks_key}:#{@hostname}"

  # The config key may not exist; JSON.parse(nil) would raise TypeError, so
  # only parse when something was actually stored.
  raw_config         = @redis.get(config_key)
  stored_timeout     = raw_config ? JSON.parse(raw_config)['task_timeout'] : nil
  @task_timeout      = stored_timeout || opts[:task_timeout]

  load_lua_scripts
end

Public Instance Methods

each_task() { |local_task| ... } click to toggle source
# File lib/simpleworker/task_queue.rb, line 55
# Repeatedly calls pop and yields each task to the block until pop returns
# nil. A task-start event is fired before the block runs and a task-stop
# event after it returns.
#
# Without a block an Enumerator is returned. Previously a task was popped
# and its start event fired before the missing block raised LocalJumpError.
def each_task
  return enum_for(:each_task) unless block_given?

  until pop.nil?
    # Capture the task locally: @current_task may be changed (for example by
    # expire_current_task) while the block is running.
    local_task = @current_task
    fire_task_start(local_task)
    yield local_task
    fire_task_stop(local_task)
  end
end
expire_current_task() click to toggle source
# File lib/simpleworker/task_queue.rb, line 46
# Deletes the redis key "<active key prefix>:<current task>" when a current
# task is set, then clears @current_task. Always returns nil.
def expire_current_task
  if @current_task
    active_key = "#{@active_key_prefix}:#{@current_task}"
    @redis.del(active_key)
  end

  @current_task = nil
end
fire_log_message(msg) click to toggle source
# File lib/simpleworker/task_queue.rb, line 51
# Pushes an 'on_log' event for this host, carrying +msg+, through
# push_to_log and returns its result.
def fire_log_message(msg)
  event = ['on_log', @hostname, msg]
  push_to_log(*event)
end
fire_start() click to toggle source
# File lib/simpleworker/task_queue.rb, line 30
# Pushes an 'on_node_start' event for this host through push_to_log and
# returns its result.
def fire_start
  event = ['on_node_start', @hostname]
  push_to_log(*event)
end
fire_stop() click to toggle source
# File lib/simpleworker/task_queue.rb, line 34
# Pushes an 'on_node_stop' event for this host through push_to_log and
# returns its result.
def fire_stop
  event = ['on_node_stop', @hostname]
  push_to_log(*event)
end
fire_task_start(task) click to toggle source
# File lib/simpleworker/task_queue.rb, line 38
# Pushes an 'on_task_start' event for this host and +task+ through
# push_to_log and returns its result.
def fire_task_start(task)
  event = ['on_task_start', @hostname, task]
  push_to_log(*event)
end
fire_task_stop(task) click to toggle source
# File lib/simpleworker/task_queue.rb, line 42
# Pushes an 'on_task_stop' event for this host and +task+ through
# push_to_log and returns its result.
def fire_task_stop(task)
  event = ['on_task_stop', @hostname, task]
  push_to_log(*event)
end
pop() click to toggle source
# File lib/simpleworker/task_queue.rb, line 64
# Fetches the next task and records it as @current_task.
#
# When a task is already current, its "<active key prefix>:<task>" member is
# first removed from the set at active_tasks_key. The next task is then
# obtained by running the script registered under @reliable_queue_sha.
#
# Returns the script's reply, which is also stored in @current_task.
def pop
  if @current_task
    active_set    = active_tasks_key
    finished_item = "#{@active_key_prefix}:#{@current_task}"
    @redis.srem(active_set, finished_item)
  end

  script_sha  = @reliable_queue_sha
  script_keys = [tasks_key, active_tasks_key]
  script_argv = [namespace, jobid, @hostname, @task_timeout]

  # The assignment's value doubles as the method's return value.
  @current_task = @redis.evalsha(script_sha, :keys => script_keys, :argv => script_argv)
end

Private Instance Methods

push_to_log(*args) click to toggle source
# File lib/simpleworker/task_queue.rb, line 74
# Serialises the given arguments as one JSON array and appends it to the
# redis list at log_key. Returns the result of the rpush call.
def push_to_log(*args)
  destination = log_key
  @redis.rpush(destination, args.to_json)
end