class Fluent::Plugin::OSSOutput
OSSOutput
class implementation
Constants
- COMPRESSOR_REGISTRY
- DEFAULT_FORMAT_TYPE
- MAX_HEX_RANDOM_LENGTH
Public Class Methods
new()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_oss.rb, line 157 def initialize super @compressor = nil @uuid_flush_enabled = false end
register_compressor(name, compressor)
click to toggle source
# File lib/fluent/plugin/out_oss.rb, line 418 def self.register_compressor(name, compressor) COMPRESSOR_REGISTRY.register(name, compressor) end
Public Instance Methods
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_oss.rb, line 90 def configure(conf) compat_parameters_convert(conf, :buffer, :formatter, :inject) super raise Fluent::ConfigError, 'Invalid oss endpoint' if @endpoint.nil? if @hex_random_length > MAX_HEX_RANDOM_LENGTH raise Fluent::ConfigError, 'hex_random_length parameter must be '\ "less than or equal to #{MAX_HEX_RANDOM_LENGTH}" end unless @index_format =~ /^%(0\d*)?[dxX]$/ raise Fluent::ConfigError, 'index_format parameter should follow '\ '`%[flags][width]type`. `0` is the only supported flag, '\ 'and is mandatory if width is specified. '\ '`d`, `x` and `X` are supported types' end begin @compressor = COMPRESSOR_REGISTRY.lookup(@store_as).new(buffer_type: @buffer_config[:@type], log: log) rescue StandardError => e log.warn "'#{@store_as}' not supported. Use 'text' instead: error = #{e.message}" @compressor = TextCompressor.new end @compressor.configure(conf) @formatter = formatter_create process_key_format unless @check_object if config.has_key?('key_format') log.warn "set 'check_object false' and key_format is "\ 'specified. Check key_format is unique in each '\ 'write. If not, existing file will be overwritten.' else log.warn "set 'check_object false' and key_format is "\ 'not specified. Use '\ "'%{path}/%{time_slice}_%{hms_slice}_%{thread_id}.%{file_extension}' "\ 'for key_format' @key_format = '%{path}/%{time_slice}_%{hms_slice}_%{thread_id}.%{file_extension}' end end @configured_time_slice_format = conf['time_slice_format'] @values_for_oss_object_chunk = {} @time_slice_with_tz = Fluent::Timezone.formatter( @timekey_zone, @configured_time_slice_format || timekey_to_timeformat(@buffer_config['timekey'])) end
create_oss_client()
click to toggle source
# File lib/fluent/plugin/out_oss.rb, line 265 def create_oss_client @oss = Aliyun::OSS::Client.new( endpoint: @endpoint, access_key_id: @access_key_id, access_key_secret: @access_key_secret, download_crc_enable: @download_crc_enable, upload_crc_enable: @upload_crc_enable, open_timeout: @open_timeout, read_timeout: @read_timeout ) end
ensure_bucket()
click to toggle source
# File lib/fluent/plugin/out_oss.rb, line 309 def ensure_bucket unless @oss.bucket_exist?(@bucket) if @auto_create_bucket log.info "creating bucket #{@bucket} on #{@endpoint}" @oss.create_bucket(@bucket) else raise "the specified bucket does not exist: bucket = #{@bucket}" end end @bucket_handler = @oss.get_bucket(@bucket) end
format(tag, time, record)
click to toggle source
# File lib/fluent/plugin/out_oss.rb, line 172 def format(tag, time, record) r = inject_values_to_record(tag, time, record) @formatter.format(tag, time, r) end
hex_random(chunk)
click to toggle source
# File lib/fluent/plugin/out_oss.rb, line 302 def hex_random(chunk) unique_hex = Fluent::UniqueId.hex(chunk.unique_id) # unique_hex is like (time_sec, time_usec, rand) => reversing gives more randomness unique_hex.reverse! unique_hex[0...@hex_random_length] end
multi_workers_ready?()
click to toggle source
# File lib/fluent/plugin/out_oss.rb, line 153 def multi_workers_ready? true end
process_key_format()
click to toggle source
# File lib/fluent/plugin/out_oss.rb, line 277 def process_key_format if @key_format.include?('%{uuid_flush}') # verify uuidtools begin require 'uuidtools' rescue LoadError raise Fluent::ConfigError, 'uuidtools gem not found.'\ ' Install uuidtools gem first' end begin uuid_random rescue => e raise Fluent::ConfigError, "generating uuid doesn't work. "\ "Can't use %{uuid_flush} on this environment. #{e}" end @uuid_flush_enabled = true end end
start()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_oss.rb, line 163 def start @oss_sdk_log_dir += '/' unless @oss_sdk_log_dir.end_with?('/') Aliyun::Common::Logging.set_log_file(@oss_sdk_log_dir + Aliyun::Common::Logging::DEFAULT_LOG_FILE) create_oss_client unless @oss ensure_bucket if @check_bucket super end
timekey_to_timeformat(timekey)
click to toggle source
# File lib/fluent/plugin/out_oss.rb, line 143 def timekey_to_timeformat(timekey) case timekey when nil then '' when 0...60 then '%Y%m%d-%H_%M_%S' # 60 exclusive when 60...3600 then '%Y%m%d-%H_%M' when 3600...86400 then '%Y%m%d-%H' else '%Y%m%d' end end
uuid_random()
click to toggle source
# File lib/fluent/plugin/out_oss.rb, line 298 def uuid_random ::UUIDTools::UUID.random_create.to_s end
write(chunk)
click to toggle source
# File lib/fluent/plugin/out_oss.rb, line 177 def write(chunk) index = 0 metadata = chunk.metadata time_slice = if metadata.timekey.nil? ''.freeze else @time_slice_with_tz.call(metadata.timekey) end @values_for_oss_object_chunk[chunk.unique_id] ||= { '%{hex_random}' => hex_random(chunk) } if @check_object exist_key = nil begin values_for_oss_key = { '%{path}' => @path, '%{thread_id}' => Thread.current.object_id.to_s, '%{file_extension}' => @compressor.ext, '%{time_slice}' => time_slice, '%{index}' => sprintf(@index_format, index) }.merge!(@values_for_oss_object_chunk[chunk.unique_id]) values_for_oss_key['%{uuid_flush}'.freeze] = uuid_random if @uuid_flush_enabled key = @key_format.gsub(/%{[^}]+}/) do |matched_key| values_for_oss_key.fetch(matched_key, matched_key) end key = extract_placeholders(key, chunk) key = key.gsub(/%{[^}]+}/, values_for_oss_key) if (index > 0) && (key == exist_key) if @overwrite log.warn "#{key} already exists, but will overwrite" break else raise "duplicated path is generated. use %{index} in key_format: path = #{key}" end end index += 1 exist_key = key end while @bucket_handler.object_exists?(key) else hms_slice = Time.now.utc.strftime('%H%M%S') hms_slice = Time.now.strftime('%H%M%S') if @local_time values_for_oss_key = { '%{path}' => @path, '%{thread_id}' => Thread.current.object_id.to_s, '%{file_extension}' => @compressor.ext, '%{time_slice}' => time_slice, '%{hms_slice}' => hms_slice }.merge!(@values_for_oss_object_chunk[chunk.unique_id]) values_for_oss_key['%{uuid_flush}'.freeze] = uuid_random if @uuid_flush_enabled key = @key_format.gsub(/%{[^}]+}/) do |matched_key| values_for_oss_key.fetch(matched_key, matched_key) end key = extract_placeholders(key, chunk) key = key.gsub(/%{[^}]+}/, values_for_oss_key) end out_file = Tempfile.new('oss-fluent-') out_file.binmode begin @compressor.compress(chunk, out_file) out_file.rewind log.info "out_oss: write chunk #{dump_unique_id_hex(chunk.unique_id)} with metadata #{chunk.metadata} to oss://#{@bucket}/#{key}, size #{out_file.size}" start = Time.now.to_i @bucket_handler.put_object(key, file: out_file, content_type: @compressor.content_type) log.debug "out_oss: write oss://#{@bucket}/#{key} used #{Time.now.to_i - start} seconds, size #{out_file.length}" @values_for_oss_object_chunk.delete(chunk.unique_id) if @warn_for_delay if Time.at(chunk.metadata.timekey) < Time.now - @warn_for_delay log.warn "out_oss: delayed events were put to oss://#{@bucket}/#{key}" end end ensure out_file.close(true) rescue nil end end