class Noda::JobWorker

ジョブワーカー

ジョブを待ち受けるスレッドです。 Taskを取りだして実行します.

ip=127.0.0.1
w=Noda::JobWorker.new("#{ip}", "10001")
t = DRb.start_service("druby://#{ip}:10101",w)
w.start

Taskをサーバー経由で送信する

ip=127.0.0.1
server = Noda::JobServer.new ip,"10001"
str = %Q'
       class Noda::MyTask
         def do_task(table)
            table.put @name, "#{Process.pid} : #{Time.now}"
            return "#{@name} in #{Process.pid} : #{Time.now}"
         end
         def initialize(name) @name end
       end
'
eval(str)
task = Noda::MyTask.new("test")
server.add_task_class( task.class.to_s, str)
10.times{|i| server.input.push Noda::MyTask.new(i) }

Attributes

max_retry_connect[RW]
thread[R]
wait_time_to_retry[RW]

Public Class Methods

new( server_addr="localhost",server_port="10001",q="" ) click to toggle source
  • server_addr ジョブサーバーアドレス、またはホスト名

  • server_port ジョブサーバーポート

# File lib/noda/job_worker.rb, line 34
def initialize( server_addr="localhost",server_port="10001",q="" )
  @server_uri  = "druby://#{server_addr}:#{server_port}"
  @max_retry_connect  = 30
  @wait_time_to_retry =  2
  require "socket" 
  @local_addr = IPSocket::getaddress(Socket::gethostname)
  self.connect
  self
end

Public Instance Methods

connect() click to toggle source

サーバーに接続します

# File lib/noda/job_worker.rb, line 92
def connect
  self.connect_job_server
end
connect_job_server() click to toggle source

内部的に使います。ジョブサーバーへ接続

# File lib/noda/job_worker.rb, line 44
def connect_job_server
  error_conter = 0
  begin 
    @job =DRbObject.new_with_uri(@server_uri)
    @job.hash_table
    @logger = @job.logger
  rescue DRb::DRbConnError => e
    error_conter +=1
    raise e if error_conter > @max_retry_connect
    sleep @wait_time_to_retry 
    retry
  end
end
handle_task() click to toggle source

担当ジョブからタスクを実行します.

タスクは do_task(hash)実装が必須 タスクのクラス定義はrequire必須.(start前にrequire) タスクのクラス定義はサーバー側から自動ロード(eval)します.

# File lib/noda/job_worker.rb, line 62
def handle_task()
  # @logger.info("self.class@#{@local_addr}#{self.object_id}"){"i try to pop a task."}
  task = @job.input.pop
  if task.class ==  DRb::DRbUnknown
    self.load_class(task.name)
    task = task.reload
  end
  result = task.do_task(@job.hash_table)
  @job.output.push result
end
init_thread() click to toggle source

ワーカーのメインスレッドを起動します.start で使います.

# File lib/noda/job_worker.rb, line 80
def init_thread
  @table = @job.hash_table
  @thread= Thread.new{
      loop{
        self.handle_task()
        sleep 0.001
      }
  }
end
load_class(name) click to toggle source

クラス定義をEvalする。クラス定義はサーバーから取り出す. ワーカー側にクラス定義を動的に渡すときに使います. *name クラス名

# File lib/noda/job_worker.rb, line 75
def load_class(name)
  s = @job.task_class(name)
  Noda.module_eval(s) if s
end
start() click to toggle source

処理を開始します.

threadを返します. worker を起動しっぱなしにするなら thread.joinしてください

# File lib/noda/job_worker.rb, line 99
def start
  self.init_thread
  @thread.join
end
status() click to toggle source

ワーカースレッドの状態を取り出します.

マルチスレッドでブロックされてるとsleep になります

# File lib/noda/job_worker.rb, line 107
def status
  @thread.status if @thread
end
stop() click to toggle source

スレッド停止します.このインスタンスは死にません.start で再起動します.

# File lib/noda/job_worker.rb, line 112
def stop
  @thread.kill
end