class Fluent::Plugin::S3Output

Constants

COMPRESSOR_REGISTRY
DEFAULT_FORMAT_TYPE
MAX_HEX_RANDOM_LENGTH

Attributes

bucket[R]

Public Class Methods

new()
Calls superclass method
# File lib/fluent/plugin/out_s3.rb, line 16
def initialize
  super
  @compressor = nil
  @uuid_flush_enabled = false
end

Private Class Methods

register_compressor(name, compressor)
# File lib/fluent/plugin/out_s3.rb, line 638
def self.register_compressor(name, compressor)
  COMPRESSOR_REGISTRY.register(name, compressor)
end
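
Compressors are looked up by the store_as value through COMPRESSOR_REGISTRY (see configure below), so additional formats can be plugged in. Below is a minimal sketch of a hypothetical pass-through compressor, assuming only the calls this page shows the plugin making (new(buffer_type:, log:), configure, ext, content_type and compress); the plugin's built-in compressors inherit from a base class that is not documented here.

require 'fluent/plugin/out_s3'

class Fluent::Plugin::S3Output
  # Hypothetical pass-through compressor (illustration only)
  class PassthroughCompressor
    def initialize(opts = {})
      @buffer_type = opts[:buffer_type]
      @log = opts[:log]
    end

    def configure(conf)
      # no options needed for this sketch
    end

    # Used for the %{file_extension} placeholder in s3_object_key_format
    def ext
      'raw'.freeze
    end

    def content_type
      'application/octet-stream'.freeze
    end

    # Copy the buffered chunk into the temporary file unchanged
    def compress(chunk, tmp)
      chunk.write_to(tmp)
    end
  end

  register_compressor('passthrough', PassthroughCompressor)
end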

Public Instance Methods

configure(conf)
Calls superclass method
# File lib/fluent/plugin/out_s3.rb, line 176
def configure(conf)
  compat_parameters_convert(conf, :buffer, :formatter, :inject)

  super

  Aws.use_bundled_cert! if @use_bundled_cert

  if @s3_endpoint && (@s3_endpoint.end_with?('amazonaws.com') && !['fips', 'gov'].any? { |e| @s3_endpoint.include?(e) })
    raise Fluent::ConfigError, "s3_endpoint parameter is not supported for S3, use s3_region instead. This parameter is for S3 compatible services"
  end

  begin
    buffer_type = @buffer_config[:@type]
    @compressor = COMPRESSOR_REGISTRY.lookup(@store_as).new(buffer_type: buffer_type, log: log)
  rescue => e
    log.warn "'#{@store_as}' not supported. Use 'text' instead: error = #{e.message}"
    @compressor = TextCompressor.new
  end
  @compressor.configure(conf)

  @formatter = formatter_create

  if @hex_random_length > MAX_HEX_RANDOM_LENGTH
    raise Fluent::ConfigError, "hex_random_length parameter must be less than or equal to #{MAX_HEX_RANDOM_LENGTH}"
  end

  unless @index_format =~ /^%(0\d*)?[dxX]$/
    raise Fluent::ConfigError, "index_format parameter should follow `%[flags][width]type`. `0` is the only supported flag, and is mandatory if width is specified. `d`, `x` and `X` are supported types"
  end

  if @reduced_redundancy
    log.warn "reduced_redundancy parameter is deprecated. Use storage_class parameter instead"
    @storage_class = "REDUCED_REDUNDANCY"
  end

  @s3_object_key_format = process_s3_object_key_format
  if !@check_object
    if conf.has_key?('s3_object_key_format')
      log.warn "Set 'check_object false' and s3_object_key_format is specified. Check s3_object_key_format is unique in each write. If not, existing file will be overwritten."
    else
      log.warn "Set 'check_object false' and s3_object_key_format is not specified. Use '%{path}/%{time_slice}_%{hms_slice}.%{file_extension}' for s3_object_key_format"
      @s3_object_key_format = "%{path}/%{time_slice}_%{hms_slice}.%{file_extension}"
    end
  end

  check_s3_path_safety(conf)

  # For backward compatibility
  # TODO: Remove time_slice_format when end of support compat_parameters
  @configured_time_slice_format = conf['time_slice_format']
  @values_for_s3_object_chunk = {}
  @time_slice_with_tz = Fluent::Timezone.formatter(@timekey_zone, @configured_time_slice_format || timekey_to_timeformat(@buffer_config['timekey']))
end
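
For reference, a standalone illustration (plain Ruby, not part of the plugin) of which index_format values pass the check above, and how the accepted format is later expanded for %{index} in write:

['%d', '%04d', '%x', '%X', '%4d', '%s'].each do |fmt|
  verdict = fmt =~ /^%(0\d*)?[dxX]$/ ? 'accepted' : 'rejected'
  puts "#{fmt.ljust(4)} #{verdict}"   # %4d and %s are rejected
end

sprintf('%04d', 12)   # => "0012"  (the value substituted for %{index} in write)
sprintf('%X', 255)    # => "FF"
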
format(tag, time, record)
# File lib/fluent/plugin/out_s3.rb, line 261
def format(tag, time, record)
  r = inject_values_to_record(tag, time, record)
  @formatter.format(tag, time, r)
end
multi_workers_ready?()
# File lib/fluent/plugin/out_s3.rb, line 230
def multi_workers_ready?
  true
end
start()
Calls superclass method
# File lib/fluent/plugin/out_s3.rb, line 234
def start
  options = setup_credentials
  options[:region] = @s3_region if @s3_region
  options[:endpoint] = @s3_endpoint if @s3_endpoint
  options[:use_accelerate_endpoint] = @enable_transfer_acceleration
  options[:use_dualstack_endpoint] = @enable_dual_stack
  options[:http_proxy] = @proxy_uri if @proxy_uri
  options[:force_path_style] = @force_path_style
  options[:compute_checksums] = @compute_checksums unless @compute_checksums.nil?
  options[:signature_version] = @signature_version unless @signature_version.nil?
  options[:ssl_verify_peer] = @ssl_verify_peer
  log.on_trace do
    options[:http_wire_trace] = true
    options[:logger] = log
  end

  s3_client = Aws::S3::Client.new(options)
  @s3 = Aws::S3::Resource.new(client: s3_client)
  @bucket = @s3.bucket(@s3_bucket)

  check_apikeys if @check_apikey_on_start
  ensure_bucket if @check_bucket
  ensure_bucket_lifecycle

  super
end
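
A rough, standalone sketch of the client that start builds for a minimal setup, assuming s3_region plus static keys; every value below is a placeholder, and the real options come from the configuration and setup_credentials:

require 'aws-sdk-s3'

options = {
  region: 'us-east-1',                 # s3_region
  access_key_id: 'AKIAEXAMPLE',        # via setup_credentials
  secret_access_key: 'example-secret',
  force_path_style: false,             # force_path_style
  use_accelerate_endpoint: false,      # enable_transfer_acceleration
  use_dualstack_endpoint: false,       # enable_dual_stack
}

s3_client = Aws::S3::Client.new(options)
s3        = Aws::S3::Resource.new(client: s3_client)
bucket    = s3.bucket('my-log-bucket') # s3_bucket
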
write(chunk)
# File lib/fluent/plugin/out_s3.rb, line 266
def write(chunk)
  i = 0
  metadata = chunk.metadata
  previous_path = nil
  time_slice = if metadata.timekey.nil?
                 ''.freeze
               else
                 @time_slice_with_tz.call(metadata.timekey)
               end

  if @check_object
    begin
      @values_for_s3_object_chunk[chunk.unique_id] ||= {
        "%{hex_random}" => hex_random(chunk),
      }
      values_for_s3_object_key_pre = {
        "%{path}" => @path,
        "%{file_extension}" => @compressor.ext,
      }
      values_for_s3_object_key_post = {
        "%{time_slice}" => time_slice,
        "%{index}" => sprintf(@index_format,i),
      }.merge!(@values_for_s3_object_chunk[chunk.unique_id])
      values_for_s3_object_key_post["%{uuid_flush}".freeze] = uuid_random if @uuid_flush_enabled

      s3path = @s3_object_key_format.gsub(%r(%{[^}]+})) do |matched_key|
        values_for_s3_object_key_pre.fetch(matched_key, matched_key)
      end
      s3path = extract_placeholders(s3path, chunk)
      s3path = s3path.gsub(%r(%{[^}]+}), values_for_s3_object_key_post)
      if (i > 0) && (s3path == previous_path)
        if @overwrite
          log.warn "#{s3path} already exists, but will overwrite"
          break
        else
          raise "duplicated path is generated. use %{index} in s3_object_key_format: path = #{s3path}"
        end
      end

      i += 1
      previous_path = s3path
    end while @bucket.object(s3path).exists?
  else
    if @localtime
      hms_slicer = Time.now.strftime("%H%M%S")
    else
      hms_slicer = Time.now.utc.strftime("%H%M%S")
    end

    @values_for_s3_object_chunk[chunk.unique_id] ||= {
      "%{hex_random}" => hex_random(chunk),
    }
    values_for_s3_object_key_pre = {
      "%{path}" => @path,
      "%{file_extension}" => @compressor.ext,
    }
    values_for_s3_object_key_post = {
      "%{date_slice}" => time_slice,  # For backward compatibility
      "%{time_slice}" => time_slice,
      "%{hms_slice}" => hms_slicer,
    }.merge!(@values_for_s3_object_chunk[chunk.unique_id])
    values_for_s3_object_key_post["%{uuid_flush}".freeze] = uuid_random if @uuid_flush_enabled

    s3path = @s3_object_key_format.gsub(%r(%{[^}]+})) do |matched_key|
      values_for_s3_object_key_pre.fetch(matched_key, matched_key)
    end
    s3path = extract_placeholders(s3path, chunk)
    s3path = s3path.gsub(%r(%{[^}]+}), values_for_s3_object_key_post)
  end

  tmp = Tempfile.new("s3-")
  tmp.binmode
  begin
    @compressor.compress(chunk, tmp)
    tmp.rewind
    log.debug "out_s3: write chunk #{dump_unique_id_hex(chunk.unique_id)} with metadata #{chunk.metadata} to s3://#{@s3_bucket}/#{s3path}"

    put_options = {
      body: tmp,
      content_type: @compressor.content_type,
      storage_class: @storage_class,
    }
    put_options[:server_side_encryption] = @use_server_side_encryption if @use_server_side_encryption
    put_options[:ssekms_key_id] = @ssekms_key_id if @ssekms_key_id
    put_options[:sse_customer_algorithm] = @sse_customer_algorithm if @sse_customer_algorithm
    put_options[:sse_customer_key] = @sse_customer_key if @sse_customer_key
    put_options[:sse_customer_key_md5] = @sse_customer_key_md5 if @sse_customer_key_md5
    put_options[:acl] = @acl if @acl
    put_options[:grant_full_control] = @grant_full_control if @grant_full_control
    put_options[:grant_read] = @grant_read if @grant_read
    put_options[:grant_read_acp] = @grant_read_acp if @grant_read_acp
    put_options[:grant_write_acp] = @grant_write_acp if @grant_write_acp

    if @s3_metadata
      put_options[:metadata] = {}
      @s3_metadata.each do |k, v|
        put_options[:metadata][k] = extract_placeholders(v, chunk).gsub(%r(%{[^}]+}), {"%{index}" => sprintf(@index_format, i - 1)})
      end
    end
    @bucket.object(s3path).put(put_options)

    @values_for_s3_object_chunk.delete(chunk.unique_id)

    if @warn_for_delay
      if Time.at(chunk.metadata.timekey) < Time.now - @warn_for_delay
        log.warn "out_s3: delayed events were put to s3://#{@s3_bucket}/#{s3path}"
      end
    end
  ensure
    tmp.close(true) rescue nil
  end
end
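
The object key is built in several passes: plugin-specific %{...} placeholders are substituted before and after Fluentd's own extract_placeholders. A standalone sketch with a made-up key format and values:

key_format = "%{path}/%{time_slice}_%{index}.%{file_extension}"

pre  = { "%{path}" => "logs", "%{file_extension}" => "gz" }
post = { "%{time_slice}" => "20240101", "%{index}" => sprintf("%d", 0) }

s3path = key_format.gsub(/%{[^}]+}/) { |key| pre.fetch(key, key) }
# => "logs/%{time_slice}_%{index}.gz"   (unknown placeholders are kept as-is)
# extract_placeholders(s3path, chunk) would resolve ${tag}, %Y%m%d etc. here
s3path = s3path.gsub(/%{[^}]+}/, post)
# => "logs/20240101_0.gz"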

Private Instance Methods

check_apikeys()
# File lib/fluent/plugin/out_s3.rb, line 464
def check_apikeys
  @bucket.objects(prefix: @path, max_keys: 1).first
rescue Aws::S3::Errors::NoSuchBucket
  # ignore NoSuchBucket Error because ensure_bucket checks it.
rescue => e
  raise "can't call S3 API. Please check your credentials or s3_region configuration. error = #{e.inspect}"
end
check_s3_path_safety(conf)
# File lib/fluent/plugin/out_s3.rb, line 454
def check_s3_path_safety(conf)
  unless conf.has_key?('s3_object_key_format')
    log.warn "The default value of s3_object_key_format will use ${chunk_id} instead of %{index} to avoid object conflict in v2"
  end

  if (@buffer_config.flush_thread_count > 1) && ['${chunk_id}', '%{uuid_flush}'].none? { |key| @s3_object_key_format.include?(key) }
    log.warn "No ${chunk_id} or %{uuid_flush} in s3_object_key_format with multiple flush threads. Recommend to set ${chunk_id} or %{uuid_flush} to avoid data lost by object conflict"
  end
end
ensure_bucket()
# File lib/fluent/plugin/out_s3.rb, line 402
def ensure_bucket
  if !@bucket.exists?
    if @auto_create_bucket
      log.info "Creating bucket #{@s3_bucket} on #{@s3_endpoint}"
      @s3.create_bucket(bucket: @s3_bucket)
    else
      raise "The specified bucket does not exist: bucket = #{@s3_bucket}"
    end
  end
end
ensure_bucket_lifecycle()
# File lib/fluent/plugin/out_s3.rb, line 413
def ensure_bucket_lifecycle
  unless @bucket_lifecycle_rules.empty?
    old_rules = get_bucket_lifecycle_rules
    new_rules = @bucket_lifecycle_rules.sort_by { |rule| rule.id }.map do |rule|
      { id: rule.id, expiration: { days: rule.expiration_days }, prefix: rule.prefix, status: "Enabled" }
    end

    unless old_rules == new_rules
      log.info "Configuring bucket lifecycle rules for #{@s3_bucket} on #{@s3_endpoint}"
      @bucket.lifecycle_configuration.put({ lifecycle_configuration: { rules: new_rules } })
    end
  end
end
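
For reference, the normalized shape of a single rule as built above from one <bucket_lifecycle_rule> section (placeholder values):

rule = {
  id: "expire-old-logs",
  expiration: { days: 30 },   # expiration_days
  prefix: "logs/",
  status: "Enabled",
}
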
get_bucket_lifecycle_rules()
# File lib/fluent/plugin/out_s3.rb, line 427
def get_bucket_lifecycle_rules
  begin
    @bucket.lifecycle_configuration.rules.sort_by { |rule| rule[:id] }.map do |rule|
      { id: rule[:id], expiration: { days: rule[:expiration][:days] }, prefix: rule[:prefix], status: rule[:status] }
    end
  rescue Aws::S3::Errors::NoSuchLifecycleConfiguration
    []
  end
end
hex_random(chunk)
# File lib/fluent/plugin/out_s3.rb, line 381
def hex_random(chunk)
  unique_hex = Fluent::UniqueId.hex(chunk.unique_id)
  unique_hex.reverse! # unique_hex is like (time_sec, time_usec, rand) => reversing gives more randomness
  unique_hex[0...@hex_random_length]
end
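
A worked example of the truncation; the hex string below is made up:

unique_hex = "0605f2ab8d9e4a31b2c3d4e5f6a7b8c9"   # Fluent::UniqueId.hex(chunk.unique_id)
unique_hex.reverse!    # => "9c8b7a6f5e4d3c2b13a4e9d8ba2f5060"
unique_hex[0...4]      # => "9c8b" when hex_random_length is 4
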
process_s3_object_key_format()
# File lib/fluent/plugin/out_s3.rb, line 437
def process_s3_object_key_format
  %W(%{uuid} %{uuid:random} %{uuid:hostname} %{uuid:timestamp}).each { |ph|
    if @s3_object_key_format.include?(ph)
      raise Fluent::ConfigError, %!#{ph} placeholder in s3_object_key_format is removed!
    end
  }

  if @s3_object_key_format.include?('%{uuid_flush}')
    @uuid_flush_enabled = true
  end

  @s3_object_key_format.gsub('%{hostname}') { |expr|
    log.warn "%{hostname} will be removed in the future. Use \"\#{Socket.gethostname}\" instead"
    Socket.gethostname
  }
end
setup_credentials()
# File lib/fluent/plugin/out_s3.rb, line 472
def setup_credentials
  options = {}
  credentials_options = {}
  case
  when @aws_key_id && @aws_sec_key
    options[:access_key_id] = @aws_key_id
    options[:secret_access_key] = @aws_sec_key
  when @assume_role_credentials
    c = @assume_role_credentials
    credentials_options[:role_arn] = c.role_arn
    credentials_options[:role_session_name] = c.role_session_name
    credentials_options[:policy] = c.policy if c.policy
    credentials_options[:duration_seconds] = c.duration_seconds if c.duration_seconds
    credentials_options[:external_id] = c.external_id if c.external_id
    credentials_options[:sts_endpoint_url] = c.sts_endpoint_url if c.sts_endpoint_url
    credentials_options[:sts_http_proxy] = c.sts_http_proxy if c.sts_http_proxy
    if c.sts_http_proxy && c.sts_endpoint_url
      credentials_options[:client] = Aws::STS::Client.new(http_proxy: c.sts_http_proxy, endpoint: c.sts_endpoint_url)
    elsif @region && c.sts_http_proxy
      credentials_options[:client] = Aws::STS::Client.new(region: @region, http_proxy: c.sts_http_proxy)
    elsif @region && c.sts_endpoint_url
      credentials_options[:client] = Aws::STS::Client.new(region: @region, endpoint: c.sts_endpoint_url)
    elsif c.sts_http_proxy
      credentials_options[:client] = Aws::STS::Client.new(http_proxy: c.sts_http_proxy)
    elsif c.sts_endpoint_url
      credentials_options[:client] = Aws::STS::Client.new(endpoint: c.sts_endpoint_url)
    elsif c.sts_region
      credentials_options[:client] = Aws::STS::Client.new(region: c.sts_region)
    elsif @s3_region
      credentials_options[:client] = Aws::STS::Client.new(region: @s3_region)
    end
    options[:credentials] = Aws::AssumeRoleCredentials.new(credentials_options)
  when @web_identity_credentials
    c = @web_identity_credentials
    credentials_options[:role_arn] = c.role_arn
    credentials_options[:role_session_name] = c.role_session_name
    credentials_options[:web_identity_token_file] = c.web_identity_token_file
    credentials_options[:policy] = c.policy if c.policy
    credentials_options[:duration_seconds] = c.duration_seconds if c.duration_seconds
    if c.sts_region
      credentials_options[:client] = Aws::STS::Client.new(:region => c.sts_region)
    elsif @s3_region
      credentials_options[:client] = Aws::STS::Client.new(:region => @s3_region)
    end
    options[:credentials] = Aws::AssumeRoleWebIdentityCredentials.new(credentials_options)
  when @instance_profile_credentials
    c = @instance_profile_credentials
    credentials_options[:retries] = c.retries if c.retries
    credentials_options[:ip_address] = c.ip_address if c.ip_address
    credentials_options[:port] = c.port if c.port
    credentials_options[:http_open_timeout] = c.http_open_timeout if c.http_open_timeout
    credentials_options[:http_read_timeout] = c.http_read_timeout if c.http_read_timeout
    if ENV["AWS_CONTAINER_CREDENTIALS_RELATIVE_URI"]
      options[:credentials] = Aws::ECSCredentials.new(credentials_options)
    else
      options[:credentials] = Aws::InstanceProfileCredentials.new(credentials_options)
    end
  when @shared_credentials
    c = @shared_credentials
    credentials_options[:path] = c.path if c.path
    credentials_options[:profile_name] = c.profile_name if c.profile_name
    options[:credentials] = Aws::SharedCredentials.new(credentials_options)
  when @aws_iam_retries
    log.warn("'aws_iam_retries' parameter is deprecated. Use 'instance_profile_credentials' instead")
    credentials_options[:retries] = @aws_iam_retries
    if ENV["AWS_CONTAINER_CREDENTIALS_RELATIVE_URI"]
      options[:credentials] = Aws::ECSCredentials.new(credentials_options)
    else
      options[:credentials] = Aws::InstanceProfileCredentials.new(credentials_options)
    end
  else
    # Use default credentials
    # See http://docs.aws.amazon.com/sdkforruby/api/Aws/S3/Client.html
  end
  options
end
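
The case/when above is evaluated top to bottom, so the first configured credential source wins (static keys before assume-role, and so on, falling back to the SDK default chain). Two of the simpler branches, illustrated with placeholder values:

require 'aws-sdk-core'

# aws_key_id / aws_sec_key: the keys go straight into the client options
static_options = {
  access_key_id: "AKIAEXAMPLE",
  secret_access_key: "example-secret",
}

# <shared_credentials>: a provider object is built from path / profile_name
shared_options = {
  credentials: Aws::SharedCredentials.new(
    path: "/home/td-agent/.aws/credentials",
    profile_name: "fluentd"
  ),
}
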
timekey_to_timeformat(timekey)

This is stolen from Fluentd

# File lib/fluent/plugin/out_s3.rb, line 392
def timekey_to_timeformat(timekey)
  case timekey
  when nil          then ''
  when 0...60       then '%Y%m%d%H%M%S' # 60 exclusive
  when 60...3600    then '%Y%m%d%H%M'
  when 3600...86400 then '%Y%m%d%H'
  else                   '%Y%m%d'
  end
end
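
Example mappings for common timekey values (seconds):

timekey_to_timeformat(nil)     # => ''
timekey_to_timeformat(30)      # => '%Y%m%d%H%M%S'
timekey_to_timeformat(300)     # => '%Y%m%d%H%M'
timekey_to_timeformat(3600)    # => '%Y%m%d%H'
timekey_to_timeformat(86400)   # => '%Y%m%d'
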
uuid_random()
# File lib/fluent/plugin/out_s3.rb, line 387
def uuid_random
  SecureRandom.uuid
end