class SimpleMapReduce::Server::JobTracker

Constants

MAX_WORKER_RESERVABLE_SIZE

TODO: make this configurable

POLLING_INTERVAL

Attributes

config[RW]
jobs[R]
workers[R]

Public Class Methods

check_s3_access() click to toggle source
# File lib/simple_map_reduce/server/job_tracker.rb, line 181
# Smoke-tests the S3 connection by listing buckets once.
#
# No error handling is done here: anything raised by +list_buckets+
# propagates to the caller. The confirmation line is logged only after the
# call has returned.
def check_s3_access
  client = s3_client
  client.list_buckets
  logger.info('[OK] s3 connection test')
end
create_s3_buckets_if_not_existing() click to toggle source
# File lib/simple_map_reduce/server/job_tracker.rb, line 186
# Ensures the input, intermediate and output S3 buckets exist.
#
# Lists the existing buckets once, then creates every configured bucket
# that is missing, in input -> intermediate -> output order, logging each
# creation. The configured names are de-duplicated first: when two roles
# share a bucket name, it is created at most once. Without this, the
# pre-fetched bucket list would not contain the just-created bucket and a
# second +create_bucket+ call would be made for it.
def create_s3_buckets_if_not_existing
  existing_bucket_names = s3_client.list_buckets.buckets.map(&:name)
  required_bucket_names = [
    SimpleMapReduce.s3_input_bucket_name,
    SimpleMapReduce.s3_intermediate_bucket_name,
    SimpleMapReduce.s3_output_bucket_name
  ].uniq

  # Array#- keeps the left-hand order, so creation order is unchanged.
  (required_bucket_names - existing_bucket_names).each do |bucket_name|
    s3_client.create_bucket(bucket: bucket_name)
    logger.info("create bucket #{bucket_name}")
  end
  logger.info('[OK] confirmed that all necessary s3 buckets exist')
end
fetch_available_workers(worker_size = 1) click to toggle source
# File lib/simple_map_reduce/server/job_tracker.rb, line 247
# Reserves up to +worker_size+ registered workers that are currently ready.
#
# Selection and reservation happen while holding +mutex+, so two concurrent
# calls cannot pick the same ready worker. A worker whose +reserve!+ raises
# is logged and skipped. The result can therefore hold fewer than
# +worker_size+ workers, possibly none.
#
# @param worker_size [Integer] maximum number of workers to reserve
# @return [Array] the workers that were successfully reserved
def fetch_available_workers(worker_size = 1)
  # Mutex#synchronize only unlocks what it actually acquired. The previous
  # lock + method-level ensure would also "unlock" after a failed lock
  # (e.g. a recursive-lock ThreadError), releasing a caller's critical section.
  mutex.synchronize do
    return [] if @workers.nil? || @workers.empty?

    ready_worker_ids = @workers.select { |_id, worker| worker.ready? }.keys.take(worker_size)
    ready_worker_ids.map do |worker_id|
      worker = @workers[worker_id]
      worker.reserve!
      worker
    rescue => e
      logger.error("Failed to transit the worker state: `#{worker}`")
      logger.error(e.inspect)
      nil
    end.compact
  end
end
job_manager() click to toggle source
# File lib/simple_map_reduce/server/job_tracker.rb, line 302
# Returns the embedded Rasteira worker manager, booting it on first use.
#
# The first call invokes +Rasteira::EmbedWorker::Manager.run+ and caches the
# result in @job_manager. Later calls return the cached object.
def job_manager
  return @job_manager if @job_manager

  @job_manager = ::Rasteira::EmbedWorker::Manager.run
end
logger() click to toggle source
# File lib/simple_map_reduce/server/job_tracker.rb, line 314
# Delegates to the library-wide logger.
#
# @return whatever +SimpleMapReduce.logger+ returns
def logger
  ::SimpleMapReduce.logger
end
mutex() click to toggle source
# File lib/simple_map_reduce/server/job_tracker.rb, line 306
# Returns the Mutex that fetch_available_workers and store_worker use to
# guard @workers, creating it on first call and reusing it afterwards.
#
# NOTE(review): the lazy initialisation itself is not synchronised. If two
# threads could make the very first call at the same time, they might end up
# with different Mutex objects. Consider creating it eagerly when the class
# is defined.
def mutex
  @mutex || (@mutex = Mutex.new)
end
quit!() click to toggle source

@override `Sinatra::Base#quit!` github.com/sinatra/sinatra/blob/2e980f3534b680fbd79d7ec39552b4afb7675d6c/lib/sinatra/base.rb#L1483-L1491

Calls superclass method
# File lib/simple_map_reduce/server/job_tracker.rb, line 320
# Stops the job tracker's background activity, then runs Sinatra's shutdown.
#
# Overrides +Sinatra::Base#quit!+. It does the following, in order:
# - clears @keep_polling_workers;
# - kills the polling thread, if one was started;
# - shuts down the embedded job manager, only if one was booted;
# - calls +super+.
def quit!
  @keep_polling_workers = false
  @polling_workers_thread&.kill
  # Use the ivar rather than #job_manager. That method memoizes
  # Manager.run, so calling it here before setup would boot a fresh manager
  # only to shut it down again.
  @job_manager&.shutdown_workers!
  super
