class Fluent::Plugin::GCSOutput

Constants

DEFAULT_FORMAT_TYPE
MAX_HEX_RANDOM_LENGTH

Public Class Methods

new() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_gcs.rb, line 15
def initialize
  super
  require "google/cloud/storage"
end

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_gcs.rb, line 72
def configure(conf)
  compat_parameters_convert(conf, :buffer, :formatter, :inject)
  super

  if @hex_random_length > MAX_HEX_RANDOM_LENGTH
    raise Fluent::ConfigError, "hex_random_length parameter should be set to #{MAX_HEX_RANDOM_LENGTH} characters or less."
  end

  # The customer-supplied, AES-256 encryption key that will be used to encrypt the file.
  @encryption_opts = {
    encryption_key: @encryption_key,
  }

  if @object_metadata
    @object_metadata_hash = @object_metadata.map {|m| [m.key, m.value] }.to_h
  end

  @formatter = formatter_create

  @object_creator = Fluent::GCS.discovered_object_creator(@store_as, transcoding: @transcoding)
  # For backward compatibility
  # TODO: Remove time_slice_format when end of support compat_parameters
  @configured_time_slice_format = conf['time_slice_format']
  @time_slice_with_tz = Fluent::Timezone.formatter(@timekey_zone, @configured_time_slice_format || timekey_to_timeformat(@buffer_config['timekey']))
end
format(tag, time, record) click to toggle source
# File lib/fluent/plugin/out_gcs.rb, line 111
def format(tag, time, record)
  r = inject_values_to_record(tag, time, record)
  @formatter.format(tag, time, r)
end
multi_workers_ready?() click to toggle source
# File lib/fluent/plugin/out_gcs.rb, line 116
def multi_workers_ready?
  true
end
start() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_gcs.rb, line 98
def start
  @gcs = Google::Cloud::Storage.new(
    project: @project,
    keyfile: @keyfile,
    retries: @client_retries,
    timeout: @client_timeout
  )
  @gcs_bucket = @gcs.bucket(@bucket)

  ensure_bucket
  super
end
write(chunk) click to toggle source
# File lib/fluent/plugin/out_gcs.rb, line 120
def write(chunk)
  path = generate_path(chunk)

  @object_creator.create(chunk) do |obj|
    opts = {
      metadata: @object_metadata_hash,
      acl: @acl,
      storage_class: @storage_class,
      content_type: @object_creator.content_type,
      content_encoding: @object_creator.content_encoding,
    }
    opts.merge!(@encryption_opts)

    log.debug { "out_gcs: upload chunk:#{chunk.key} to gcs://#{@bucket}/#{path} options: #{opts}" }
    @gcs_bucket.upload_file(obj.path, path, opts)
  end
end

Private Instance Methods

check_object_exists(path) click to toggle source
# File lib/fluent/plugin/out_gcs.rb, line 154
def check_object_exists(path)
  if !@blind_write
    return @gcs_bucket.find_file(path, @encryption_opts)
  else
    return false
  end
end
ensure_bucket() click to toggle source
# File lib/fluent/plugin/out_gcs.rb, line 140
def ensure_bucket
  return unless @gcs_bucket.nil?

  if !@auto_create_bucket
    raise "bucket `#{@bucket}` does not exist"
  end
  log.info "creating bucket `#{@bucket}`"
  @gcs_bucket = @gcs.create_bucket(@bucket)
end
generate_path(chunk, i = 0, prev = nil) click to toggle source
# File lib/fluent/plugin/out_gcs.rb, line 162
def generate_path(chunk, i = 0, prev = nil)
  metadata = chunk.metadata
  time_slice = if metadata.timekey.nil?
                 ''.freeze
               else
                 @time_slice_with_tz.call(metadata.timekey)
               end
  tags = {
    "%{file_extension}" => @object_creator.file_extension,
    "%{hex_random}" => hex_random(chunk),
    "%{hostname}" => Socket.gethostname,
    "%{index}" => i,
    "%{path}" => @path,
    "%{time_slice}" => time_slice,
    "%{uuid_flush}" => SecureRandom.uuid,
  }
  path = @object_key_format.gsub(Regexp.union(tags.keys), tags)
  path = extract_placeholders(path, chunk)
  return path unless check_object_exists(path)

  if path == prev
    if @overwrite
      log.warn "object `#{path}` already exists but overwrites it"
      return path
    end
    raise "object `#{path}` already exists"
  end
  generate_path(chunk, i + 1, path)
end
hex_random(chunk) click to toggle source
# File lib/fluent/plugin/out_gcs.rb, line 150
def hex_random(chunk)
  Digest::MD5.hexdigest(chunk.unique_id)[0...@hex_random_length]
end
timekey_to_timeformat(timekey) click to toggle source

This is stolen from Fluentd

# File lib/fluent/plugin/out_gcs.rb, line 193
def timekey_to_timeformat(timekey)
  case timekey
  when nil          then ''
  when 0...60       then '%Y%m%d%H%M%S' # 60 exclusive
  when 60...3600    then '%Y%m%d%H%M'
  when 3600...86400 then '%Y%m%d%H'
  else                   '%Y%m%d'
  end
end