class Fluent::Plugin::OSSOutput

Implementation of Fluent::Plugin::OSSOutput, a Fluentd buffered output plugin that writes formatted buffer chunks as objects to Alibaba Cloud OSS (Object Storage Service).

Constants

COMPRESSOR_REGISTRY
DEFAULT_FORMAT_TYPE
MAX_HEX_RANDOM_LENGTH

Public Class Methods

new() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_oss.rb, line 157
# Initialize plugin state before configure/start run.
# No compressor has been resolved yet, and %{uuid_flush}
# substitution starts disabled until process_key_format enables it.
def initialize
  super
  @uuid_flush_enabled = false
  @compressor = nil
end
register_compressor(name, compressor) click to toggle source
# File lib/fluent/plugin/out_oss.rb, line 418
# Register a compressor implementation under +name+ so it can be
# selected via the plugin's store_as parameter (looked up in configure).
def self.register_compressor(name, compressor)
  COMPRESSOR_REGISTRY.register(name, compressor)
end

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_oss.rb, line 90
# Validate and apply plugin configuration.
#
# Order matters here: compat_parameters_convert must rewrite legacy
# v0.12-style buffer/formatter/inject parameters before +super+ parses
# the config sections.
#
# Raises Fluent::ConfigError when the endpoint is missing, when
# hex_random_length exceeds MAX_HEX_RANDOM_LENGTH, or when index_format
# is not of the supported `%[flags][width]type` form.
def configure(conf)
  compat_parameters_convert(conf, :buffer, :formatter, :inject)

  super

  raise Fluent::ConfigError, 'Invalid oss endpoint' if @endpoint.nil?

  if @hex_random_length > MAX_HEX_RANDOM_LENGTH
    raise Fluent::ConfigError, 'hex_random_length parameter must be '\
          "less than or equal to #{MAX_HEX_RANDOM_LENGTH}"
  end

  # Only `%d`, `%x`, `%X` with an optional zero-padded width are accepted
  # because the value is later fed to sprintf in write().
  unless @index_format =~ /^%(0\d*)?[dxX]$/
    raise Fluent::ConfigError, 'index_format parameter should follow '\
          '`%[flags][width]type`. `0` is the only supported flag, '\
          'and is mandatory if width is specified. '\
          '`d`, `x` and `X` are supported types'
  end

  # Resolve the compressor selected by store_as; fall back to plain text
  # (best effort, logged) rather than failing configuration.
  begin
    @compressor = COMPRESSOR_REGISTRY.lookup(@store_as).new(buffer_type: @buffer_config[:@type], log: log)
  rescue StandardError => e
    log.warn "'#{@store_as}' not supported. Use 'text' instead: error = #{e.message}"
    @compressor = TextCompressor.new
  end

  @compressor.configure(conf)

  @formatter = formatter_create

  # May enable %{uuid_flush} support; raises if uuidtools is unusable.
  process_key_format

  # With check_object disabled, object existence is never probed, so the
  # key must be unique per write; default to a key_format that includes
  # an HMS timestamp and thread id when the user did not provide one.
  unless @check_object
    if config.has_key?('key_format')
      log.warn "set 'check_object false' and key_format is "\
              'specified. Check key_format is unique in each '\
              'write. If not, existing file will be overwritten.'
    else
      log.warn "set 'check_object false' and key_format is "\
              'not specified. Use '\
              "'%{path}/%{time_slice}_%{hms_slice}_%{thread_id}.%{file_extension}' "\
              'for key_format'
      @key_format = '%{path}/%{time_slice}_%{hms_slice}_%{thread_id}.%{file_extension}'
    end
  end

  @configured_time_slice_format = conf['time_slice_format']
  # Per-chunk placeholder values (e.g. %{hex_random}), keyed by chunk id.
  @values_for_oss_object_chunk = {}
  # Timezone-aware formatter for %{time_slice}; falls back to a pattern
  # derived from the buffer's timekey when time_slice_format is not set.
  @time_slice_with_tz = Fluent::Timezone.formatter(
    @timekey_zone,
    @configured_time_slice_format || timekey_to_timeformat(@buffer_config['timekey']))
end
create_oss_client() click to toggle source
# File lib/fluent/plugin/out_oss.rb, line 265
# Build the Aliyun OSS SDK client from the plugin's connection and
# timeout settings and memoize it in @oss for later bucket operations.
def create_oss_client
  client_options = {
    endpoint: @endpoint,
    access_key_id: @access_key_id,
    access_key_secret: @access_key_secret,
    download_crc_enable: @download_crc_enable,
    upload_crc_enable: @upload_crc_enable,
    open_timeout: @open_timeout,
    read_timeout: @read_timeout
  }
  @oss = Aliyun::OSS::Client.new(client_options)
