class Noda::JobWorker
ジョブワーカー
ジョブを待ち受けるスレッドです。 Taskを取りだして実行します.
ip=127.0.0.1 w=Noda::JobWorker.new("#{ip}", "10001") t = DRb.start_service("druby://#{ip}:10101",w) w.start
Taskをサーバー経由で送信する
ip=127.0.0.1 server = Noda::JobServer.new ip,"10001" str = %Q' class Noda::MyTask def do_task(table) table.put @name, "#{Process.pid} : #{Time.now}" return "#{@name} in #{Process.pid} : #{Time.now}" end def initialize(name) @name end end ' eval(str) task = Noda::MyTask.new("test") server.add_task_class( task.class.to_s, str) 10.times{|i| server.input.push Noda::MyTask.new(i) }
Attributes
max_retry_connect[RW]
thread[R]
wait_time_to_retry[RW]
Public Class Methods
new( server_addr="localhost",server_port="10001",q="" )
click to toggle source
-
server_addr ジョブサーバーアドレス、またはホスト名
-
server_port ジョブサーバーポート
# File lib/noda/job_worker.rb, line 34 def initialize( server_addr="localhost",server_port="10001",q="" ) @server_uri = "druby://#{server_addr}:#{server_port}" @max_retry_connect = 30 @wait_time_to_retry = 2 require "socket" @local_addr = IPSocket::getaddress(Socket::gethostname) self.connect self end
Public Instance Methods
connect()
click to toggle source
サーバーに接続します
# File lib/noda/job_worker.rb, line 92 def connect self.connect_job_server end
connect_job_server()
click to toggle source
内部的に使います。ジョブサーバーへ接続
# File lib/noda/job_worker.rb, line 44 def connect_job_server error_conter = 0 begin @job =DRbObject.new_with_uri(@server_uri) @job.hash_table @logger = @job.logger rescue DRb::DRbConnError => e error_conter +=1 raise e if error_conter > @max_retry_connect sleep @wait_time_to_retry retry end end
handle_task()
click to toggle source
担当ジョブからタスクを実行します.
タスクは do_task(hash)実装が必須 タスクのクラス定義はrequire必須.(start前にrequire) タスクのクラス定義はサーバー側から自動ロード(eval)します.
# File lib/noda/job_worker.rb, line 62 def handle_task() # @logger.info("self.class@#{@local_addr}#{self.object_id}"){"i try to pop a task."} task = @job.input.pop if task.class == DRb::DRbUnknown self.load_class(task.name) task = task.reload end result = task.do_task(@job.hash_table) @job.output.push result end
init_thread()
click to toggle source
ワーカーのメインスレッドを起動します.start で使います.
# File lib/noda/job_worker.rb, line 80 def init_thread @table = @job.hash_table @thread= Thread.new{ loop{ self.handle_task() sleep 0.001 } } end
load_class(name)
click to toggle source
クラス定義をEvalする。クラス定義はサーバーから取り出す. ワーカー側にクラス定義を動的に渡すときに使います. *name クラス名
# File lib/noda/job_worker.rb, line 75 def load_class(name) s = @job.task_class(name) Noda.module_eval(s) if s end
start()
click to toggle source
処理を開始します.
threadを返します. worker を起動しっぱなしにするなら thread.joinしてください
# File lib/noda/job_worker.rb, line 99 def start self.init_thread @thread.join end
status()
click to toggle source
ワーカースレッドの状態を取り出します.
マルチスレッドでブロックされてるとsleep になります
# File lib/noda/job_worker.rb, line 107 def status @thread.status if @thread end
stop()
click to toggle source
スレッド停止します.このインスタンスは死にません.start で再起動します.
# File lib/noda/job_worker.rb, line 112 def stop @thread.kill end