class Fluent::Plugin::S3Output
Constants
- COMPRESSOR_REGISTRY
- DEFAULT_FORMAT_TYPE
- MAX_HEX_RANDOM_LENGTH
Attributes
bucket[R]
Public Class Methods
new()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_s3.rb, line 16
# Sets up per-instance defaults before configuration runs: no compressor is
# chosen yet (configure picks one from @store_as) and the %{uuid_flush}
# placeholder is disabled until process_s3_object_key_format detects it.
def initialize
  super
  @uuid_flush_enabled = false
  @compressor = nil
end
Private Class Methods
register_compressor(name, compressor)
click to toggle source
# File lib/fluent/plugin/out_s3.rb, line 638 def self.register_compressor(name, compressor) COMPRESSOR_REGISTRY.register(name, compressor) end
Public Instance Methods
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_s3.rb, line 176
# Validates and normalizes the plugin configuration.
# Raises Fluent::ConfigError for unsupported endpoint/format settings, picks
# the compressor from @store_as (falling back to plain text), and prepares
# the time-slice formatter used by write() to build object keys.
def configure(conf)
  compat_parameters_convert(conf, :buffer, :formatter, :inject)
  super

  Aws.use_bundled_cert! if @use_bundled_cert

  # s3_endpoint is only for S3-compatible services; genuine AWS endpoints
  # (except fips/gov variants) must be selected via s3_region instead.
  if @s3_endpoint && (@s3_endpoint.end_with?('amazonaws.com') && !['fips', 'gov'].any? { |e| @s3_endpoint.include?(e) })
    raise Fluent::ConfigError, "s3_endpoint parameter is not supported for S3, use s3_region instead. This parameter is for S3 compatible services"
  end

  begin
    buffer_type = @buffer_config[:@type]
    @compressor = COMPRESSOR_REGISTRY.lookup(@store_as).new(buffer_type: buffer_type, log: log)
  rescue => e
    # Unknown store_as (or compressor init failure) degrades to uncompressed text.
    log.warn "'#{@store_as}' not supported. Use 'text' instead: error = #{e.message}"
    @compressor = TextCompressor.new
  end
  @compressor.configure(conf)

  @formatter = formatter_create

  if @hex_random_length > MAX_HEX_RANDOM_LENGTH
    raise Fluent::ConfigError, "hex_random_length parameter must be less than or equal to #{MAX_HEX_RANDOM_LENGTH}"
  end

  # %{index} must expand via sprintf; only zero-padded d/x/X formats are allowed.
  unless @index_format =~ /^%(0\d*)?[dxX]$/
    raise Fluent::ConfigError, "index_format parameter should follow `%[flags][width]type`. `0` is the only supported flag, and is mandatory if width is specified. `d`, `x` and `X` are supported types"
  end

  if @reduced_redundancy
    log.warn "reduced_redundancy parameter is deprecated. Use storage_class parameter instead"
    @storage_class = "REDUCED_REDUNDANCY"
  end

  # May rewrite the key format (e.g. expand %{hostname}); returns the final string.
  @s3_object_key_format = process_s3_object_key_format
  if !@check_object
    if conf.has_key?('s3_object_key_format')
      log.warn "Set 'check_object false' and s3_object_key_format is specified. Check s3_object_key_format is unique in each write. If not, existing file will be overwritten."
    else
      log.warn "Set 'check_object false' and s3_object_key_format is not specified. Use '%{path}/%{time_slice}_%{hms_slice}.%{file_extension}' for s3_object_key_format"
      @s3_object_key_format = "%{path}/%{time_slice}_%{hms_slice}.%{file_extension}"
    end
  end

  check_s3_path_safety(conf)

  # For backward compatibility
  # TODO: Remove time_slice_format when end of support compat_parameters
  @configured_time_slice_format = conf['time_slice_format']
  @values_for_s3_object_chunk = {}
  @time_slice_with_tz = Fluent::Timezone.formatter(@timekey_zone, @configured_time_slice_format || timekey_to_timeformat(@buffer_config['timekey']))
end
format(tag, time, record)
click to toggle source
# File lib/fluent/plugin/out_s3.rb, line 261
# Formats one event for buffering: injects configured values (hostname,
# tag, time, etc.) into the record, then serializes it with the formatter
# created in configure().
def format(tag, time, record)
  @formatter.format(tag, time, inject_values_to_record(tag, time, record))
