class Fluent::OSSOutput

Public Instance Methods

compress(chunk, tmp) click to toggle source
# File lib/fluent/plugin/out_oss.rb, line 29
def compress(chunk, tmp)
  res = system "gzip -c #{chunk.path} > #{tmp.path}"
  unless res
    log.warn "failed to execute gzip command. Fallback to GzipWriter. status = #{$?}"
    begin
      tmp.truncate(0)
      gw = Zlib::GzipWriter.new(tmp)
      chunk.write_to(gw)
      gw.close
    ensure
      gw.close rescue nil
    end
  end
end
configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_oss.rb, line 25
def configure(conf)
  super
end
format(tag, time, record) click to toggle source
# File lib/fluent/plugin/out_oss.rb, line 71
def format(tag, time, record)
  {tag: tag, timestamp: time, log: record}.to_json + "\n"
end
process_object_key_format(chunk, key_format) click to toggle source
# File lib/fluent/plugin/out_oss.rb, line 44
def process_object_key_format(chunk, key_format)
  key_map = {
    host: Socket.gethostname,
    time_slice: chunk.key,
    uuid: SecureRandom.hex(4),
    file_ext: 'gz'
  }
  result = key_format
  key_map.each do |k, v|
    result = result.gsub("%{#{k.to_s}}", v)
  end
  result
end
start() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_oss.rb, line 58
def start
  super
  Aliyun::Common::Logging.set_log_file('/dev/null')
  @client = Aliyun::OSS::Client.new(
    :endpoint => @oss_endpoint,
    :access_key_id => @oss_key_id,
    :access_key_secret => @oss_key_secret)

  raise "Specific bucket not exists: #{@oss_bucket}" unless @client.bucket_exists? @oss_bucket

  @bucket = @client.get_bucket(@oss_bucket)
end
write(chunk) click to toggle source
# File lib/fluent/plugin/out_oss.rb, line 75
def write(chunk)
  begin
    f = Tempfile.new('oss-')
    compress(chunk, f)
    path = process_object_key_format(chunk, @oss_object_key_format)
    raise "Upload #{f.path} failed" unless @bucket.resumable_upload(path, f.path)
  ensure
    f.close(true)
  end
end