end
ensure_bucket() click to toggle source
# File lib/fluent/plugin/out_oss.rb, line 309
# Verify the target bucket exists (creating it when auto_create_bucket
# is enabled) and cache a bucket handle in @bucket_handler for writes.
def ensure_bucket
  unless @oss.bucket_exist?(@bucket)
    # Creation is opt-in; a missing bucket is otherwise a hard error.
    raise "the specified bucket does not exist: bucket = #{@bucket}" unless @auto_create_bucket

    log.info "creating bucket #{@bucket} on #{@endpoint}"
    @oss.create_bucket(@bucket)
  end

  @bucket_handler = @oss.get_bucket(@bucket)
end
format(tag, time, record) click to toggle source
# File lib/fluent/plugin/out_oss.rb, line 172
# Serialize one event into the chunk: let the inject section add its
# configured fields, then hand the enriched record to the formatter.
def format(tag, time, record)
  @formatter.format(tag, time, inject_values_to_record(tag, time, record))
end
hex_random(chunk) click to toggle source
# File lib/fluent/plugin/out_oss.rb, line 302
# Derive a pseudo-random hex string of @hex_random_length characters
# from the chunk's unique id for use in %{hex_random} placeholders.
def hex_random(chunk)
  # The hex id is laid out as (time_sec, time_usec, rand); reversing it
  # puts the random part first, giving better key dispersion.
  Fluent::UniqueId.hex(chunk.unique_id).reverse[0, @hex_random_length]
end
multi_workers_ready?() click to toggle source
# File lib/fluent/plugin/out_oss.rb, line 153
# This plugin is safe to run under Fluentd's multi-worker mode; each
# worker uploads to distinct object keys (thread id / index / uuid
# placeholders keep them from colliding).
def multi_workers_ready?
  true
end
process_key_format() click to toggle source
# File lib/fluent/plugin/out_oss.rb, line 277
# If key_format uses %{uuid_flush}, verify the uuidtools gem is
# installed and working, then enable UUID substitution for writes.
# Raises Fluent::ConfigError when the gem is missing or unusable.
def process_key_format
  # Nothing to do unless the key format embeds a flush-time UUID.
  return unless @key_format.include?('%{uuid_flush}')

  begin
    require 'uuidtools'
  rescue LoadError
    raise Fluent::ConfigError, 'uuidtools gem not found.'\
          ' Install uuidtools gem first'
  end

  # Probe UUID generation once up front so a broken environment fails
  # at configure time instead of at flush time.
  begin
    uuid_random
  rescue StandardError => e
    raise Fluent::ConfigError, "generating uuid doesn't work. "\
          "Can't use %{uuid_flush} on this environment. #{e}"
  end

  @uuid_flush_enabled = true
end
start() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_oss.rb, line 163
# Start the plugin: point the Aliyun SDK's file logger at the configured
# directory, lazily create the OSS client, and optionally verify the
# bucket before the output lifecycle begins.
def start
  # Normalize the log dir so the SDK log file path concatenates cleanly.
  @oss_sdk_log_dir += '/' unless @oss_sdk_log_dir.end_with?('/')
  Aliyun::Common::Logging.set_log_file(@oss_sdk_log_dir + Aliyun::Common::Logging::DEFAULT_LOG_FILE)
  create_oss_client unless @oss

  ensure_bucket if @check_bucket
  super
end
timekey_to_timeformat(timekey) click to toggle source
# File lib/fluent/plugin/out_oss.rb, line 143
# Map a buffer timekey span (in seconds) to a strftime pattern with just
# enough resolution to keep successive time slices distinct: sub-minute
# spans need seconds, sub-hour spans need minutes, and so on.
def timekey_to_timeformat(timekey)
  return '' if timekey.nil?
  # Spans of a day or more (or anything outside 0...86400) only need the date.
  return '%Y%m%d' if timekey < 0 || timekey >= 86_400

  if timekey < 60
    '%Y%m%d-%H_%M_%S'
  elsif timekey < 3600
    '%Y%m%d-%H_%M'
  else
    '%Y%m%d-%H'
  end
end
uuid_random() click to toggle source
# File lib/fluent/plugin/out_oss.rb, line 298
# Generate a random UUID string for %{uuid_flush} substitution.
# Relies on the uuidtools gem, which process_key_format requires and
# verifies before this is ever used at flush time.
def uuid_random
  ::UUIDTools::UUID.random_create.to_s
