class Embulk::Input::GoogleAnalytics::Plugin

Constants

AUTH_TYPE_JSON_KEY
AUTH_TYPE_REFRESH_TOKEN

Public Class Methods

canonicalize_column_name(name) click to toggle source
# File lib/embulk/input/google_analytics/plugin.rb, line 103
# Converts a Google Analytics column id into a snake_case column name.
#
# The leading "ga:" prefix is removed, every run of capital letters gets an
# underscore in front of it, an underscore left at the start is dropped and
# the result is lower-cased, e.g. "ga:dateHour" -> "date_hour".
#
# @param name [String] column id such as "ga:dateHour"
# @return [String] canonical column name
def self.canonicalize_column_name(name)
  unprefixed = name.gsub(/^ga:/, "")
  separated = unprefixed.gsub(/[A-Z]+/) { |capitals| "_#{capitals}" }
  separated.gsub(/^_/, "").downcase
end
columns_from_task(task) click to toggle source
# File lib/embulk/input/google_analytics/plugin.rb, line 95
# Lists the column ids requested by a task.
#
# Reads "time_series", "dimensions" and "metrics" from the task (in that
# order), flattens them into one list and removes duplicates while keeping
# the first occurrence of each id.
#
# @param task [#[]] task settings keyed by string
# @return [Array] unique column ids
def self.columns_from_task(task)
  %w(time_series dimensions metrics).map { |key| task[key] }.flatten.uniq
end
guess(config) click to toggle source
# File lib/embulk/input/google_analytics/plugin.rb, line 108
# Guessing is not supported by this plugin: logs a warning and returns an
# empty configuration.
#
# @param config ignored
# @return [Hash] always an empty hash
def self.guess(config)
  Embulk.logger.warn("Don't needed to guess for this plugin")
  {}
end
resume(task, columns, count) { |task, columns, count| ... } click to toggle source
# File lib/embulk/input/google_analytics/plugin.rb, line 63
# Yields the task, columns and task count to the caller's block and returns
# the first of the task reports the block produces, which is used as the
# next config diff.
#
# @param task task settings passed through to the block
# @param columns output columns passed through to the block
# @param count number of tasks passed through to the block
# @yield [task, columns, count] expected to return a list of task reports
# @return the first task report
def self.resume(task, columns, count, &control)
  yield(task, columns, count).first
end
task_from_config(config) click to toggle source
# File lib/embulk/input/google_analytics/plugin.rb, line 70
# Builds the task hash from the plugin configuration.
#
# The authentication method is derived from the credentials present:
# a refresh token selects Plugin::AUTH_TYPE_REFRESH_TOKEN, otherwise a JSON
# key selects Plugin::AUTH_TYPE_JSON_KEY, otherwise it stays nil.
# "view_id" and "time_series" are read without a default; every other
# setting falls back to the default shown below.
#
# @param config [#param] configuration object
# @return [Hash] task settings keyed by string
def self.task_from_config(config)
  optional_string = ->(key) { config.param(key, :string, default: nil) }

  refresh_token = optional_string.call("refresh_token")
  json_key_content = optional_string.call("json_key_content")

  # A refresh token wins when both kinds of credentials are supplied.
  auth_method =
    if refresh_token
      Plugin::AUTH_TYPE_REFRESH_TOKEN
    elsif json_key_content
      Plugin::AUTH_TYPE_JSON_KEY
    end

  {
    "auth_method" => auth_method,
    "client_id" => optional_string.call("client_id"),
    "client_secret" => optional_string.call("client_secret"),
    "refresh_token" => refresh_token,
    "json_key_content" => json_key_content,
    "view_id" => config.param("view_id", :string),
    "dimensions" => config.param("dimensions", :array, default: []),
    "metrics" => config.param("metrics", :array, default: []),
    "time_series" => config.param("time_series", :string),
    "start_date" => optional_string.call("start_date"),
    "end_date" => optional_string.call("end_date"),
    "incremental" => config.param("incremental", :bool, default: true),
    "last_record_time" => optional_string.call("last_record_time"),
    "retry_limit" => config.param("retry_limit", :integer, default: 5),
    "retry_initial_wait_sec" => config.param("retry_initial_wait_sec", :integer, default: 2),
  }
end
transaction(config, &control) click to toggle source

