class Cranium::Transformation::Join

Attributes

match_fields[RW]
source_left[RW]
source_right[RW]
target[RW]
type[RW]

Public Instance Methods

execute() click to toggle source
# File lib/cranium/transformation/join.rb, line 9
def execute
  validate_parameters
  cache_commonly_used_values
  join_sources
end

Private Instance Methods

build_join_table_from_file(file) click to toggle source
# File lib/cranium/transformation/join.rb, line 53
def build_join_table_from_file(file)
  line_number = 0
  CSV.foreach File.join(Cranium.configuration.upload_path, file), csv_read_options_for(source_right) do |row|
    next if 1 == (line_number += 1)

    record = Hash[@right_source_field_names.zip row]
    index_key = right_index_key_for record
    if @join_table.has_key? index_key
      @join_table[index_key] << record
    else
      @join_table[index_key] = [record]
    end
  end
end
build_join_table_from_right_source_files() click to toggle source
# File lib/cranium/transformation/join.rb, line 44
def build_join_table_from_right_source_files
  @join_table = {}
  source_right.files.each do |file|
    build_join_table_from_file file
  end
end
cache_commonly_used_values() click to toggle source
# File lib/cranium/transformation/join.rb, line 29
def cache_commonly_used_values
  @left_source_field_names = source_left.fields.keys
  @right_source_field_names = source_right.fields.keys
  @target_field_names = target.fields.keys
end
csv_read_options_for(source_definition) click to toggle source
# File lib/cranium/transformation/join.rb, line 132
def csv_read_options_for(source_definition)
  {
    encoding: source_definition.encoding,
    col_sep: source_definition.delimiter,
    quote_char: source_definition.quote,
    return_headers: false
  }
end
csv_write_options_for(source_definition) click to toggle source
# File lib/cranium/transformation/join.rb, line 121
def csv_write_options_for(source_definition)
  {
    col_sep: source_definition.delimiter,
    quote_char: source_definition.quote,
    write_headers: true,
    headers: source_definition.fields.keys
  }
end
join_sources() click to toggle source
# File lib/cranium/transformation/join.rb, line 37
def join_sources
  build_join_table_from_right_source_files
  write_output_file
end
joined_records_for(record) click to toggle source
# File lib/cranium/transformation/join.rb, line 101
def joined_records_for(record)
  index_key = left_index_key_for record
  return [] unless @join_table.has_key? index_key
  @join_table[index_key].map { |matching_record| record.merge matching_record }
end
left_index_key_for(record) click to toggle source
# File lib/cranium/transformation/join.rb, line 109
def left_index_key_for(record)
  record.select { |field, _| match_fields.values.include? field }.values
end
process_left_source_file(input_file, output_file) click to toggle source
# File lib/cranium/transformation/join.rb, line 83
def process_left_source_file(input_file, output_file)
  line_number = 0
  CSV.foreach input_file, csv_read_options_for(source_left) do |row|
    next if 1 == (line_number += 1)

    record = Hash[@left_source_field_names.zip row]

    joined_records = joined_records_for(record)
    joined_records << record if joined_records.empty? && type == :left

    joined_records.each do |record_to_output|
      output_file << @target_field_names.map { |field| record_to_output[field] }
    end
  end
end
right_index_key_for(record) click to toggle source
# File lib/cranium/transformation/join.rb, line 115
def right_index_key_for(record)
  record.select { |field, _| match_fields.keys.include? field }.values
end
validate_parameters() click to toggle source
# File lib/cranium/transformation/join.rb, line 19
def validate_parameters
  raise "Missing left source for join transformation" if source_left.nil?
  raise "Missing right source for join transformation" if source_right.nil?
  raise "Missing target for join transformation" if target.nil?
  raise "Invalid match fields for join transformation" unless match_fields.nil? or match_fields.is_a? Hash
  raise "Invalid type for join transformation" unless %i(inner left).include?(type)
end
write_output_file() click to toggle source
# File lib/cranium/transformation/join.rb, line 70
def write_output_file
  CSV.open "#{Cranium.configuration.upload_path}/#{target.file}", "w:#{target.encoding}", csv_write_options_for(target) do |target_file|
    @target_file = target_file
    source_left.files.each do |file|
      process_left_source_file File.join(Cranium.configuration.upload_path, file), target_file
    end
  end

  @target.resolve_files
end