class SimpleMapReduce::Server::JobTracker
Constants
- MAX_WORKER_RESERVABLE_SIZE
TODO: make this configurable
- POLLING_INTERVAL
Attributes
config[RW]
jobs[R]
workers[R]
Public Class Methods
check_s3_access()
click to toggle source
# File lib/simple_map_reduce/server/job_tracker.rb, line 181
# Smoke-tests the S3 connection by issuing a list-buckets request and logs
# an OK line afterwards. An error raised by the request is not rescued here,
# so it propagates to the caller and the OK line is never written.
#
# @return whatever `logger.info` returns
def check_s3_access
  # The response is intentionally discarded; only success/failure matters.
  s3_client.list_buckets

  logger.info('[OK] s3 connection test')
end
create_s3_buckets_if_not_existing()
click to toggle source
# File lib/simple_map_reduce/server/job_tracker.rb, line 186
# Makes sure the input, intermediate and output buckets exist, creating
# (and logging) each one that is missing from the current bucket listing.
#
# The listing is fetched once up front; buckets are then checked in the
# order input -> intermediate -> output.
#
# @return whatever `logger.info` returns for the final confirmation line
def create_s3_buckets_if_not_existing
  existing_names = s3_client.list_buckets.buckets.map(&:name)

  # Reader methods on SimpleMapReduce that yield the three bucket names.
  %i[s3_input_bucket_name s3_intermediate_bucket_name s3_output_bucket_name].each do |name_reader|
    bucket_name = SimpleMapReduce.public_send(name_reader)
    next if existing_names.include?(bucket_name)

    s3_client.create_bucket(bucket: bucket_name)
    logger.info("create bucket #{bucket_name}")
  end

  logger.info('[OK] confirmed that all necessary s3 buckets exist')
end
fetch_available_workers(worker_size = 1)
click to toggle source
# File lib/simple_map_reduce/server/job_tracker.rb, line 247
# Reserves up to `worker_size` ready workers and returns them.
#
# The whole selection runs inside `mutex.synchronize`, so the lock is
# released only if this call actually acquired it. (With a manual
# `lock` + method-level `ensure unlock`, a failing `lock` — e.g. recursive
# locking from the same thread — would still run `unlock` and release a lock
# owned by an outer critical section.)
#
# @param worker_size [Integer] maximum number of workers to reserve
# @return [Array] the workers whose `reserve!` succeeded; empty when no
#   workers are registered or none is ready. Workers whose `reserve!` raises
#   are logged and left out of the result.
def fetch_available_workers(worker_size = 1)
  mutex.synchronize do
    return [] if @workers.nil? || @workers.empty?

    candidate_ids = @workers.select { |_id, worker| worker.ready? }.keys.take(worker_size)

    reserved = candidate_ids.map do |worker_id|
      begin
        @workers[worker_id].reserve!
      rescue => e
        # Best effort: a worker that cannot be reserved is skipped, not fatal.
        logger.error("Failed to transit the worker state: `#{@workers[worker_id]}`")
        logger.error(e.inspect)
        nil
      else
        @workers[worker_id]
      end
    end

    reserved.compact
  end
end
job_manager()
click to toggle source
# File lib/simple_map_reduce/server/job_tracker.rb, line 302
# Memoized accessor for the embedded job manager. The first call invokes
# `::Rasteira::EmbedWorker::Manager.run` and caches the result; later calls
# return the cached object.
def job_manager
  return @job_manager if @job_manager

  @job_manager = ::Rasteira::EmbedWorker::Manager.run
end
logger()
click to toggle source
# File lib/simple_map_reduce/server/job_tracker.rb, line 314
# Shortcut to the logger exposed by the SimpleMapReduce module.
#
# @return the object returned by `SimpleMapReduce.logger`
def logger
  SimpleMapReduce.logger
end
mutex()
click to toggle source
# File lib/simple_map_reduce/server/job_tracker.rb, line 306
# Memoized Mutex used by the methods that read or update `@workers` under a
# lock. Created on first use; the same instance is returned afterwards.
#
# @return [Mutex]
def mutex
  @mutex = Mutex.new unless @mutex
  @mutex
end
quit!()
click to toggle source
Overrides `Sinatra::Base#quit!`; see github.com/sinatra/sinatra/blob/2e980f3534b680fbd79d7ec39552b4afb7675d6c/lib/sinatra/base.rb#L1483-L1491 for the original implementation.
Calls superclass method
# File lib/simple_map_reduce/server/job_tracker.rb, line 320
# Shuts the tracker down: clears the polling flag, kills the polling thread
# if one was started, asks the job manager to shut its workers down, and
# finally defers to the superclass implementation of `quit!`.
def quit!
  # Lets the polling loop exit on its next check even if kill is not immediate.
  @keep_polling_workers = false

  poller = @polling_workers_thread
  poller.kill unless poller.nil?

  manager = job_manager
  manager.shutdown_workers! unless manager.nil?

  super