See https://developers.google.com/analytics/devguides/reporting/core/dimsmets for the available dimensions and metrics.

# File lib/embulk/input/google_analytics/plugin.rb, line 11
# Validates the configuration, builds the output schema and hands over to
# +resume+.
#
# Checks, in order: the time series column must be "ga:date" or
# "ga:dateHour"; an authentication method must have been detected; refresh
# token authentication needs client_id, client_secret and refresh_token;
# JSON key authentication needs json_key_content to pass +valid_json?+.
# Each requested column is then looked up in the list returned by
# Client#get_columns_list to choose its type, and a trailing "view_id"
# string column is appended.
#
# @param config configuration object, read through +task_from_config+
# @param control block forwarded to +resume+
# @return the value returned by +resume+
# @raise [ConfigError] when validation fails or a requested column is not in
#   the columns list
def self.transaction(config, &control)
  task = task_from_config(config)

  time_series = task["time_series"]
  unless %w(ga:date ga:dateHour).include?(time_series)
    raise ConfigError.new("Unknown time_series '#{time_series}'. Use 'ga:dateHour' or 'ga:date'")
  end

  auth_method = task['auth_method']
  raise ConfigError.new("Unknown Authentication method '#{auth_method}'.") unless auth_method

  if auth_method == Plugin::AUTH_TYPE_REFRESH_TOKEN
    oauth_complete = %w(client_id client_secret refresh_token).all? { |key| task[key] }
    unless oauth_complete
      raise ConfigError.new("client_id, client_secret and refresh_token are required when using Oauth authentication")
    end
  elsif auth_method == Plugin::AUTH_TYPE_JSON_KEY
    unless valid_json?(task["json_key_content"])
      raise ConfigError.new("json_key_content is not a valid JSON object")
    end
  end

  columns_list = Client.new(task).get_columns_list

  columns = columns_from_task(task).map do |col_name|
    col_info = columns_list.find { |candidate| candidate[:id] == col_name }
    raise ConfigError.new("Unknown metric/dimension '#{col_name}'") unless col_info

    attributes = col_info[:attributes]
    col_type =
      if attributes
        # standard dimension; a dataType not listed here leaves the type nil
        case attributes[:dataType]
        when "STRING" then :string
        when "INTEGER" then :long
        when "PERCENT", "FLOAT", "CURRENCY", "TIME" then :double
        end
      else
        # custom dimension
        :string
      end

    # the time_series column is always emitted as a timestamp
    col_type = :timestamp if col_name == time_series

    Column.new(nil, canonicalize_column_name(col_name), col_type)
  end

  columns.push(Column.new(nil, "view_id", :string))

  resume(task, columns, 1, &control)
end
valid_json?(json_object) click to toggle source
# File lib/embulk/input/google_analytics/plugin.rb, line 113
# Tells whether the given text is JSON that parses to a non-null value.
#
# JSON.parse accepts a bare `null` and returns nil for it, but the JSON key
# content can never be null. Checking the parsed value (instead of comparing
# the raw text with "null") also rejects `null` surrounded by whitespace or
# newlines. A nil or otherwise non-String argument is reported as invalid
# instead of raising TypeError.
#
# @param json_object [String, nil] text to validate
# @return [Boolean] true when the text parses to something other than null
def self.valid_json?(json_object)
  !JSON.parse(json_object).nil?
rescue JSON::ParserError, TypeError
  false
end

Public Instance Methods