end
multi_workers_ready?()
click to toggle source
# File lib/fluent/plugin/out_s3.rb, line 230 def multi_workers_ready? true end
start()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_s3.rb, line 234 def start options = setup_credentials options[:region] = @s3_region if @s3_region options[:endpoint] = @s3_endpoint if @s3_endpoint options[:use_accelerate_endpoint] = @enable_transfer_acceleration options[:use_dualstack_endpoint] = @enable_dual_stack options[:http_proxy] = @proxy_uri if @proxy_uri options[:force_path_style] = @force_path_style options[:compute_checksums] = @compute_checksums unless @compute_checksums.nil? options[:signature_version] = @signature_version unless @signature_version.nil? options[:ssl_verify_peer] = @ssl_verify_peer log.on_trace do options[:http_wire_trace] = true options[:logger] = log end s3_client = Aws::S3::Client.new(options) @s3 = Aws::S3::Resource.new(client: s3_client) @bucket = @s3.bucket(@s3_bucket) check_apikeys if @check_apikey_on_start ensure_bucket if @check_bucket ensure_bucket_lifecycle super end
write(chunk)
click to toggle source
# File lib/fluent/plugin/out_s3.rb, line 266
# Flushes one buffer chunk to S3.
# Builds the object key from @s3_object_key_format in two passes
# (pre-placeholders like %{path} before Fluentd placeholder extraction,
# post-placeholders like %{index}/%{time_slice} after), compresses the
# chunk into a Tempfile, and uploads it with the configured put options.
def write(chunk)
  i = 0
  metadata = chunk.metadata
  previous_path = nil
  time_slice = if metadata.timekey.nil?
                 ''.freeze
               else
                 @time_slice_with_tz.call(metadata.timekey)
               end

  if @check_object
    # Probe S3 for key collisions: keep incrementing %{index} (via i) until
    # a non-existing key is found. NOTE(review): this is a do-while loop —
    # `begin ... end while` runs the body at least once.
    begin
      @values_for_s3_object_chunk[chunk.unique_id] ||= {
        "%{hex_random}" => hex_random(chunk),
      }
      # First-pass substitutions, applied before extract_placeholders.
      values_for_s3_object_key_pre = {
        "%{path}" => @path,
        "%{file_extension}" => @compressor.ext,
      }
      # Second-pass substitutions, applied after extract_placeholders.
      values_for_s3_object_key_post = {
        "%{time_slice}" => time_slice,
        "%{index}" => sprintf(@index_format,i),
      }.merge!(@values_for_s3_object_chunk[chunk.unique_id])
      values_for_s3_object_key_post["%{uuid_flush}".freeze] = uuid_random if @uuid_flush_enabled

      # Unknown %{...} tokens are left as-is in the first pass (fetch default)
      # so the second pass can still resolve them.
      s3path = @s3_object_key_format.gsub(%r(%{[^}]+})) do |matched_key|
        values_for_s3_object_key_pre.fetch(matched_key, matched_key)
      end
      s3path = extract_placeholders(s3path, chunk)
      s3path = s3path.gsub(%r(%{[^}]+}), values_for_s3_object_key_post)
      # If the key format has no %{index}, bumping i cannot change the path:
      # either overwrite deliberately or fail loudly.
      if (i > 0) && (s3path == previous_path)
        if @overwrite
          log.warn "#{s3path} already exists, but will overwrite"
          break
        else
          raise "duplicated path is generated. use %{index} in s3_object_key_format: path = #{s3path}"
        end
      end

      i += 1
      previous_path = s3path
    end while @bucket.object(s3path).exists?
  else
    # check_object false: skip existence probing and build the key from
    # time/hms slices instead of %{index}.
    if @localtime
      hms_slicer = Time.now.strftime("%H%M%S")
    else
      hms_slicer = Time.now.utc.strftime("%H%M%S")
    end

    @values_for_s3_object_chunk[chunk.unique_id] ||= {
      "%{hex_random}" => hex_random(chunk),
    }
    values_for_s3_object_key_pre = {
      "%{path}" => @path,
      "%{file_extension}" => @compressor.ext,
    }
    values_for_s3_object_key_post = {
      "%{date_slice}" => time_slice, # For backward compatibility
      "%{time_slice}" => time_slice,
      "%{hms_slice}" => hms_slicer,
    }.merge!(@values_for_s3_object_chunk[chunk.unique_id])
    values_for_s3_object_key_post["%{uuid_flush}".freeze] = uuid_random if @uuid_flush_enabled

    s3path = @s3_object_key_format.gsub(%r(%{[^}]+})) do |matched_key|
      values_for_s3_object_key_pre.fetch(matched_key, matched_key)
    end
    s3path = extract_placeholders(s3path, chunk)
    s3path = s3path.gsub(%r(%{[^}]+}), values_for_s3_object_key_post)
  end

  tmp = Tempfile.new("s3-")
  tmp.binmode
  begin
    @compressor.compress(chunk, tmp)
    tmp.rewind
    log.debug "out_s3: write chunk #{dump_unique_id_hex(chunk.unique_id)} with metadata #{chunk.metadata} to s3://#{@s3_bucket}/#{s3path}"

    put_options = {
      body: tmp,
      content_type: @compressor.content_type,
      storage_class: @storage_class,
    }
    # All options below are optional and only sent when configured.
    put_options[:server_side_encryption] = @use_server_side_encryption if @use_server_side_encryption
    put_options[:ssekms_key_id] = @ssekms_key_id if @ssekms_key_id
    put_options[:sse_customer_algorithm] = @sse_customer_algorithm if @sse_customer_algorithm
    put_options[:sse_customer_key] = @sse_customer_key if @sse_customer_key
    put_options[:sse_customer_key_md5] = @sse_customer_key_md5 if @sse_customer_key_md5
    put_options[:acl] = @acl if @acl
    put_options[:grant_full_control] = @grant_full_control if @grant_full_control
    put_options[:grant_read] = @grant_read if @grant_read
    put_options[:grant_read_acp] = @grant_read_acp if @grant_read_acp
    put_options[:grant_write_acp] = @grant_write_acp if @grant_write_acp

    if @s3_metadata
      put_options[:metadata] = {}
      @s3_metadata.each do |k, v|
        # i was incremented past the chosen index, hence i - 1 here.
        put_options[:metadata][k] = extract_placeholders(v, chunk).gsub(%r(%{[^}]+}), {"%{index}" => sprintf(@index_format, i - 1)})
      end
    end
    @bucket.object(s3path).put(put_options)

    # Per-chunk cache (e.g. %{hex_random}) is no longer needed after upload.
    @values_for_s3_object_chunk.delete(chunk.unique_id)

    if @warn_for_delay
      if Time.at(chunk.metadata.timekey) < Time.now - @warn_for_delay
        log.warn "out_s3: delayed events were put to s3://#{@s3_bucket}/#{s3path}"
      end
    end
  ensure
    # Best-effort cleanup of the temp file; close(true) also unlinks it.
    tmp.close(true) rescue nil
  end
