class Naginegi::Embulk

Public Class Methods

new(log_level, embulk_run_option)
# File lib/naginegi/embulk.rb, line 5
# Stores the embulk settings and prepares a logger that writes to STDOUT.
#
# @param log_level value later interpolated after `--log-level` in the embulk command
# @param embulk_run_option value later interpolated right after `embulk run`
def initialize(log_level, embulk_run_option)
  @log_level = log_level
  @embulk_run_option = embulk_run_option

  # Timestamps are logged as "YYYY-MM-DD HH:MM:SS".
  @logger = Logger.new(STDOUT).tap do |logger|
    logger.datetime_format = '%Y-%m-%d %H:%M:%S'
  end
end

Public Instance Methods

run(db_configs, all_table_configs, bq_config, target_table_names = [])
# File lib/naginegi/embulk.rb, line 13
# Runs the embulk load for every database listed in +db_configs+.
#
# For each database key, the table configs are narrowed with
# select_table_configs and handed to run_by_database together with that
# database's 'bq_dataset' entry.
#
# @param db_configs keyed by database name; each value is indexed with 'bq_dataset'
# @param all_table_configs indexed by database name to obtain that database's table configs
# @param bq_config passed through unchanged to run_by_database
# @param target_table_names [Array] when empty, every table is processed
# @return [Array] the concatenated results of run_by_database for all databases
def run(db_configs, all_table_configs, bq_config, target_table_names = [])
  db_configs.keys.flat_map do |db_name|
    selected_configs = select_table_configs(all_table_configs[db_name], target_table_names)
    dataset = db_configs[db_name]['bq_dataset']

    run_by_database(db_name, selected_configs, bq_config, dataset)
  end
end
select_table_configs(table_configs, target_table_names)
# File lib/naginegi/embulk.rb, line 27
# Narrows +table_configs+ to the tables named in +target_table_names+.
#
# @param table_configs collection of objects responding to +name+
# @param target_table_names collection of names to keep; empty means "keep all"
# @return +table_configs+ itself when no names are given, otherwise only the
#   entries whose +name+ is included in +target_table_names+
def select_table_configs(table_configs, target_table_names)
  if target_table_names.empty?
    table_configs
  else
    table_configs.select do |config|
      target_table_names.include?(config.name)
    end
  end
end

Private Instance Methods

run_by_database(db_name, table_configs, bq_config, bq_dataset)
# File lib/naginegi/embulk.rb, line 34
# Runs embulk once per table of a single database and collects the failures.
#
# For every table config the BigQuery table of the same name is deleted first;
# a failing delete is only logged as a warning and the embulk run still
# happens. Then `embulk run` is executed against
# "<config_dir>/<db_name>/<table name>.yml". After all tables are processed,
# the per-table result lines are logged once more as a summary.
#
# @param db_name database name; used as the sub-directory of the embulk configs
# @param table_configs collection of objects responding to +name+
# @param bq_config passed to Naginegi::BigQuery.new; its 'config_dir' entry is read here
# @param bq_dataset dataset handed to Naginegi::BigQuery#delete_table
# @return [Array] names of the tables whose embulk command did not succeed
def run_by_database(db_name, table_configs, bq_config, bq_dataset)
  process_times = []
  error_tables = []

  bq_utility = Naginegi::BigQuery.new(bq_config)

  table_configs.each do |table_config|
    table_name = table_config.name
    # Use the monotonic clock for the duration: wall-clock time (Time.now) can
    # jump or go backwards and would then report a wrong or negative duration.
    started_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)
    @logger.info("table: #{table_name} - start")

    begin
      bq_utility.delete_table(bq_dataset, table_name)
      @logger.info("#{table_name} is deleted")
    rescue => e
      # Best effort: a failed delete must not prevent the load itself.
      @logger.warn(e.message)
    end

    cmd = "embulk run #{@embulk_run_option} #{bq_config['config_dir']}/#{db_name}/#{table_name}.yml --log-level #{@log_level}"
    @logger.info("cmd: #{cmd}")

    # Kernel#system is truthy only for a zero exit status; a non-zero exit
    # (false) and a command that could not be executed (nil) both count as errors.
    succeeded = system(cmd)
    error_tables << table_name unless succeeded
    result = succeeded ? 'success' : 'error'

    elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started_at
    process_time = "table: #{table_name} - result: #{result}  #{format('%10.1f', elapsed)}sec"
    @logger.info(process_time)

    process_times << process_time
  end

  @logger.info('------------------------------------')
  @logger.info("db_name: #{db_name}")

  process_times.each { |process_time| @logger.info(process_time) }

  error_tables
end