class Fluent::Plugin::AliyunossOutput
Constants
- DEFAULT_FORMAT_TYPE
- DEFAULT_TIMEKEY
Public Instance Methods
compress(chunk, tmp)
click to toggle source
# File lib/fluent/plugin/out_aliyunoss.rb, line 64
# Writes the upload payload for +chunk+ to the location of the temp file +tmp+.
#
# When @store_as is "orc" the chunk is converted with the external
# orc-tools jar; otherwise it is gzip-compressed, first with the external
# `gzip` binary and, if that fails, with Zlib::GzipWriter.
#
# External commands are started with argument vectors (no shell), so file
# paths containing spaces or shell metacharacters are passed through intact.
#
# chunk - object responding to #path and #write_to(io).
# tmp   - Tempfile whose path receives the output. In the "orc" branch the
#         Tempfile is deleted and the converter recreates a file at the
#         same path; in the gzip fallback branch the Tempfile handle is
#         closed when the gzip stream is closed.
#
# Raises RuntimeError when the orc-tools command fails.
def compress(chunk, tmp)
  if @store_as == "orc"
    # We just need a tmp file path, orc-tools convert won't work if file exists
    output_path = tmp.path
    tmp.delete

    # Create a symlink with .json suffix, to fool orc-tools
    chunk_path = File.realpath(chunk.path)
    fake_path = "#{chunk_path}.json"
    # File.symlink? also catches a dangling link left by an earlier crash,
    # which File.exist? reports as absent.
    File.unlink(fake_path) if File.symlink?(fake_path) || File.exist?(fake_path)
    File.symlink(chunk_path, fake_path)

    begin
      res = system("java", "-Dlog4j.configuration=file:/log4j.properties",
                   "-jar", "/orc-tools.jar", "convert", "-o", output_path, fake_path)
      unless res
        raise "failed to execute java -jar /orc-tools.jar command. status = #{$?}"
      end
    ensure
      # Remove the fake link whether or not the conversion succeeded.
      File.unlink(fake_path) if File.symlink?(fake_path)
    end
  else
    # Equivalent of `gzip -c <chunk> > <tmp>` without going through a shell.
    res = system("gzip", "-c", chunk.path, out: tmp.path)
    unless res
      log.warn "failed to execute gzip command. Fallback to GzipWriter. status = #{$?}"
      # Discard anything the failed command may have written.
      tmp.truncate(0)
      # Block form closes the gzip stream (and the underlying tmp handle)
      # even if chunk.write_to raises.
      Zlib::GzipWriter.wrap(tmp) { |gw| chunk.write_to(gw) }
    end
  end
end
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_aliyunoss.rb, line 57
# Applies the plugin configuration.
#
# After the superclass has processed +conf+, the formatter, buffer and
# inject sections are passed through compat_parameters_convert with "time"
# as the default chunk key, and a JSON formatter is created and kept in
# @json_formatter.
#
# conf - the configuration object handed to the superclass.
def configure(conf)
  super

  compat_parameters_convert(
    conf,
    :formatter, :buffer, :inject,
    default_chunk_key: "time"
  )

  @json_formatter = formatter_create(
    usage: 'formatter_in_example_json',
    type: 'json'
  )
end
format(tag, time, record)
click to toggle source
# File lib/fluent/plugin/out_aliyunoss.rb, line 131
# Serializes one event with the formatter stored in @json_formatter.
#
# The record is first passed through inject_values_to_record, and the
# result of that call is what gets formatted.
#
# Returns whatever @json_formatter.format returns.
def format(tag, time, record)
  @json_formatter.format(tag, time, inject_values_to_record(tag, time, record))
end
multi_workers_ready?()
click to toggle source
# File lib/fluent/plugin/out_aliyunoss.rb, line 115
# Predicate that always answers true.
# NOTE(review): presumably signals to the host framework that this output
# may run under multiple workers -- confirm against the framework docs.
def multi_workers_ready?
  true
end
process_object_key_format(chunk, key_format)
click to toggle source
# File lib/fluent/plugin/out_aliyunoss.rb, line 99
# Expands the placeholders in +key_format+ to build an object key.
#
# Substitutions, in order:
#   %{host}           - Socket.gethostname
#   %{uuid}           - 8 random hex characters (SecureRandom.hex(4))
#   ${tag_parts[N]}   - the N-th dot-separated part of chunk.metadata.tag
# The result is then handed to extract_placeholders together with the
# chunk metadata.
#
# A nil tag is treated as having no parts, and replacement values are
# inserted literally (backslashes in a hostname or tag part are not
# interpreted as gsub back-references).
#
# chunk      - object whose #metadata responds to #tag.
# key_format - String template; it is not modified.
#
# Returns the value of extract_placeholders.
def process_object_key_format(chunk, key_format)
  key_map = {
    host: Socket.gethostname,
    uuid: SecureRandom.hex(4),
  }

  result = key_map.reduce(key_format) do |acc, (name, value)|
    # Block form keeps the replacement literal.
    acc.gsub("%{#{name}}") { value }
  end

  # support replace tag_parts
  chunk.metadata.tag.to_s.split('.').each_with_index do |part, idx|
    result = result.gsub("${tag_parts[#{idx}]}") { part }
  end

  extract_placeholders(result, chunk.metadata)
end
start()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_aliyunoss.rb, line 119
# Starts the plugin: after the superclass start, builds the OSS client from
# @oss_endpoint, @oss_key_id and @oss_key_secret, checks that @oss_bucket
# exists and stores its handle in @bucket.
#
# Raises RuntimeError when the client reports that the bucket does not exist.
def start
  super

  @client = Aliyun::OSS::Client.new(
    endpoint: @oss_endpoint,
    access_key_id: @oss_key_id,
    access_key_secret: @oss_key_secret
  )

  unless @client.bucket_exists?(@oss_bucket)
    raise "Specific bucket not exists: #{@oss_bucket}"
  end

  @bucket = @client.get_bucket(@oss_bucket)
end
write(chunk)
click to toggle source
# File lib/fluent/plugin/out_aliyunoss.rb, line 136
# Uploads one buffered chunk to the bucket.
#
# The chunk is compressed/converted into a temporary file via #compress,
# the object key is derived from "#{@oss_path}.#{@store_as}" via
# #process_object_key_format, and the file is sent with
# @bucket.resumable_upload.
#
# chunk - the buffer chunk to upload.
#
# Raises RuntimeError when resumable_upload returns a falsy value.
def write(chunk)
  # Created outside the begin block so a failure here is reported as-is
  # instead of being masked by calling #close on nil in the ensure clause.
  f = Tempfile.new('oss-')
  output_path = f.path
  begin
    compress(chunk, f)
    path = process_object_key_format(chunk, "#{@oss_path}.#{@store_as}")
    raise "Upload #{output_path} failed" unless @bucket.resumable_upload(path, output_path)
  ensure
    f.close(true)
    # In "orc" mode #compress deletes the Tempfile and an external tool
    # recreates a file at the same path; Tempfile then considers itself
    # already unlinked, so that file has to be removed explicitly.
    File.unlink(output_path) if File.exist?(output_path)
  end
end