end
Private Instance Methods
check_apikeys()
click to toggle source
# File lib/fluent/plugin/out_s3.rb, line 464
# Validates credentials/region at startup by issuing a minimal LIST request
# (at most one key under @path). A missing bucket is tolerated here because
# ensure_bucket handles that case; any other failure is fatal.
def check_apikeys
  # Fix: unify mixed hash syntax (`:max_keys =>` old-style next to
  # new-style `prefix:`) to the new-style used elsewhere in this file.
  @bucket.objects(prefix: @path, max_keys: 1).first
rescue Aws::S3::Errors::NoSuchBucket
  # ignore NoSuchBucket Error because ensure_bucket checks it.
rescue => e
  raise "can't call S3 API. Please check your credentials or s3_region configuration. error = #{e.inspect}"
end
check_s3_path_safety(conf)
click to toggle source
# File lib/fluent/plugin/out_s3.rb, line 454
# Emits configuration warnings about object-key safety: the upcoming v2
# default change, and the risk of key collisions when multiple flush
# threads run without a uniqueness placeholder in the key format.
def check_s3_path_safety(conf)
  unless conf.has_key?('s3_object_key_format')
    log.warn "The default value of s3_object_key_format will use ${chunk_id} instead of %{index} to avoid object conflict in v2"
  end

  multi_threaded_flush = @buffer_config.flush_thread_count > 1
  has_unique_key = @s3_object_key_format.include?('${chunk_id}') ||
                   @s3_object_key_format.include?('%{uuid_flush}')
  if multi_threaded_flush && !has_unique_key
    log.warn "No ${chunk_id} or %{uuid_flush} in s3_object_key_format with multiple flush threads. Recommend to set ${chunk_id} or %{uuid_flush} to avoid data lost by object conflict"
  end
end
ensure_bucket()
click to toggle source
# File lib/fluent/plugin/out_s3.rb, line 402
# Verifies the target bucket exists; creates it when auto_create_bucket is
# enabled, otherwise raises so startup fails fast.
def ensure_bucket
  return if @bucket.exists?

  unless @auto_create_bucket
    raise "The specified bucket does not exist: bucket = #{@s3_bucket}"
  end

  log.info "Creating bucket #{@s3_bucket} on #{@s3_endpoint}"
  @s3.create_bucket(bucket: @s3_bucket)
