class Embulk::Input::Lkqd

Constants

DEFAULT_COLUMNS

Public Class Methods

guess(config) click to toggle source
# File lib/embulk/input/lkqd.rb, line 68
def self.guess(config)
  return {}
end
guess_column_type(metrics, column_name) click to toggle source
# File lib/embulk/input/lkqd.rb, line 76
def self.guess_column_type(metrics, column_name)
  column_name.gsub!(/^\W/, '')
  column_option = DEFAULT_COLUMNS[column_name]
  if column_option
    return {type: column_option['type'].to_sym, name: column_name}
  else
    return {type: :string, name: column_name}
  end
end
measurable_impressions?(task) click to toggle source
# File lib/embulk/input/lkqd.rb, line 109
def self.measurable_impressions?(task)
  task['measurable_impressions'] && task['viewability_measured_rate_index'] && task['impressions_index']
end
request_lkqd(config) click to toggle source
# File lib/embulk/input/lkqd.rb, line 72
def self.request_lkqd(config)
  ::HTTP.auth("Basic #{config[:authorization]}").post(config[:endpoint], json: config[:report_parameters])
end
resume(task, columns, count) { |task, columns, count| ... } click to toggle source
# File lib/embulk/input/lkqd.rb, line 61
def self.resume(task, columns, count, &control)
  task_reports = yield(task, columns, count)

  next_config_diff = {}
  return next_config_diff
end
transaction(config, &control) click to toggle source
# File lib/embulk/input/lkqd.rb, line 14
def self.transaction(config, &control)
  task = {
    "secret_key_id" => config.param("secret_key_id", :string),
    "secret_key" => config.param("secret_key", :string),
    "endpoint" => config.param("endpoint", :string, default: 'https://api.lkqd.com/reports'),
    "copy_temp_to" => config.param("copy_temp_to", :string, default: nil),
    "measurable_impressions" => config.param("measurable_impressions", :bool, default: false),
    "viewable_impressions" => config.param("viewable_impressions", :bool, default: false),
    "report_parameters" => config.param("report_parameters", :hash, default: {}),
  }
  task['authorization'] = Base64.urlsafe_encode64("#{task['secret_key_id']}:#{task['secret_key']}")

  response = request_lkqd({authorization: task['authorization'], endpoint: task['endpoint'], report_parameters: task['report_parameters']})
  tempfile = Tempfile.new('embulk-input-lkqd_')
  while chunk = response.body.readpartial
    tempfile.write chunk
  end
  tempfile.close
  task['tempfile_path'] = tempfile.path

  FileUtils.cp(task['tempfile_path'], task['copy_temp_to']) if task['copy_temp_to']

  columns = []
  ::CSV.foreach(tempfile.path).first.each_with_index do |column_name, index|
    column_type = guess_column_type(task['report_parameters']['metrics'], column_name)
    column = Column.new({index: index}.merge(column_type))
    if column.name == 'Viewability Measured Rate'
      task['viewability_measured_rate_index'] = index
    elsif column.name == 'Viewability Rate'
      task['viewability_rate_index'] = index
    elsif column.name == 'Impressions'
      task['impressions_index'] = index
    end
    columns << column
  end

  if measurable_impressions?(task)
    columns << Column.new({index: columns.size, type: :double, name: 'Measurable Impressions'})
  end

  if viewable_impressions?(task)
    columns << Column.new({index: columns.size, type: :double, name: 'Viewable Impressions'})
  end

  resume(task, columns, 1, &control)
end
try_convert(row, options={}) click to toggle source
# File lib/embulk/input/lkqd.rb, line 86
def self.try_convert(row, options={})
  return row.map do |field|
    name, value = field
    column_name = name.gsub(/^\W/, '')
    column_option = DEFAULT_COLUMNS[column_name]
    if column_option.nil?
      next value
    #elsif column_option['type'] == 'timestamp'
    #  next Time.strptime(value + " " + options[:timezone], column_option['format'] + " %Z").to_i
    elsif column_option['type'] == 'long'
      next value.gsub(',','').to_i
    elsif column_option['type'] == 'double' && value.match(/%$/) # handle x,xxxx.yy%
      next value.gsub(',','').to_f / 100.0
    elsif column_option['type'] == 'double' && value.match(/^\$[\d\.,]+$/) # handle $x,xxxx.yy%
      next value.gsub(/[$,]/,'').to_f
    elsif column_option['type'] == 'double'
      next value.gsub(',','').to_f
    else
      next value
    end
  end
end
viewable_impressions?(task) click to toggle source
# File lib/embulk/input/lkqd.rb, line 113
def self.viewable_impressions?(task)
  task['viewable_impressions'] && task['viewability_rate_index'] &&
  task['viewability_measured_rate_index'] && task['impressions_index']
end

Public Instance Methods

init() click to toggle source
# File lib/embulk/input/lkqd.rb, line 118
def init
  @task = task
end
run() click to toggle source
# File lib/embulk/input/lkqd.rb, line 122
def run
  convert_options = {timezone: @task['report_parameters']['timezone']}
  viewability_measured_rate_index = @task['viewability_measured_rate_index']
  viewability_rate_index = @task['viewability_rate_index']
  impressions_index = @task['impressions_index']

  CSV.foreach(@task['tempfile_path'], {headers: true}).each do |row|
    row = Lkqd.try_convert(row, convert_options)
    if Lkqd.measurable_impressions?(@task)
      row << row[impressions_index] * row[viewability_measured_rate_index]
    end

    if Lkqd.viewable_impressions?(@task)
      row << row[impressions_index] * row[viewability_measured_rate_index] * row[viewability_rate_index]
    end
    page_builder.add(row)
  end
  page_builder.finish
  FileUtils.rm_rf(@task['tempfile_path'])

  task_report = {}
  return task_report
end