end
register_job(map_script:, map_class_name:, reduce_script:, reduce_class_name:, job_input_bucket_name:, job_input_directory_path:, job_output_bucket_name:, job_output_directory_path:, map_worker:) click to toggle source
# File lib/simple_map_reduce/server/job_tracker.rb, line 205
# Builds a new Job from the given scripts and S3 locations, enqueues a
# RegisterMapTaskWorker for it on the job manager, and records it in @jobs
# keyed by its id.
#
# The job is added to @jobs only after +enqueue_job!+ returns. If enqueueing
# raises, the job is not recorded.
#
# @return [SimpleMapReduce::Server::Job] the newly registered job
def register_job(map_script:,
                 map_class_name:,
                 reduce_script:,
                 reduce_class_name:,
                 job_input_bucket_name:,
                 job_input_directory_path:,
                 job_output_bucket_name:,
                 job_output_directory_path:,
                 map_worker:)
  job_attributes = {
    map_script: map_script,
    map_class_name: map_class_name,
    reduce_script: reduce_script,
    reduce_class_name: reduce_class_name,
    job_input_bucket_name: job_input_bucket_name,
    job_input_directory_path: job_input_directory_path,
    job_output_bucket_name: job_output_bucket_name,
    job_output_directory_path: job_output_directory_path,
    map_worker: map_worker
  }
  new_job = ::SimpleMapReduce::Server::Job.new(**job_attributes)
  @jobs = {} if @jobs.nil?

  job_manager.enqueue_job!(SimpleMapReduce::Worker::RegisterMapTaskWorker, args: new_job)
  @jobs[new_job.id] = new_job
  new_job
end
register_worker(url:) click to toggle source
# File lib/simple_map_reduce/server/job_tracker.rb, line 237
# Registers a new remote worker reachable at +url+.
#
# The worker object is built outside the lock. The insert into @workers is
# done under +mutex+, the same lock fetch_available_workers holds while
# iterating @workers. Without it, a concurrent registration could add a key
# mid-iteration.
#
# @param url [String] base URL of the worker
# @return [SimpleMapReduce::Server::Worker] the registered worker
def register_worker(url:)
  worker = ::SimpleMapReduce::Server::Worker.new(url: url, data_store_type: 'remote')
  mutex.synchronize do
    @workers ||= {}
    @workers[worker.id] = worker
  end
  worker
end
s3_client() click to toggle source
# File lib/simple_map_reduce/server/job_tracker.rb, line 310
# Returns the client exposed by +SimpleMapReduce::S3Client.instance+.
def s3_client
  holder = ::SimpleMapReduce::S3Client.instance
  holder.client
end
setup_job_tracker() click to toggle source
# File lib/simple_map_reduce/server/job_tracker.rb, line 172
# Performs the job tracker's start-up sequence, then logs that it is ready.
#
# Steps run in order and nothing is rescued here, so an exception from any
# step stops the remaining ones.
def setup_job_tracker
  # S3 must be reachable and provisioned before any job can be handled.
  check_s3_access
  create_s3_buckets_if_not_existing

  # Boot the job manager before the poller, whose thread enqueues onto it.
  job_manager
  start_polling_workers

  logger.info('All setup process is done successfully. The job tracker is operation ready.')
  logger.info("This job tracker url: #{::SimpleMapReduce.job_tracker_url}")
end
start_polling_workers() click to toggle source
# File lib/simple_map_reduce/server/job_tracker.rb, line 288
# Starts a background thread that periodically asks the job manager to poll
# worker status.
#
# Every POLLING_INTERVAL seconds the thread enqueues a
# PollingWorkersStatusWorker job. Its args are the current @workers registry,
# or an empty hash when no worker has registered yet. The loop exits once
# @keep_polling_workers is false. Only one poller runs at a time: any thread
# from a previous call is killed first.
#
# @return [Thread] the polling thread
def start_polling_workers
  # Avoid leaving an earlier poller running alongside the new one.
  @polling_workers_thread&.kill
  @keep_polling_workers = true

  # Thread.new already schedules the thread. An extra Thread#run is
  # redundant and raises ThreadError if the thread has already died.
  @polling_workers_thread = Thread.new do
    loop do
      break unless @keep_polling_workers

      job_manager.enqueue_job!(SimpleMapReduce::Worker::PollingWorkersStatusWorker, args: @workers || {})
      sleep(POLLING_INTERVAL)
    end
  end
end
store_worker(worker) click to toggle source
# File lib/simple_map_reduce/server/job_tracker.rb, line 274
# Returns +worker+ to the pool by marking the registered entry ready, under
# +mutex+.
#
# If no worker with +worker.id+ is registered yet, +worker+ itself is
# stored first. Previously this path called +ready!+ on nil and raised
# NoMethodError. An already-registered entry is kept as-is and transitioned.
#
# @param worker [#id] the worker being handed back
# @return the result of +ready!+ on the registered worker
def store_worker(worker)
  # synchronize never releases a lock it failed to acquire, unlike
  # lock + method-level ensure.
  mutex.synchronize do
    @workers ||= {}
    (@workers[worker.id] ||= worker).ready!
  end
end