class Fluent::Plugin::BigQueryBaseOutput

This is an abstract class; concrete BigQuery output plugins subclass it.

Public Instance Methods

configure(conf)
Calls superclass method
# File lib/fluent/plugin/out_bigquery_base.rb, line 84
# Validates and materialises the plugin configuration.
#
# - Checks that the settings required by @auth_method are present.
# - Requires exactly one of `table` / `tables`.
# - Builds @tablelist.
# - Loads the static schema from `schema` and/or `schema_path` (both are applied when both are set).
# - Creates the row formatter from the first <format> section (nil when there is none).
#
# conf - the plugin's configuration element.
#
# Raises Fluent::ConfigError on an unknown auth method, missing auth settings,
# or when neither/both of `table` and `tables` are given.
def configure(conf)
  super

  # nil means the chosen auth method has everything it needs.
  auth_problem =
    case @auth_method
    when :private_key
      "'email' and 'private_key_path' must be specified if auth_method == 'private_key'" unless @email && @private_key_path
    when :json_key
      "'json_key' must be specified if auth_method == 'json_key'" unless @json_key
    when :compute_engine, :application_default
      nil
    else
      "unrecognized 'auth_method': #{@auth_method}"
    end
  raise Fluent::ConfigError, auth_problem if auth_problem

  # Exactly one of the two must be set: reject "neither" and "both".
  if @table.nil? == @tables.nil?
    raise Fluent::ConfigError, "'table' or 'tables' must be specified, and both are invalid"
  end

  @tablelist = @tables || [@table]

  @table_schema = Fluent::BigQuery::RecordSchema.new('record')
  @table_schema.load_schema(@schema) if @schema
  @table_schema.load_schema(MultiJson.load(File.read(@schema_path))) if @schema_path

  format_section = conf.elements("format").first
  @formatter = formatter_create(usage: 'out_bigquery_for_insert', default_type: 'json', conf: format_section)
end
fetch_schema(metadata)
# File lib/fluent/plugin/out_bigquery_base.rb, line 181
# Returns the schema of the schema-source table, refreshing it from BigQuery when the
# cached copy is older than @schema_cache_expire (compared against Fluent::Engine.now).
#
# metadata - chunk metadata used to expand placeholders in project, dataset and table.
#
# Returns the Fluent::BigQuery::RecordSchema cached for "project.dataset.table".
# If a refresh returns nothing, the previously cached schema is kept (with a warning).
# Raises RuntimeError if the refresh returns nothing and no schema was cached before.
def fetch_schema(metadata)
  project = extract_placeholders(@project, metadata)
  dataset = extract_placeholders(@dataset, metadata)
  table_id = fetch_schema_target_table(metadata)
  # Single key shared by @fetched_schemas and @last_fetch_schema_time.
  schema_key = "#{project}.#{dataset}.#{table_id}"

  if Fluent::Engine.now - @last_fetch_schema_time[schema_key] > @schema_cache_expire
    schema = writer.fetch_schema(project, dataset, table_id)

    if schema
      table_schema = Fluent::BigQuery::RecordSchema.new("record")
      table_schema.load_schema(schema)
      @fetched_schemas[schema_key] = table_schema
    elsif @fetched_schemas[schema_key].nil?
      raise "failed to fetch schema from bigquery"
    else
      log.warn "#{table_id} uses previous schema"
    end

    # Also stamped when a refresh failed but a previous schema exists, so the
    # failing fetch is not retried until the cache window elapses again.
    @last_fetch_schema_time[schema_key] = Fluent::Engine.now
  end

  @fetched_schemas[schema_key]
end
fetch_schema_target_table(metadata)
# File lib/fluent/plugin/out_bigquery_base.rb, line 208
# Name of the table whose schema is fetched.
# Uses @fetch_schema_table when set, otherwise the first entry of @tablelist.
# Placeholders are expanded from the given chunk metadata.
def fetch_schema_target_table(metadata)
  source_table = @fetch_schema_table || @tablelist.first
  extract_placeholders(source_table, metadata)
