class Fluent::Plugin::GCSOutput
Constants
- DEFAULT_FORMAT_TYPE
- MAX_HEX_RANDOM_LENGTH
Public Class Methods
new()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_gcs.rb, line 15 def initialize super require "google/cloud/storage" end
Public Instance Methods
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_gcs.rb, line 72 def configure(conf) compat_parameters_convert(conf, :buffer, :formatter, :inject) super if @hex_random_length > MAX_HEX_RANDOM_LENGTH raise Fluent::ConfigError, "hex_random_length parameter should be set to #{MAX_HEX_RANDOM_LENGTH} characters or less." end # The customer-supplied, AES-256 encryption key that will be used to encrypt the file. @encryption_opts = { encryption_key: @encryption_key, } if @object_metadata @object_metadata_hash = @object_metadata.map {|m| [m.key, m.value] }.to_h end @formatter = formatter_create @object_creator = Fluent::GCS.discovered_object_creator(@store_as, transcoding: @transcoding) # For backward compatibility # TODO: Remove time_slice_format when end of support compat_parameters @configured_time_slice_format = conf['time_slice_format'] @time_slice_with_tz = Fluent::Timezone.formatter(@timekey_zone, @configured_time_slice_format || timekey_to_timeformat(@buffer_config['timekey'])) end
format(tag, time, record)
click to toggle source
# File lib/fluent/plugin/out_gcs.rb, line 111 def format(tag, time, record) r = inject_values_to_record(tag, time, record) @formatter.format(tag, time, r) end
multi_workers_ready?()
click to toggle source
# File lib/fluent/plugin/out_gcs.rb, line 116 def multi_workers_ready? true end
start()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_gcs.rb, line 98 def start @gcs = Google::Cloud::Storage.new( project: @project, keyfile: @keyfile, retries: @client_retries, timeout: @client_timeout ) @gcs_bucket = @gcs.bucket(@bucket) ensure_bucket super end
write(chunk)
click to toggle source
# File lib/fluent/plugin/out_gcs.rb, line 120 def write(chunk) path = generate_path(chunk) @object_creator.create(chunk) do |obj| opts = { metadata: @object_metadata_hash, acl: @acl, storage_class: @storage_class, content_type: @object_creator.content_type, content_encoding: @object_creator.content_encoding, } opts.merge!(@encryption_opts) log.debug { "out_gcs: upload chunk:#{chunk.key} to gcs://#{@bucket}/#{path} options: #{opts}" } @gcs_bucket.upload_file(obj.path, path, opts) end end
Private Instance Methods
check_object_exists(path)
click to toggle source
# File lib/fluent/plugin/out_gcs.rb, line 154 def check_object_exists(path) if !@blind_write return @gcs_bucket.find_file(path, @encryption_opts) else return false end end
ensure_bucket()
click to toggle source
# File lib/fluent/plugin/out_gcs.rb, line 140 def ensure_bucket return unless @gcs_bucket.nil? if !@auto_create_bucket raise "bucket `#{@bucket}` does not exist" end log.info "creating bucket `#{@bucket}`" @gcs_bucket = @gcs.create_bucket(@bucket) end
generate_path(chunk, i = 0, prev = nil)
click to toggle source
# File lib/fluent/plugin/out_gcs.rb, line 162 def generate_path(chunk, i = 0, prev = nil) metadata = chunk.metadata time_slice = if metadata.timekey.nil? ''.freeze else @time_slice_with_tz.call(metadata.timekey) end tags = { "%{file_extension}" => @object_creator.file_extension, "%{hex_random}" => hex_random(chunk), "%{hostname}" => Socket.gethostname, "%{index}" => i, "%{path}" => @path, "%{time_slice}" => time_slice, "%{uuid_flush}" => SecureRandom.uuid, } path = @object_key_format.gsub(Regexp.union(tags.keys), tags) path = extract_placeholders(path, chunk) return path unless check_object_exists(path) if path == prev if @overwrite log.warn "object `#{path}` already exists but overwrites it" return path end raise "object `#{path}` already exists" end generate_path(chunk, i + 1, path) end
hex_random(chunk)
click to toggle source
# File lib/fluent/plugin/out_gcs.rb, line 150 def hex_random(chunk) Digest::MD5.hexdigest(chunk.unique_id)[0...@hex_random_length] end
timekey_to_timeformat(timekey)
click to toggle source
This is stolen from Fluentd
# File lib/fluent/plugin/out_gcs.rb, line 193 def timekey_to_timeformat(timekey) case timekey when nil then '' when 0...60 then '%Y%m%d%H%M%S' # 60 exclusive when 60...3600 then '%Y%m%d%H%M' when 3600...86400 then '%Y%m%d%H' else '%Y%m%d' end end