calculate_next_times(client_time_zone, fetched_latest_time) click to toggle source
# File lib/embulk/input/google_analytics/plugin.rb, line 177
# Builds the task report (start_date, end_date, last_record_time) that the
# next incremental run should start from.
#
# @param client_time_zone name looked up through ActiveSupport::TimeZone[]
# @param fetched_latest_time latest time series value emitted by this run,
#   or nil when no record was emitted
# @return [Hash] report with :start_date and :end_date, plus
#   :last_record_time when one is known
def calculate_next_times(client_time_zone, fetched_latest_time)
  unless fetched_latest_time
    # Nothing was fetched: carry the configured range over unchanged.
    carried_over = {
      start_date: task["start_date"],
      end_date: task["end_date"],
    }
    # last_record_time is only written when it is neither nil nor empty
    previous_marker = task["last_record_time"]
    carried_over[:last_record_time] = previous_marker unless previous_marker.blank?
    return carried_over
  end

  # Express the fetched time in the user's time zone when the zone is known.
  zone = ActiveSupport::TimeZone[client_time_zone]
  local_time = zone.nil? ? fetched_latest_time : zone.parse(fetched_latest_time.to_s)

  # A static YYYY-MM-DD end_date would end up before the new start_date, and
  # a nil end_date would leave only start_date filled in on the next run,
  # which is an illegal API request. Use "today" in both cases to be safe.
  # "today" means now: a run at 03:30 AM gets the 3 o'clock data.
  configured_end = task["end_date"]
  next_end =
    if configured_end.nil? || configured_end.match(/[0-9]{4}-[0-9]{2}-[0-9]{2}/)
      "today"
    else
      configured_end
    end

  # "start_date" has the format YYYY-MM-DD, but ga:dateHour returns hourly records.
  # A run at 2016-07-03 05:00:00 sets start_date to "2016-07-03" and gets records up to 2016-07-03 05:00:00.
  # The next run at 2016-07-04 05:00 then gets records between 2016-07-03 00:00:00 and 2016-07-04 05:00:00,
  # so the records between 2016-07-03 00:00:00 and 2016-07-03 05:00:00 would be duplicated.
  #
  #           Date|        2016-07-03      |   2016-07-04
  #           Hour|    5                   |    5
  # 1st run ------|----|                   |
  # 2nd run       |------------------------|-----
  #               ^^^^^ duplicated
  #
  # The "last_record_time" option solves that problem:
  #
  #           Date|        2016-07-03      |   2016-07-04
  #           Hour|    5                   |    5
  # 1st run ------|----|                   |
  # 2nd run       #####|-------------------|-----
  #               ^^^^^ ignored (skipped)
  #
  {
    start_date: local_time.strftime("%Y-%m-%d"),
    end_date: next_end,
    last_record_time: fetched_latest_time.strftime("%Y-%m-%d %H:%M:%S %z"),
  }
end
init() click to toggle source
# File lib/embulk/input/google_analytics/plugin.rb, line 127
# Fills in the end of the date range: when the task has a start_date but no
# end_date, end_date becomes "today".
def init
  return unless task["start_date"]
  return if task["end_date"]

  task["end_date"] = "today"
end
preview?() click to toggle source
# File lib/embulk/input/google_analytics/plugin.rb, line 171
# Asks the Embulk runtime whether the current execution is a preview.
#
# @return the value of org.embulk.spi.Exec.isPreview, or false when that
#   call raises java.lang.NullPointerException
def preview?
  org.embulk.spi.Exec.isPreview
rescue java.lang.NullPointerException
  false
end
run() click to toggle source
# File lib/embulk/input/google_analytics/plugin.rb, line 133
# Fetches report rows through the client, writes them to the page builder
# and returns the task report for the next run.
#
# When the task is incremental and has a non-blank "last_record_time", rows
# whose time series value is not later than it are skipped; the filter is
# not applied in preview. The preview flag is evaluated once up front rather
# than once per row, since it cannot change during a run.
#
# @return [Hash] the result of +calculate_next_times+ for incremental tasks,
#   otherwise an empty hash
def run
  previewing = preview?
  client = Client.new(task, previewing)
  columns = self.class.columns_from_task(task) + ["view_id"]
  time_series = task["time_series"]

  last_record_time = Time.parse(task["last_record_time"]) if task['incremental'] && !task["last_record_time"].blank?
  # Rows at or before this time are dropped; nil disables the filter.
  skip_until = previewing ? nil : last_record_time

  latest_time_series = nil
  skip_counter = 0
  total_counter = 0
  client.each_report_row do |row|
    time = row[time_series]
    total_counter += 1
    if skip_until && time <= skip_until
      skip_counter += 1
      next
    end

    page_builder.add(row.values_at(*columns))

    # remember the newest time series value that was actually emitted
    latest_time_series = [latest_time_series, time].compact.max
  end
  page_builder.finish

  Embulk.logger.info "Total: #{total_counter} rows."
  if skip_counter > 0
    Embulk.logger.info "#{skip_counter} rows were ignored because the rows' date is " +
                           "before \"last_record_time\": #{last_record_time}."
  end

  return {} unless task["incremental"]

  calculate_next_times(client.get_profile[:timezone], latest_time_series)
end