end
format(tag, time, record)
# File lib/fluent/plugin/out_bigquery_base.rb, line 157
# Serialises one event for buffering. The steps are:
# 1. Inject configured values into the record.
# 2. Pick the schema: the fetched one when @fetch_schema is set, otherwise @table_schema.
# 3. Shape the record with that schema.
# 4. Encode the resulting row with @formatter.
#
# Returns the formatter's output, or nil when the schema produces an empty row.
# If shaping or encoding fails, logs the record and schema, then re-raises.
def format(tag, time, record)
  injected = inject_values_to_record(tag, time, record)

  # metadata is resolved for every event, even when the static schema is used.
  chunk_meta = metadata(tag, time, injected)
  active_schema = @fetch_schema ? fetch_schema(chunk_meta) : @table_schema

  begin
    shaped_row = active_schema.format(injected)
    @formatter.format(tag, time, shaped_row) unless shaped_row.empty?
  rescue
    log.error("format error", record: injected, schema: active_schema)
    raise
  end
end
get_schema(project, dataset, metadata)
# File lib/fluent/plugin/out_bigquery_base.rb, line 212
# Schema to use for the given project/dataset.
# - When schema fetching is disabled, this is the static @table_schema.
# - Otherwise it is the cached fetched schema for "project.dataset.table".
# - Failing that, it is fetched now via fetch_schema.
def get_schema(project, dataset, metadata)
  return @table_schema unless @fetch_schema

  cache_key = "#{project}.#{dataset}.#{fetch_schema_target_table(metadata)}"
  @fetched_schemas[cache_key] || fetch_schema(metadata)
end
multi_workers_ready?()
# File lib/fluent/plugin/out_bigquery_base.rb, line 131
# Multi-worker support flag; this plugin always reports true.
def multi_workers_ready?; true; end
start()
Calls superclass method
# File lib/fluent/plugin/out_bigquery_base.rb, line 122
# Initialises per-run state after the superclass has started:
# - @tables_queue: a shuffled copy of @tablelist.
# - @tables_mutex: a mutex guarding it.
# - @fetched_schemas: empty cache of fetched schemas.
# - @last_fetch_schema_time: fetch timestamps; a table never fetched reads as 0.
def start
  super

  @tables_mutex = Mutex.new
  @tables_queue = @tablelist.shuffle

  @last_fetch_schema_time = Hash.new(0)
  @fetched_schemas = {}
end
write(chunk)
# File lib/fluent/plugin/out_bigquery_base.rb, line 178
# Abstract hook for flushing a buffer chunk. This base class is abstract; concrete
# subclasses presumably override it. The base version ignores the chunk and returns nil.
def write(chunk)
  nil
end
writer()
# File lib/fluent/plugin/out_bigquery_base.rb, line 135
# Lazily builds and memoises the Fluent::BigQuery::Writer for this plugin.
# It is built from the logger, the auth method and an options hash of the
# relevant configuration values; later calls return the same instance.
def writer
  return @writer if @writer

  writer_options = {
    private_key_path: @private_key_path,
    private_key_passphrase: @private_key_passphrase,
    email: @email,
    json_key: @json_key,
    location: @location,
    source_format: @source_format,
    skip_invalid_rows: @skip_invalid_rows,
    ignore_unknown_values: @ignore_unknown_values,
    max_bad_records: @max_bad_records,
    allow_retry_insert_errors: @allow_retry_insert_errors,
    prevent_duplicate_load: @prevent_duplicate_load,
    auto_create_table: @auto_create_table,
    time_partitioning_type: @time_partitioning_type,
    time_partitioning_field: @time_partitioning_field,
    time_partitioning_expiration: @time_partitioning_expiration,
    clustering_fields: @clustering_fields,
    # Request timeouts are configured as request_* but passed as *_sec.
    timeout_sec: @request_timeout_sec,
    open_timeout_sec: @request_open_timeout_sec,
  }
  @writer = Fluent::BigQuery::Writer.new(@log, @auth_method, writer_options)
end