class CeleryClient::TaskManager
Public Class Methods
new(connection, wait=true, timeout=60, verbose=false)
click to toggle source
# File lib/celery_client/task_manager.rb, line 7 def initialize(connection, wait=true, timeout=60, verbose=false) @connection = connection @timeout = Integer(timeout) @tasks = {} @wait = wait if @wait.is_a?(String) @wait = to_boolean(@wait) end @verbose = verbose if @verbose.is_a?(String) @verbose = to_boolean(@verbose) end end
Public Instance Methods
result(task_id)
click to toggle source
# File lib/celery_client/task_manager.rb, line 37 def result(task_id) check_task(task_id) check_for_result(task_id) end
run(context, task_name, params)
click to toggle source
# File lib/celery_client/task_manager.rb, line 21 def run(context, task_name, params) if @verbose puts "Executing #{context}/#{task_name} with payload #{params}" end result = @connection.post( get_run_path(context, task_name), params) task_id = get_task_id(result) save_task(task_id, context, task_name, params) task_id end
status(task_id)
click to toggle source
# File lib/celery_client/task_manager.rb, line 32 def status(task_id) check_task(task_id) check_for_status(task_id) end
Private Instance Methods
check_for_result(task_id)
click to toggle source
# File lib/celery_client/task_manager.rb, line 138 def check_for_result(task_id) begin return @tasks.fetch(task_id).fetch(:data).fetch(:result) rescue KeyError abort("No result for task: #{task_id}") end end
check_for_status(task_id)
click to toggle source
# File lib/celery_client/task_manager.rb, line 130 def check_for_status(task_id) begin return check_for_task(task_id).fetch(:data).fetch(:status) rescue KeyError abort("No status for task: #{task_id}") end end
check_for_task(task_id)
click to toggle source
# File lib/celery_client/task_manager.rb, line 122 def check_for_task(task_id) begin return @tasks.fetch(task_id) rescue KeyError abort("No task: #{task_id}") end end
check_task(task_id)
click to toggle source
# File lib/celery_client/task_manager.rb, line 48 def check_task(task_id) counter = 0 while true response = @connection.get( get_status_path(get_context(task_id), task_id)) status = get_task_status(response) set_status(task_id, status) set_result(task_id, get_task_result(response)) if not @wait or counter >= @timeout or status != 'PENDING' break end if @verbose puts "Current status is #{status}" end sleep 1 counter += 1 end end
get_context(task_id)
click to toggle source
# File lib/celery_client/task_manager.rb, line 101 def get_context(task_id) begin context = check_for_task(task_id).fetch(:context) rescue KeyError abort("No context for task: #{task_id}") end context end
get_run_path(context, task_name)
click to toggle source
# File lib/celery_client/task_manager.rb, line 67 def get_run_path(context, task_name) "/#{context}/manage/apply/#{task_name}" end
get_status_path(context, task_id)
click to toggle source
# File lib/celery_client/task_manager.rb, line 71 def get_status_path(context, task_id) "/#{context}/manage/status/#{task_id}" end
get_task_id(result)
click to toggle source
# File lib/celery_client/task_manager.rb, line 75 def get_task_id(result) get_task_key(result, 'task_id') end
get_task_key(result, key)
click to toggle source
# File lib/celery_client/task_manager.rb, line 87 def get_task_key(result, key) results = JSON.parse(result) begin value = results.fetch('task').fetch(key) rescue KeyError begin value = results.fetch(key) rescue KeyError abort("Unable to get #{key} from task. Got this instead: #{result}.") end end value end
get_task_result(result)
click to toggle source
# File lib/celery_client/task_manager.rb, line 79 def get_task_result(result) get_task_key(result, 'result') end
get_task_status(result)
click to toggle source
# File lib/celery_client/task_manager.rb, line 83 def get_task_status(result) get_task_key(result, 'status') end
save_task(task_id, context, task_name, params)
click to toggle source
# File lib/celery_client/task_manager.rb, line 118 def save_task(task_id, context, task_name, params) @tasks[task_id] = {:context => context, :task_name => task_name, :params => params, :data => {}} end
set_result(task_id, result)
click to toggle source
# File lib/celery_client/task_manager.rb, line 114 def set_result(task_id, result) @tasks[task_id][:data][:result] = result end
set_status(task_id, status)
click to toggle source
# File lib/celery_client/task_manager.rb, line 110 def set_status(task_id, status) @tasks[task_id][:data][:status] = status end
to_boolean(string)
click to toggle source
# File lib/celery_client/task_manager.rb, line 44 def to_boolean(string) !!(string =~ /^(true|t|yes|y|1)$/i) end