class BackgrounDRb::RailsWorkerProxy
A Worker proxy, which uses method_missing
for delegating method calls to the workers
Attributes
data[RW]
middle_man[RW]
worker_key[RW]
worker_method[RW]
worker_name[RW]
Public Class Methods
new(p_worker_name,p_worker_key = nil,p_middle_man = nil)
click to toggle source
create new worker proxy
# File lib/backgroundrb/rails_worker_proxy.rb, line 7 def initialize(p_worker_name,p_worker_key = nil,p_middle_man = nil) @worker_name = p_worker_name @middle_man = p_middle_man @worker_key = p_worker_key @tried_connections = [] end
Public Instance Methods
ask_result(job_key)
click to toggle source
get results back from the cache. Cache can be in-memory worker cache or memcache based cache
# File lib/backgroundrb/rails_worker_proxy.rb, line 114 def ask_result job_key options = compact(:worker => worker_name,:worker_key => worker_key,:job_key => job_key) if BDRB_CONFIG[:backgroundrb][:result_storage] == 'memcache' return_result_from_memcache(options) else result = middle_man.backend_connections.map { |conn| conn.ask_result(options) } return_result(result) end end
choose_connection(host_info)
click to toggle source
choose a worker
# File lib/backgroundrb/rails_worker_proxy.rb, line 165 def choose_connection host_info case host_info when :all; middle_man.backend_connections when :local; middle_man.find_local when String; middle_man.find_connection(host_info) else; middle_man.choose_server end end
compact(options = { })
click to toggle source
helper method to compact a hash and for getting rid of nil parameters
# File lib/backgroundrb/rails_worker_proxy.rb, line 175 def compact(options = { }) options.delete_if { |key,value| value.nil? } options end
delete()
click to toggle source
delete a worker
# File lib/backgroundrb/rails_worker_proxy.rb, line 157 def delete middle_man.backend_connections.each do |connection| connection.delete_worker(compact(:worker => worker_name, :worker_key => worker_key)) end return worker_key end
dequeue_task(options = {})
click to toggle source
remove tasks from the worker pool
# File lib/backgroundrb/rails_worker_proxy.rb, line 55 def dequeue_task options = {} BdrbJobQueue.remove_job(options) end
enqueue_task(options = {})
click to toggle source
enqueue tasks to the worker pool
# File lib/backgroundrb/rails_worker_proxy.rb, line 50 def enqueue_task options = {} BdrbJobQueue.insert_job(options) end
gen_key(options)
click to toggle source
generate worker key
# File lib/backgroundrb/rails_worker_proxy.rb, line 132 def gen_key options key = [options[:worker],options[:worker_key],options[:job_key]].compact.join('_') key end
invoke_on_connection(connection,method_name,options = {})
click to toggle source
choose a backgroundrb server connection and invoke worker method on it.
# File lib/backgroundrb/rails_worker_proxy.rb, line 107 def invoke_on_connection connection,method_name,options = {} raise NoServerAvailable.new("No BackgrounDRb is found running") unless connection connection.send(method_name,options) end
method_missing(method_id,*args)
click to toggle source
# File lib/backgroundrb/rails_worker_proxy.rb, line 14 def method_missing(method_id,*args) worker_method = method_id.to_s arguments = args.first arg,job_key,host_info,scheduled_at,priority = arguments && arguments.values_at(:arg,:job_key,:host,:scheduled_at, :priority) # allow both arg and args arg ||= arguments && arguments[:args] new_schedule = (scheduled_at && scheduled_at.respond_to?(:utc)) ? scheduled_at.utc : Time.now.utc if worker_method =~ /^async_(\w+)/ method_name = $1 worker_options = compact(:worker => worker_name,:worker_key => worker_key, :worker_method => method_name,:job_key => job_key, :arg => arg) run_method(host_info,:ask_work,worker_options) elsif worker_method =~ /^enq_(\w+)/i raise NoJobKey.new("Must specify a job key with enqueued tasks") if job_key.blank? method_name = $1 marshalled_args = Marshal.dump(arg) enqueue_task(compact(:worker_name => worker_name.to_s,:worker_key => worker_key.to_s, :worker_method => method_name.to_s,:job_key => job_key.to_s, :priority => priority, :args => marshalled_args,:timeout => arguments ? arguments[:timeout] : nil,:scheduled_at => new_schedule)) elsif worker_method =~ /^deq_(\w+)/i raise NoJobKey.new("Must specify a job key to dequeue tasks") if job_key.blank? method_name = $1 dequeue_task(compact(:worker_name => worker_name.to_s,:worker_key => worker_key.to_s, :worker_method => method_name.to_s,:job_key => job_key.to_s)) else worker_options = compact(:worker => worker_name,:worker_key => worker_key, :worker_method => worker_method,:job_key => job_key,:arg => arg) run_method(host_info,:send_request,worker_options) end end
process_result(t_result)
click to toggle source
# File lib/backgroundrb/rails_worker_proxy.rb, line 87 def process_result t_result case t_result when Hash if(t_result[:result] == true && t_result[:type] = :response) if(t_result[:result_flag] == "ok") return t_result[:data] else raise RemoteWorkerError.new("Error while executing worker method") end elsif(t_result[:result_flag] == "ok") "ok" elsif(t_result[:result_flag] == "error") raise RemoteWorkerError.new("Error while executing worker method") end when Array t_result end end
reset_memcache_result(job_key,value)
click to toggle source
reset result within memcache for given key
# File lib/backgroundrb/rails_worker_proxy.rb, line 143 def reset_memcache_result(job_key,value) options = compact(:worker => worker_name,:worker_key => worker_key,\ :job_key => job_key) key = gen_key(options) middle_man.cache[key] = value value end
return_result(result)
click to toggle source
# File lib/backgroundrb/rails_worker_proxy.rb, line 151 def return_result result result = Array(result) result.size <= 1 ? result[0] : result end
return_result_from_memcache(options = {})
click to toggle source
return result from memcache
# File lib/backgroundrb/rails_worker_proxy.rb, line 138 def return_result_from_memcache options = {} middle_man.cache[gen_key(options)] end
run_method(host_info,method_name,worker_options = {})
click to toggle source
invoke method on worker
# File lib/backgroundrb/rails_worker_proxy.rb, line 60 def run_method host_info,method_name,worker_options = {} result = [] connection = choose_connection(host_info) raise NoServerAvailable.new("No BackgrounDRb server is found running") if connection.blank? if host_info == :local or host_info.is_a?(String) result << invoke_on_connection(connection,method_name,worker_options) elsif host_info == :all succeeded = false begin connection.each { |conn| result << invoke_on_connection(conn,method_name,worker_options) } succeeded = true rescue BdrbConnError; end raise NoServerAvailable.new("No BackgrounDRb server is found running") unless succeeded else @tried_connections = [connection.server_info] begin result << invoke_on_connection(connection,method_name,worker_options) rescue BdrbConnError => e connection = middle_man.find_next_except_these(@tried_connections) @tried_connections << connection.server_info retry end end #return nil if method_name == :ask_work process_result(return_result(result)) end
worker_info()
click to toggle source
return runtime information about worker
# File lib/backgroundrb/rails_worker_proxy.rb, line 125 def worker_info t_connections = middle_man.backend_connections result = t_connections.map { |conn| conn.worker_info(compact(:worker => worker_name,:worker_key => worker_key)) } return_result(result) end