class Fluent::GCSOutput
Constants
- MAX_HEX_RANDOM_LENGTH
Public Class Methods
new()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_gcs.rb, line 12
# Builds the plugin instance.
#
# Runs the superclass initializer first and then loads the Google Cloud
# Storage client library. The require sits inside the constructor, so the
# library is loaded at construction time rather than by this method's caller.
def initialize
  super

  require "google/cloud/storage"
end
Public Instance Methods
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_gcs.rb, line 56
# Validates the configuration and prepares the state used by later calls.
#
# After delegating to the superclass this method:
# * rejects a hex_random_length larger than MAX_HEX_RANDOM_LENGTH,
# * wraps the configured encryption key in an options hash,
# * turns the object_metadata entries (when present) into a key => value hash,
# * creates and configures the formatter selected by @format,
# * looks up the object creator for @store_as.
#
# @param conf the configuration handed to the superclass and to the formatter
# @raise [Fluent::ConfigError] when hex_random_length exceeds MAX_HEX_RANDOM_LENGTH
def configure(conf)
  super

  too_long = @hex_random_length > MAX_HEX_RANDOM_LENGTH
  raise Fluent::ConfigError, "hex_random_length parameter should be set to #{MAX_HEX_RANDOM_LENGTH} characters or less." if too_long

  # The customer-supplied, AES-256 encryption key that will be used to encrypt the file.
  @encryption_opts = { encryption_key: @encryption_key }

  # @object_metadata_hash is left unassigned when no metadata was configured.
  if @object_metadata
    @object_metadata_hash = @object_metadata.each_with_object({}) do |entry, result|
      result[entry.key] = entry.value
    end
  end

  @formatter = Fluent::Plugin.new_formatter(@format)
  @formatter.configure(conf)

  @object_creator = Fluent::GCS.discovered_object_creator(@store_as, transcoding: @transcoding)
end
format(tag, time, record)
click to toggle source
# File lib/fluent/plugin/out_gcs.rb, line 91
# Formats a single event by delegating to the formatter stored in @formatter.
#
# @param tag    the event tag, passed through unchanged
# @param time   the event time, passed through unchanged
# @param record the event record, passed through unchanged
# @return whatever the formatter's own #format returns for these arguments
def format(tag, time, record)
  formatted = @formatter.format(tag, time, record)
  formatted
end
start()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_gcs.rb, line 78
# Creates the storage client, resolves the target bucket and then hands
# control to the superclass.
#
# The client is built from the project, keyfile, retry and timeout settings
# held in instance variables. The bucket lookup result is stored in
# @gcs_bucket and ensure_bucket is called before the superclass #start runs.
def start
  client_options = {
    project: @project,
    keyfile: @keyfile,
    retries: @client_retries,
    timeout: @client_timeout,
  }

  @gcs = Google::Cloud::Storage.new(**client_options)
  @gcs_bucket = @gcs.bucket(@bucket)

  ensure_bucket

  super
end
write(chunk)
click to toggle source
# File lib/fluent/plugin/out_gcs.rb, line 95
# Uploads one buffer chunk to the bucket.
#
# The destination object name comes from generate_path. The object creator
# turns the chunk into a local file-like object (yielded as +obj+), which is
# then uploaded together with the metadata, ACL, storage class, content type
# and content encoding options, plus the encryption options.
#
# @param chunk the buffer chunk to upload; its #key is used in the debug log
# @return whatever @object_creator.create returns
def write(chunk)
  path = generate_path(chunk)

  @object_creator.create(chunk) do |obj|
    opts = {
      metadata: @object_metadata_hash,
      acl: @acl,
      storage_class: @storage_class,
      content_type: @object_creator.content_type,
      content_encoding: @object_creator.content_encoding,
    }

    # Build the log line before the encryption options are merged in, so the
    # encryption key is never written to the log.
    log.debug { "out_gcs: upload chunk:#{chunk.key} to gcs://#{@bucket}/#{path} options: #{opts}" }

    upload_opts = opts.merge(@encryption_opts)

    # The options must be passed as keywords: since Ruby 3.0 a trailing Hash
    # is no longer converted to keyword arguments and would be treated as an
    # extra positional argument.
    @gcs_bucket.upload_file(obj.path, path, **upload_opts)
  end