end
ensure_bucket_lifecycle()
click to toggle source
# File lib/fluent/plugin/out_s3.rb, line 413
# Applies configured bucket lifecycle rules, but only when they differ from
# the rules currently on the bucket (rules are compared in id order).
def ensure_bucket_lifecycle
  return if @bucket_lifecycle_rules.empty?

  desired_rules = @bucket_lifecycle_rules.sort_by(&:id).map do |rule|
    { id: rule.id, expiration: { days: rule.expiration_days }, prefix: rule.prefix, status: "Enabled" }
  end
  return if get_bucket_lifecycle_rules == desired_rules

  log.info "Configuring bucket lifecycle rules for #{@s3_bucket} on #{@s3_endpoint}"
  @bucket.lifecycle_configuration.put({ lifecycle_configuration: { rules: desired_rules } })
end
get_bucket_lifecycle_rules()
click to toggle source
# File lib/fluent/plugin/out_s3.rb, line 427
# Fetches the bucket's current lifecycle rules normalized to the same hash
# shape ensure_bucket_lifecycle builds, sorted by rule id for stable
# comparison. Returns [] when the bucket has no lifecycle configuration.
def get_bucket_lifecycle_rules
  current = @bucket.lifecycle_configuration.rules
  current.sort_by { |r| r[:id] }.map do |r|
    { id: r[:id], expiration: { days: r[:expiration][:days] }, prefix: r[:prefix], status: r[:status] }
  end
rescue Aws::S3::Errors::NoSuchLifecycleConfiguration
  []
end
hex_random(chunk)
click to toggle source
# File lib/fluent/plugin/out_s3.rb, line 381
# Derives the %{hex_random} placeholder value from the chunk's unique id.
# The hex id is laid out as (time_sec, time_usec, rand), so reversing it
# moves the random bytes to the front before truncating to the configured
# length — giving a better-distributed prefix.
def hex_random(chunk)
  Fluent::UniqueId.hex(chunk.unique_id).reverse[0...@hex_random_length]
end
process_s3_object_key_format()
click to toggle source
# File lib/fluent/plugin/out_s3.rb, line 437
# Validates and rewrites @s3_object_key_format, returning the final format
# string (configure assigns the return value back to @s3_object_key_format).
# Rejects removed %{uuid*} placeholders, flags %{uuid_flush} support, and
# expands deprecated %{hostname} to the actual hostname with a warning.
def process_s3_object_key_format
  removed_placeholders = %w(%{uuid} %{uuid:random} %{uuid:hostname} %{uuid:timestamp})
  removed_placeholders.each do |ph|
    if @s3_object_key_format.include?(ph)
      raise Fluent::ConfigError, "#{ph} placeholder in s3_object_key_format is removed"
    end
  end

  @uuid_flush_enabled = true if @s3_object_key_format.include?('%{uuid_flush}')

  # Return value: the format with %{hostname} expanded in place.
  @s3_object_key_format.gsub('%{hostname}') do |expr|
    log.warn "%{hostname} will be removed in the future. Use \"\#{Socket.gethostname}\" instead"
    Socket.gethostname
  end
