class Fluent::OSSOutput
Public Instance Methods
compress(chunk, tmp)
click to toggle source
# File lib/fluent/plugin/out_oss.rb, line 29
# Gzip-compresses the contents of +chunk+ into the file behind +tmp+.
#
# The external +gzip+ command is tried first, reading from +chunk.path+ and
# writing to +tmp.path+. If it cannot be run or exits unsuccessfully, a
# warning is logged and the data is compressed in-process instead.
#
# @param chunk buffer chunk; must respond to +path+ and +write_to(io)+
# @param tmp   writable file object; must respond to +path+ and +truncate+
def compress(chunk, tmp)
  # Argument-vector form: no shell is involved, so paths containing spaces or
  # shell metacharacters are passed through verbatim. `out:` with a file name
  # creates/truncates the target, like the shell's `>` redirection.
  return if system('gzip', '-c', chunk.path, out: tmp.path)

  log.warn "failed to execute gzip command. Fallback to GzipWriter. status = #{$?}"

  # Discard any partial output the failed command may have left behind.
  tmp.truncate(0)
  # The block form closes the writer exactly once, even if write_to raises.
  Zlib::GzipWriter.wrap(tmp) { |gz| chunk.write_to(gz) }
end
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_oss.rb, line 25
# Configures the plugin by handing +conf+ straight to the superclass
# implementation; this method adds no processing of its own.
#
# @param conf configuration object supplied by the caller
def configure(conf)
  super(conf)
end
format(tag, time, record)
click to toggle source
# File lib/fluent/plugin/out_oss.rb, line 71
# Serializes one event as a single JSON object followed by a newline.
#
# The object has the keys +tag+, +timestamp+ and +log+, holding the three
# arguments in that order.
#
# @param tag    value stored under "tag"
# @param time   value stored under "timestamp"
# @param record value stored under "log"
# @return [String] the JSON text terminated by "\n"
def format(tag, time, record)
  event = { tag: tag, timestamp: time, log: record }
  "#{event.to_json}\n"
end
process_object_key_format(chunk, key_format)
click to toggle source
# File lib/fluent/plugin/out_oss.rb, line 44
# Expands the <tt>%{name}</tt> placeholders in +key_format+.
#
# Recognized placeholders:
# * <tt>%{host}</tt>       - Socket.gethostname
# * <tt>%{time_slice}</tt> - chunk.key
# * <tt>%{uuid}</tt>       - 8 random hex characters (SecureRandom.hex(4))
# * <tt>%{file_ext}</tt>   - the literal "gz"
#
# Unrecognized placeholders are left in the result unchanged.
#
# @param chunk      object responding to +key+
# @param key_format [String] template containing placeholders
# @return [String] a new string; +key_format+ itself is not modified
def process_object_key_format(chunk, key_format)
  substitutions = {
    'host' => Socket.gethostname,
    'time_slice' => chunk.key,
    'uuid' => SecureRandom.hex(4),
    'file_ext' => 'gz'
  }

  # One pass with a block: replacement values are inserted literally (no
  # backslash/back-reference interpretation) and placeholder-looking text
  # inside a value is never expanded a second time.
  key_format.gsub(/%\{(\w+)\}/) do |placeholder|
    name = Regexp.last_match(1)
    # to_s guards against non-String values such as a nil chunk key.
    substitutions.key?(name) ? substitutions[name].to_s : placeholder
  end
end
start()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_oss.rb, line 58
# Starts the plugin: runs the superclass start-up, then builds the OSS client
# from @oss_endpoint, @oss_key_id and @oss_key_secret and stores the bucket
# named by @oss_bucket in @bucket.
#
# @raise [RuntimeError] if the client reports that @oss_bucket does not exist
def start
  super

  # Point the SDK's log file at the platform's null device (File::NULL is
  # "/dev/null" on Unix-like systems) -- presumably to discard SDK logging.
  Aliyun::Common::Logging.set_log_file(File::NULL)

  @client = Aliyun::OSS::Client.new(
    endpoint: @oss_endpoint,
    access_key_id: @oss_key_id,
    access_key_secret: @oss_key_secret
  )

  raise "Specific bucket not exists: #{@oss_bucket}" unless @client.bucket_exists?(@oss_bucket)

  @bucket = @client.get_bucket(@oss_bucket)
end
write(chunk)
click to toggle source
# File lib/fluent/plugin/out_oss.rb, line 75
# Compresses +chunk+ into a temporary file and uploads that file to @bucket
# under the key produced from @oss_object_key_format.
#
# The temporary file is closed and deleted whether or not the upload succeeds.
#
# @param chunk buffer chunk handed to +compress+ and
#   +process_object_key_format+
# @raise [RuntimeError] if +resumable_upload+ returns a falsy value
def write(chunk)
  # Created outside the guarded region: if Tempfile.new itself fails there is
  # nothing to clean up, and the original error must not be masked by calling
  # #close on nil inside the ensure clause.
  tmp = Tempfile.new('oss-')
  begin
    compress(chunk, tmp)
    object_key = process_object_key_format(chunk, @oss_object_key_format)
    raise "Upload #{tmp.path} failed" unless @bucket.resumable_upload(object_key, tmp.path)
  ensure
    # close(true) also unlinks the temporary file.
    tmp.close(true)
  end
end