end
Private Instance Methods
ensure_bucket()
click to toggle source
# File lib/fluent/plugin/out_gcs.rb, line 115
# Makes sure @gcs_bucket refers to a bucket.
#
# Does nothing when @gcs_bucket is already set. Otherwise the bucket is
# created through @gcs, but only if @auto_create_bucket is enabled.
#
# @raise [RuntimeError] when the bucket is missing and auto creation is off
def ensure_bucket
  return unless @gcs_bucket.nil?

  raise "bucket `#{@bucket}` does not exist" unless @auto_create_bucket

  log.info "creating bucket `#{@bucket}`"
  @gcs_bucket = @gcs.create_bucket(@bucket)
end
format_path(chunk)
click to toggle source
# File lib/fluent/plugin/out_gcs.rb, line 129
# Expands the strftime placeholders in @path using the chunk's time slice.
#
# The chunk key is parsed with @time_slice_format. Unless @localtime is set,
# the parsed time is converted to UTC before formatting.
#
# @param chunk the buffer chunk; its #key must match @time_slice_format
# @return [String] @path with its time placeholders filled in
def format_path(chunk)
  slice_time = Time.strptime(chunk.key, @time_slice_format)
  slice_time = slice_time.utc unless @localtime
  slice_time.strftime(@path)
end
generate_path(chunk, i = 0, prev = nil)
click to toggle source
# File lib/fluent/plugin/out_gcs.rb, line 134
# Builds the object name for a chunk from @object_key_format.
#
# Supported placeholders: %{file_extension}, %{hex_random}, %{hostname},
# %{index}, %{path}, %{time_slice} and %{uuid_flush}. After expansion the
# bucket is asked whether an object with that name already exists; if so the
# index is incremented and the name is rebuilt. When two consecutive attempts
# give the same name (the format has no placeholder that changes between
# attempts), the existing name is either reused (@overwrite) or rejected.
#
# @param chunk the buffer chunk being written
# @param i     [Integer] index to start from for %{index}
# @param prev  [String, nil] name produced by a previous attempt, if any
# @return [String] the object name to upload to
# @raise [RuntimeError] when the object exists and @overwrite is not set
def generate_path(chunk, i = 0, prev = nil)
  # These values do not change between attempts, so compute them once.
  fixed_tags = {
    "%{file_extension}" => @object_creator.file_extension,
    "%{hex_random}" => hex_random(chunk),
    "%{hostname}" => Socket.gethostname,
    "%{path}" => format_path(chunk),
    "%{time_slice}" => chunk.key,
  }
  pattern = Regexp.union(fixed_tags.keys + ["%{index}", "%{uuid_flush}"])

  # Iterate instead of recursing so a long run of existing indexes cannot
  # exhaust the stack.
  loop do
    # A new UUID is drawn for every attempt.
    tags = fixed_tags.merge("%{index}" => i, "%{uuid_flush}" => SecureRandom.uuid)
    path = @object_key_format.gsub(pattern, tags)

    # Pass the options as keywords: since Ruby 3.0 a trailing Hash is no
    # longer converted to keyword arguments.
    return path unless @gcs_bucket.find_file(path, **@encryption_opts)

    if path == prev
      raise "object `#{path}` already exists" unless @overwrite

      log.warn "object `#{path}` already exists but overwrites it"
      return path
    end

    i += 1
    prev = path
  end
end
hex_random(chunk)
click to toggle source
# File lib/fluent/plugin/out_gcs.rb, line 125
# Derives a short hexadecimal token from the chunk's unique id.
#
# The token is the leading @hex_random_length characters of the MD5 hex
# digest of chunk.unique_id, so the same chunk always yields the same token.
#
# @param chunk the buffer chunk; must respond to #unique_id
# @return [String] the truncated hex digest
def hex_random(chunk)
  digest = Digest::MD5.hexdigest(chunk.unique_id)
  digest[0...@hex_random_length]
end