end
setup_credentials()
click to toggle source
# File lib/fluent/plugin/out_s3.rb, line 472
# Builds the credentials portion of the Aws::S3::Client options hash.
# Exactly one credential strategy is selected, in priority order:
# static keys > assume-role > web-identity > instance-profile >
# shared credentials file > deprecated aws_iam_retries > SDK defaults.
# Returns the options hash (possibly empty, letting the SDK resolve
# credentials via its default chain).
def setup_credentials
  options = {}
  credentials_options = {}
  case
  when @aws_key_id && @aws_sec_key
    options[:access_key_id] = @aws_key_id
    options[:secret_access_key] = @aws_sec_key
  when @assume_role_credentials
    c = @assume_role_credentials
    credentials_options[:role_arn] = c.role_arn
    credentials_options[:role_session_name] = c.role_session_name
    credentials_options[:policy] = c.policy if c.policy
    credentials_options[:duration_seconds] = c.duration_seconds if c.duration_seconds
    credentials_options[:external_id] = c.external_id if c.external_id
    credentials_options[:sts_endpoint_url] = c.sts_endpoint_url if c.sts_endpoint_url
    credentials_options[:sts_http_proxy] = c.sts_http_proxy if c.sts_http_proxy
    # Pick the most specific STS client configuration available; branch
    # order matters — proxy+endpoint beats region-based combinations.
    if c.sts_http_proxy && c.sts_endpoint_url
      credentials_options[:client] = Aws::STS::Client.new(http_proxy: c.sts_http_proxy, endpoint: c.sts_endpoint_url)
    elsif @region && c.sts_http_proxy
      credentials_options[:client] = Aws::STS::Client.new(region: @region, http_proxy: c.sts_http_proxy)
    elsif @region && c.sts_endpoint_url
      credentials_options[:client] = Aws::STS::Client.new(region: @region, endpoint: c.sts_endpoint_url)
    elsif c.sts_http_proxy
      credentials_options[:client] = Aws::STS::Client.new(http_proxy: c.sts_http_proxy)
    elsif c.sts_endpoint_url
      credentials_options[:client] = Aws::STS::Client.new(endpoint: c.sts_endpoint_url)
    elsif c.sts_region
      credentials_options[:client] = Aws::STS::Client.new(region: c.sts_region)
    elsif @s3_region
      credentials_options[:client] = Aws::STS::Client.new(region: @s3_region)
    end
    options[:credentials] = Aws::AssumeRoleCredentials.new(credentials_options)
  when @web_identity_credentials
    c = @web_identity_credentials
    credentials_options[:role_arn] = c.role_arn
    credentials_options[:role_session_name] = c.role_session_name
    credentials_options[:web_identity_token_file] = c.web_identity_token_file
    credentials_options[:policy] = c.policy if c.policy
    credentials_options[:duration_seconds] = c.duration_seconds if c.duration_seconds
    # sts_region takes precedence over the bucket's s3_region for the STS call.
    if c.sts_region
      credentials_options[:client] = Aws::STS::Client.new(:region => c.sts_region)
    elsif @s3_region
      credentials_options[:client] = Aws::STS::Client.new(:region => @s3_region)
    end
    options[:credentials] = Aws::AssumeRoleWebIdentityCredentials.new(credentials_options)
  when @instance_profile_credentials
    c = @instance_profile_credentials
    credentials_options[:retries] = c.retries if c.retries
    credentials_options[:ip_address] = c.ip_address if c.ip_address
    credentials_options[:port] = c.port if c.port
    credentials_options[:http_open_timeout] = c.http_open_timeout if c.http_open_timeout
    credentials_options[:http_read_timeout] = c.http_read_timeout if c.http_read_timeout
    # Inside ECS the task metadata endpoint supplies credentials instead of
    # the EC2 instance profile.
    if ENV["AWS_CONTAINER_CREDENTIALS_RELATIVE_URI"]
      options[:credentials] = Aws::ECSCredentials.new(credentials_options)
    else
      options[:credentials] = Aws::InstanceProfileCredentials.new(credentials_options)
    end
  when @shared_credentials
    c = @shared_credentials
    credentials_options[:path] = c.path if c.path
    credentials_options[:profile_name] = c.profile_name if c.profile_name
    options[:credentials] = Aws::SharedCredentials.new(credentials_options)
  when @aws_iam_retries
    log.warn("'aws_iam_retries' parameter is deprecated. Use 'instance_profile_credentials' instead")
    credentials_options[:retries] = @aws_iam_retries
    if ENV["AWS_CONTAINER_CREDENTIALS_RELATIVE_URI"]
      options[:credentials] = Aws::ECSCredentials.new(credentials_options)
    else
      options[:credentials] = Aws::InstanceProfileCredentials.new(credentials_options)
    end
  else
    # Use default credentials
    # See http://docs.aws.amazon.com/sdkforruby/api/Aws/S3/Client.html
  end
  options
end
timekey_to_timeformat(timekey)
click to toggle source
This is adapted from Fluentd core
# File lib/fluent/plugin/out_s3.rb, line 392
# Maps a buffer timekey (chunk interval in seconds) to the coarsest strftime
# pattern that still distinguishes adjacent chunks: sub-minute keys need
# seconds, sub-hour keys need minutes, sub-day keys need hours; anything
# else (including nil → '') collapses to a date-only pattern.
def timekey_to_timeformat(timekey)
  if timekey.nil?
    ''
  elsif (0...60).cover?(timekey)   # 60 exclusive
    '%Y%m%d%H%M%S'
  elsif (60...3600).cover?(timekey)
    '%Y%m%d%H%M'
  elsif (3600...86400).cover?(timekey)
    '%Y%m%d%H'
  else
    '%Y%m%d'
  end
end
uuid_random()
click to toggle source
# File lib/fluent/plugin/out_s3.rb, line 387 def uuid_random SecureRandom.uuid end