class Fluent::GCSOutput

Constants

MAX_HEX_RANDOM_LENGTH

Public Class Methods

new() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_gcs.rb, line 12
def initialize
  super
  require "google/cloud/storage"
end

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_gcs.rb, line 56
def configure(conf)
  super

  if @hex_random_length > MAX_HEX_RANDOM_LENGTH
    raise Fluent::ConfigError, "hex_random_length parameter should be set to #{MAX_HEX_RANDOM_LENGTH} characters or less."
  end

  # The customer-supplied, AES-256 encryption key that will be used to encrypt the file.
  @encryption_opts = {
    encryption_key: @encryption_key,
  }

  if @object_metadata
    @object_metadata_hash = @object_metadata.map {|m| [m.key, m.value] }.to_h
  end

  @formatter = Fluent::Plugin.new_formatter(@format)
  @formatter.configure(conf)

  @object_creator = Fluent::GCS.discovered_object_creator(@store_as, transcoding: @transcoding)
end
format(tag, time, record) click to toggle source
# File lib/fluent/plugin/out_gcs.rb, line 91
def format(tag, time, record)
  @formatter.format(tag, time, record)
end
start() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_gcs.rb, line 78
def start
  @gcs = Google::Cloud::Storage.new(
    project: @project,
    keyfile: @keyfile,
    retries: @client_retries,
    timeout: @client_timeout
  )
  @gcs_bucket = @gcs.bucket(@bucket)

  ensure_bucket
  super
end
write(chunk) click to toggle source
# File lib/fluent/plugin/out_gcs.rb, line 95
def write(chunk)
  path = generate_path(chunk)

  @object_creator.create(chunk) do |obj|
    opts = {
      metadata: @object_metadata_hash,
      acl: @acl,
      storage_class: @storage_class,
      content_type: @object_creator.content_type,
      content_encoding: @object_creator.content_encoding,
    }
    opts.merge!(@encryption_opts)

    log.debug { "out_gcs: upload chunk:#{chunk.key} to gcs://#{@bucket}/#{path} options: #{opts}" }
    @gcs_bucket.upload_file(obj.path, path, opts)
  end
end

Private Instance Methods

ensure_bucket() click to toggle source
# File lib/fluent/plugin/out_gcs.rb, line 115
def ensure_bucket
  return unless @gcs_bucket.nil?

  if !@auto_create_bucket
    raise "bucket `#{@bucket}` does not exist"
  end
  log.info "creating bucket `#{@bucket}`"
  @gcs_bucket = @gcs.create_bucket(@bucket)
end
format_path(chunk) click to toggle source
# File lib/fluent/plugin/out_gcs.rb, line 129
def format_path(chunk)
  now = Time.strptime(chunk.key, @time_slice_format)
  (@localtime ? now : now.utc).strftime(@path)
end
generate_path(chunk, i = 0, prev = nil) click to toggle source
# File lib/fluent/plugin/out_gcs.rb, line 134
def generate_path(chunk, i = 0, prev = nil)
  tags = {
    "%{file_extension}" => @object_creator.file_extension,
    "%{hex_random}" => hex_random(chunk),
    "%{hostname}" => Socket.gethostname,
    "%{index}" => i,
    "%{path}" => format_path(chunk),
    "%{time_slice}" => chunk.key,
    "%{uuid_flush}" => SecureRandom.uuid,
  }
  path = @object_key_format.gsub(Regexp.union(tags.keys), tags)
  return path unless @gcs_bucket.find_file(path, @encryption_opts)

  if path == prev
    if @overwrite
      log.warn "object `#{path}` already exists but overwrites it"
      return path
    end
    raise "object `#{path}` already exists"
  end
  generate_path(chunk, i + 1, path)
end
hex_random(chunk) click to toggle source
# File lib/fluent/plugin/out_gcs.rb, line 125
def hex_random(chunk)
  Digest::MD5.hexdigest(chunk.unique_id)[0...@hex_random_length]
end