end
write(chunk) click to toggle source
# File lib/fluent/plugin/out_oss.rb, line 177
# Flush one buffer chunk to OSS: resolve the object key from key_format
# placeholders, compress the chunk into a tempfile, and upload it.
#
# Two key-resolution modes:
# * check_object true  — probe OSS for key collisions, bumping %{index}
#   (do-while loop) until an unused key is found, unless @overwrite
#   allows reusing a stable key.
# * check_object false — never probe; uniqueness comes from the
#   %{hms_slice}/%{thread_id} style placeholders instead.
def write(chunk)
  index = 0
  metadata = chunk.metadata
  # Chunks without a timekey (no time-based buffering) get an empty slice.
  time_slice = if metadata.timekey.nil?
                 ''.freeze
               else
                 @time_slice_with_tz.call(metadata.timekey)
               end

  # %{hex_random} must be stable across retries of the same chunk, so it
  # is cached per chunk id and only deleted after a successful upload.
  @values_for_oss_object_chunk[chunk.unique_id] ||= {
    '%{hex_random}' => hex_random(chunk)
  }

  if @check_object
    exist_key = nil
    begin
      values_for_oss_key = {
        '%{path}' => @path,
        '%{thread_id}' => Thread.current.object_id.to_s,
        '%{file_extension}' => @compressor.ext,
        '%{time_slice}' => time_slice,
        '%{index}' => sprintf(@index_format, index)
      }.merge!(@values_for_oss_object_chunk[chunk.unique_id])

      values_for_oss_key['%{uuid_flush}'.freeze] = uuid_random if @uuid_flush_enabled

      # First pass: substitute known placeholders, leaving unknown ones
      # intact for extract_placeholders (tag/${...} style) to resolve;
      # second gsub sweeps up any placeholders that reappeared.
      key = @key_format.gsub(/%{[^}]+}/) do |matched_key|
        values_for_oss_key.fetch(matched_key, matched_key)
      end
      key = extract_placeholders(key, chunk)
      key = key.gsub(/%{[^}]+}/, values_for_oss_key)

      # If bumping %{index} did not change the key, the format has no
      # %{index}: either overwrite deliberately or fail loudly.
      if (index > 0) && (key == exist_key)
        if @overwrite
          log.warn "#{key} already exists, but will overwrite"
          break
        else
          raise "duplicated path is generated. use %{index} in key_format: path = #{key}"
        end
      end

      index += 1
      exist_key = key
    end while @bucket_handler.object_exists?(key)
  else
    hms_slice = Time.now.utc.strftime('%H%M%S')
    hms_slice = Time.now.strftime('%H%M%S') if @local_time

    values_for_oss_key = {
      '%{path}' => @path,
      '%{thread_id}' => Thread.current.object_id.to_s,
      '%{file_extension}' => @compressor.ext,
      '%{time_slice}' => time_slice,
      '%{hms_slice}' => hms_slice
    }.merge!(@values_for_oss_object_chunk[chunk.unique_id])

    values_for_oss_key['%{uuid_flush}'.freeze] = uuid_random if @uuid_flush_enabled

    key = @key_format.gsub(/%{[^}]+}/) do |matched_key|
      values_for_oss_key.fetch(matched_key, matched_key)
    end
    key = extract_placeholders(key, chunk)
    key = key.gsub(/%{[^}]+}/, values_for_oss_key)
  end

  # Stage the compressed payload on disk so the SDK can stream it.
  out_file = Tempfile.new('oss-fluent-')
  out_file.binmode
  begin
    @compressor.compress(chunk, out_file)
    out_file.rewind
    log.info "out_oss: write chunk #{dump_unique_id_hex(chunk.unique_id)} with metadata #{chunk.metadata} to oss://#{@bucket}/#{key}, size #{out_file.size}"

    start = Time.now.to_i
    @bucket_handler.put_object(key, file: out_file, content_type: @compressor.content_type)

    log.debug "out_oss: write oss://#{@bucket}/#{key} used #{Time.now.to_i - start} seconds, size #{out_file.length}"
    # Upload succeeded; drop the cached per-chunk placeholder values.
    @values_for_oss_object_chunk.delete(chunk.unique_id)

    if @warn_for_delay
      # NOTE(review): Time.at raises TypeError if metadata.timekey is
      # nil here (non-time-keyed buffers) — confirm warn_for_delay is
      # only used with timekey buffering.
      if Time.at(chunk.metadata.timekey) < Time.now - @warn_for_delay
        log.warn "out_oss: delayed events were put to oss://#{@bucket}/#{key}"
      end
    end
  ensure
    # Close and unlink the tempfile even on failure; best effort.
    out_file.close(true) rescue nil
  end
end