class Elasticrawl::ParseJob

Represents an Elastic MapReduce job flow that parses segments of Common Crawl data. A job step is created per segment.

Inherits from Job, the ActiveRecord model class.

Public Instance Methods

log_uri()

Returns the S3 location for storing Elastic MapReduce job logs.

# File lib/elasticrawl/parse_job.rb, line 39
def log_uri
  s3_path = "/logs/1-parse/#{self.job_name}/"
  build_s3_uri(s3_path)
end
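A standalone sketch of how log_uri composes the final S3 URI. The bucket name and the build_s3_uri helper body here are assumptions; the real helper is defined on the Job base class.

```ruby
BUCKET = 'my-elasticrawl-bucket'  # assumed bucket name, for illustration only

# Stand-in for the build_s3_uri helper inherited from Job.
def build_s3_uri(s3_path)
  "s3://#{BUCKET}#{s3_path}"
end

def log_uri(job_name)
  s3_path = "/logs/1-parse/#{job_name}/"
  build_s3_uri(s3_path)
end

puts log_uri('1389789645620')
# => s3://my-elasticrawl-bucket/logs/1-parse/1389789645620/
```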
run()

Runs the job by calling the Elastic MapReduce API. If successful, the parse time is set for each segment.

# File lib/elasticrawl/parse_job.rb, line 20
def run
  emr_config = job_config['emr_config']
  job_flow_id = run_job_flow(emr_config)

  if job_flow_id.present?
    self.job_flow_id = job_flow_id

    self.job_steps.each do |step|
      segment = step.crawl_segment
      segment.parse_time = DateTime.now
      segment.save
    end

    self.save
    self.result_message
  end
end
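The control flow above can be sketched with run_job_flow stubbed out (the real method submits a job flow to the EMR API). Segment and Step are plain structs standing in for the ActiveRecord models, and the nil/empty check replaces ActiveSupport's present?.

```ruby
require 'date'

Segment = Struct.new(:parse_time)   # stand-in for CrawlSegment
Step    = Struct.new(:crawl_segment) # stand-in for JobStep

# Stubbed: the real implementation calls the EMR API and
# returns the id of the created job flow.
def run_job_flow(_emr_config)
  'j-STUBBED123'
end

steps = [Step.new(Segment.new(nil)), Step.new(Segment.new(nil))]
job_flow_id = run_job_flow(nil)

# Only mark segments as parsed once EMR has accepted the job flow.
unless job_flow_id.nil? || job_flow_id.empty?
  steps.each { |step| step.crawl_segment.parse_time = DateTime.now }
end

p steps.all? { |s| !s.crawl_segment.parse_time.nil? }  # => true
```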
segment_list()

Returns a list of segment descriptions.

# File lib/elasticrawl/parse_job.rb, line 45
def segment_list
  segments = ['Segments']

  job_steps.each do |job_step|
    if job_step.crawl_segment.present?
      segment = job_step.crawl_segment
      segments.push(segment.segment_desc)
    end
  end

  segments.push('')
end
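A self-contained sketch of segment_list's output shape, using plain structs in place of the JobStep and CrawlSegment models (the segment description text is illustrative):

```ruby
JobStep      = Struct.new(:crawl_segment)
CrawlSegment = Struct.new(:segment_desc)

def segment_list(job_steps)
  segments = ['Segments']  # header row for display

  job_steps.each do |job_step|
    if job_step.crawl_segment
      segments.push(job_step.crawl_segment.segment_desc)
    end
  end

  segments.push('')  # trailing blank line; push returns the array
end

steps = [JobStep.new(CrawlSegment.new('Segment: 1346823845675 Files: 100')),
         JobStep.new(nil)]  # steps without a segment are skipped
p segment_list(steps)
# => ["Segments", "Segment: 1346823845675 Files: 100", ""]
```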
set_segments(crawl_segments, max_files = nil)

Populates the job from the list of segments to be parsed.

# File lib/elasticrawl/parse_job.rb, line 8
def set_segments(crawl_segments, max_files = nil)
  self.job_name = set_job_name
  self.job_desc = set_job_desc(crawl_segments, max_files)
  self.max_files = max_files

  crawl_segments.each do |segment|
    self.job_steps.push(create_job_step(segment))
  end
end
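A minimal sketch of the wiring set_segments performs: one job step per crawl segment. ParseJobSketch and its helpers are illustrative stand-ins for the ActiveRecord-backed Job/JobStep models, and the timestamp-based job name is an assumption.

```ruby
class ParseJobSketch
  attr_reader :job_name, :job_steps

  def initialize
    @job_steps = []
  end

  def set_segments(crawl_segments, max_files = nil)
    @job_name  = Time.now.to_i.to_s  # assumed naming scheme
    @max_files = max_files
    crawl_segments.each { |segment| @job_steps.push(create_job_step(segment)) }
  end

  private

  # Stand-in for JobStep.create, which also sets input/output S3 paths.
  def create_job_step(segment)
    { segment: segment }
  end
end

job = ParseJobSketch.new
job.set_segments(%w[seg1 seg2 seg3], 5)
p job.job_steps.size  # => 3
```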

Private Instance Methods

create_job_step(segment)

Creates a job step for the crawl segment.

# File lib/elasticrawl/parse_job.rb, line 60
def create_job_step(segment)
  JobStep.create(:job => self,
                 :crawl_segment => segment,
                 :input_paths => segment_input(segment),
                 :output_path => segment_output(segment))
end
job_config()

Returns the parse job configuration from ~/.elasticrawl/jobs.yml.

# File lib/elasticrawl/parse_job.rb, line 93
def job_config
  config = Config.new
  config.load_config('jobs')['steps']['parse']
end
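The lookup above walks the 'steps' then 'parse' keys of the jobs YAML file. The fragment below shows the assumed shape; the keys beyond that nesting (input_filter value, emr_config contents) are illustrative guesses, not the gem's actual defaults.

```ruby
require 'yaml'

# Illustrative stand-in for the contents of ~/.elasticrawl/jobs.yml.
yaml = <<~YML
  steps:
    parse:
      input_filter: '/wet/*.warc.wet.gz'
      emr_config:
        instance_count: 2
YML

config = YAML.safe_load(yaml)
parse_config = config['steps']['parse']  # same lookup as job_config
p parse_config['input_filter']
# => "/wet/*.warc.wet.gz"
```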
segment_input(segment)

Returns the S3 location for reading a crawl segment. The input filter determines which type of Common Crawl data files are parsed.

# File lib/elasticrawl/parse_job.rb, line 69
def segment_input(segment)
  segment.segment_s3_uri + job_config['input_filter']
end
segment_output(segment)

Returns the S3 location for storing the step results. This includes the segment name.

# File lib/elasticrawl/parse_job.rb, line 75
def segment_output(segment)
  job_path = "/data/1-parse/#{self.job_name}"
  s3_path = "#{job_path}/segments/#{segment.segment_name}/"
  build_s3_uri(s3_path)
end
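Sketching the path composition: the bucket name, job name, and segment name below are hypothetical, and build_s3_uri is a stand-in for the helper defined on Job.

```ruby
# Stand-in for the build_s3_uri helper inherited from Job.
def build_s3_uri(s3_path)
  "s3://my-elasticrawl-bucket#{s3_path}"
end

def segment_output(job_name, segment_name)
  job_path = "/data/1-parse/#{job_name}"
  s3_path  = "#{job_path}/segments/#{segment_name}/"
  build_s3_uri(s3_path)
end

puts segment_output('1389789645620', '1346876860445')
# => s3://my-elasticrawl-bucket/data/1-parse/1389789645620/segments/1346876860445/
```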
set_job_desc(segments, max_files)

Sets the job description which forms part of the Elastic MapReduce job flow name.

# File lib/elasticrawl/parse_job.rb, line 83
def set_job_desc(segments, max_files)
  if segments.count > 0
    crawl_name = segments[0].crawl.crawl_name if segments[0].crawl.present?
    file_desc = max_files.nil? ? 'all files' : "#{max_files} files per segment"
  end

  "Crawl: #{crawl_name} Segments: #{segments.count} Parsing: #{file_desc}"
end
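A self-contained sketch of the description string this produces, with plain structs standing in for the Crawl/CrawlSegment models and an example crawl name:

```ruby
Crawl        = Struct.new(:crawl_name)
CrawlSegment = Struct.new(:crawl)

def set_job_desc(segments, max_files)
  if segments.count > 0
    crawl_name = segments[0].crawl.crawl_name if segments[0].crawl
    file_desc  = max_files.nil? ? 'all files' : "#{max_files} files per segment"
  end

  "Crawl: #{crawl_name} Segments: #{segments.count} Parsing: #{file_desc}"
end

segments = Array.new(3) { CrawlSegment.new(Crawl.new('CC-MAIN-2014-10')) }
puts set_job_desc(segments, 5)
# => Crawl: CC-MAIN-2014-10 Segments: 3 Parsing: 5 files per segment
puts set_job_desc(segments, nil)
# => Crawl: CC-MAIN-2014-10 Segments: 3 Parsing: all files
```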