end
register_job(map_script:, map_class_name:, reduce_script:, reduce_class_name:, job_input_bucket_name:, job_input_directory_path:, job_output_bucket_name:, job_output_directory_path:, map_worker:)
click to toggle source
# File lib/simple_map_reduce/server/job_tracker.rb, line 205
# Builds a Job from the given attributes, enqueues a RegisterMapTaskWorker
# for it on the job manager, records it in `@jobs` keyed by `job.id`, and
# returns it.
#
# All keyword arguments are passed unchanged to
# `::SimpleMapReduce::Server::Job.new`.
#
# @return the newly created job
def register_job(map_script:,
                 map_class_name:,
                 reduce_script:,
                 reduce_class_name:,
                 job_input_bucket_name:,
                 job_input_directory_path:,
                 job_output_bucket_name:,
                 job_output_directory_path:,
                 map_worker:)
  job_attributes = {
    map_script: map_script,
    map_class_name: map_class_name,
    reduce_script: reduce_script,
    reduce_class_name: reduce_class_name,
    job_input_directory_path: job_input_directory_path,
    job_input_bucket_name: job_input_bucket_name,
    job_output_bucket_name: job_output_bucket_name,
    job_output_directory_path: job_output_directory_path,
    map_worker: map_worker
  }
  job = ::SimpleMapReduce::Server::Job.new(**job_attributes)

  @jobs = {} if @jobs.nil?

  # Enqueue first: the job is only recorded once enqueueing did not raise.
  job_manager.enqueue_job!(SimpleMapReduce::Worker::RegisterMapTaskWorker, args: job)
  @jobs.store(job.id, job)

  job
end
register_worker(url:)
click to toggle source
# File lib/simple_map_reduce/server/job_tracker.rb, line 237
# Creates a Worker for the given URL (with data_store_type 'remote'),
# records it in `@workers` keyed by its id, and returns it.
#
# @param url [String] passed to `::SimpleMapReduce::Server::Worker.new`
#   (presumably the worker's base URL — confirm against callers)
# @return the newly created worker
def register_worker(url:)
  new_worker = ::SimpleMapReduce::Server::Worker.new(url: url, data_store_type: 'remote')

  @workers = {} if @workers.nil?
  @workers.store(new_worker.id, new_worker)

  new_worker
end
s3_client()
click to toggle source
# File lib/simple_map_reduce/server/job_tracker.rb, line 310
# Returns the `client` of the `SimpleMapReduce::S3Client` singleton instance.
def s3_client
  singleton = SimpleMapReduce::S3Client.instance
  singleton.client
end
setup_job_tracker()
click to toggle source
# File lib/simple_map_reduce/server/job_tracker.rb, line 172
# Runs the start-up sequence for the job tracker, in order:
#   1. verify S3 access,
#   2. create any missing S3 buckets,
#   3. touch `job_manager` so the memoized manager is started,
#   4. start the worker-polling thread,
# then logs that setup finished along with the tracker URL.
# None of the steps is rescued here; a failure aborts the sequence.
def setup_job_tracker
  check_s3_access
  create_s3_buckets_if_not_existing

  # Called only for its side effect of initializing the memoized manager.
  job_manager

  start_polling_workers

  logger.info('All setup process is done successfully. The job tracker is operation ready.')
  logger.info("This job tracker url: #{SimpleMapReduce.job_tracker_url}")
end
start_polling_workers()
click to toggle source
# File lib/simple_map_reduce/server/job_tracker.rb, line 288
# Starts a background thread that, every POLLING_INTERVAL, enqueues a
# PollingWorkersStatusWorker job carrying the current `@workers` hash (or an
# empty hash when no worker has been registered yet). The loop stops once
# `@keep_polling_workers` is no longer truthy.
#
# The thread is kept in `@polling_workers_thread`.
def start_polling_workers
  @keep_polling_workers = true

  @polling_workers_thread = Thread.new do
    loop do
      break unless @keep_polling_workers

      manager = job_manager
      manager.enqueue_job!(SimpleMapReduce::Worker::PollingWorkersStatusWorker, args: @workers || {})
      sleep(POLLING_INTERVAL)
    end
  end

  # NOTE(review): Thread.new already starts the thread; `run` additionally
  # wakes it and yields to the scheduler. Kept as in the original.
  @polling_workers_thread.run
end
store_worker(worker)
click to toggle source
# File lib/simple_map_reduce/server/job_tracker.rb, line 274
# Marks the tracked worker with `worker.id` as ready, storing `worker` first
# when no worker with that id is tracked yet.
#
# Previously an unknown id (including the case where `@workers` had just been
# initialized to an empty hash) ended in a NoMethodError on nil, so nothing
# was ever stored. When the id is already tracked, the *tracked* instance is
# the one transitioned, exactly as before.
#
# Runs inside `mutex.synchronize`, so the lock is released only if this call
# acquired it.
#
# @param worker [#id, #ready!] worker to mark ready / start tracking
# @return whatever `ready!` returns
def store_worker(worker)
  mutex.synchronize do
    @workers ||= {}
    tracked = (@workers[worker.id] ||= worker)
    tracked.ready!
  end
end