class Fluent::Plugin::AliyunossOutput

Constants

DEFAULT_FORMAT_TYPE
DEFAULT_TIMEKEY

Public Instance Methods

compress(chunk, tmp) click to toggle source
# File lib/fluent/plugin/out_aliyunoss.rb, line 64
# Compresses/converts the buffer chunk into the temporary file +tmp+.
#
# When @store_as is "orc" the chunk is converted with orc-tools (java);
# otherwise it is gzipped, preferring the external gzip binary and falling
# back to Zlib::GzipWriter when that command fails.
#
# chunk - buffer chunk; must respond to #path (and #write_to for the fallback).
# tmp   - Tempfile that receives the output. NOTE: in the orc branch the
#         Tempfile itself is unlinked and orc-tools creates a new file at the
#         same path; in the gzip fallback the Tempfile handle is closed.
#
# Raises RuntimeError when the orc-tools command fails.
def compress(chunk, tmp)
        if @store_as == "orc"
                # We just need a tmp file path, orc-tools convert won't work if file exists
                output_path = tmp.path
                tmp.delete
                # Create a symlink with .json suffix, to fool orc-tools
                chunk_path = File.realpath(chunk.path)
                fake_path = "#{chunk_path}.json"
                # symlink? also catches a dangling link left by an earlier crashed run,
                # which File.exist? would report as missing.
                File.unlink(fake_path) if File.symlink?(fake_path) || File.exist?(fake_path)
                File.symlink(chunk_path, fake_path)

                begin
                        # Argument-list form: no shell is involved, so paths are passed verbatim.
                        res = system("java", "-Dlog4j.configuration=file:/log4j.properties",
                                     "-jar", "/orc-tools.jar", "convert", "-o", output_path, fake_path)
                        unless res
                                raise "failed to execute java -jar /orc-tools.jar command. status = #{$?}"
                        end
                ensure
                        # Remove the helper symlink on success and on failure alike.
                        File.unlink(fake_path) if File.symlink?(fake_path)
                end
        else
                # Same effect as `gzip -c chunk > tmp` but without going through a shell.
                res = system("gzip", "-c", chunk.path, out: tmp.path)
                unless res
                        log.warn "failed to execute gzip command. Fallback to GzipWriter. status = #{$?}"
                        # Drop whatever the failed command may have written.
                        tmp.truncate(0)
                        gw = Zlib::GzipWriter.new(tmp)
                        begin
                                chunk.write_to(gw)
                        ensure
                                # Closing flushes the gzip trailer and closes tmp as well.
                                gw.close unless gw.closed?
                        end
                end
        end
end
configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_aliyunoss.rb, line 57
# Configures the plugin from +conf+.
#
# conf - the configuration element handed to the plugin.
def configure(conf)
        # Convert legacy flat parameters into <format>/<buffer>/<inject> sections
        # BEFORE calling super: the superclass configure is what consumes those
        # sections, so converting afterwards has no effect on it.
        compat_parameters_convert(conf, :formatter, :buffer, :inject, default_chunk_key: "time")
        super

        # Records are always serialised with the JSON formatter (see #format).
        @json_formatter = formatter_create(usage: 'formatter_in_example_json', type: 'json')
end
format(tag, time, record) click to toggle source
# File lib/fluent/plugin/out_aliyunoss.rb, line 131
# Serialises one event for the buffer: the record is first passed through
# inject_values_to_record and the result is rendered by @json_formatter.
#
# Returns whatever @json_formatter.format returns.
def format(tag, time, record)
        @json_formatter.format(tag, time, inject_values_to_record(tag, time, record))
end
multi_workers_ready?() click to toggle source
# File lib/fluent/plugin/out_aliyunoss.rb, line 115
# Always returns true.
# NOTE(review): presumably the hook the Fluentd base class consults to decide
# whether this output may run under multiple workers — confirm against the
# superclass.
def multi_workers_ready?
        true
end
process_object_key_format(chunk, key_format) click to toggle source
# File lib/fluent/plugin/out_aliyunoss.rb, line 99
# Builds the object key for +chunk+ from the template +key_format+.
#
# Substitutions, in order:
#   %{host}          - Socket.gethostname
#   %{uuid}          - 8 random hex characters (SecureRandom.hex(4))
#   ${tag_parts[N]}  - N-th dot-separated part of chunk.metadata.tag
# The result is then passed through extract_placeholders together with the
# chunk metadata.
#
# chunk      - buffer chunk; only chunk.metadata (and its #tag) is read.
# key_format - String template.
#
# Returns the value returned by extract_placeholders.
def process_object_key_format(chunk, key_format)
        key_map = {
                host: Socket.gethostname,
                uuid: SecureRandom.hex(4),
        }
        # Block-form gsub inserts the value literally; a String replacement would
        # interpret backslash sequences such as "\1" inside the value.
        result = key_map.reduce(key_format) do |acc, (k, v)|
                acc.gsub("%{#{k}}") { v }
        end
        # support replace tag_parts
        # metadata.tag can be nil (to_s guards against that); in that case any
        # ${tag_parts[N]} placeholders are simply left untouched.
        chunk.metadata.tag.to_s.split('.').each_with_index do |t, idx|
                result = result.gsub("${tag_parts[#{idx}]}") { t }
        end
        extract_placeholders(result, chunk.metadata)
end
start() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_aliyunoss.rb, line 119
# Starts the plugin: after the superclass start, builds the OSS client from
# @oss_endpoint / @oss_key_id / @oss_key_secret, verifies that @oss_bucket
# exists and stores its handle in @bucket.
#
# Raises RuntimeError when the bucket does not exist.
def start
        super
        @client = Aliyun::OSS::Client.new(
                endpoint: @oss_endpoint,
                access_key_id: @oss_key_id,
                access_key_secret: @oss_key_secret
        )

        unless @client.bucket_exists?(@oss_bucket)
                raise "Specific bucket not exists: #{@oss_bucket}"
        end

        @bucket = @client.get_bucket(@oss_bucket)
end
write(chunk) click to toggle source
# File lib/fluent/plugin/out_aliyunoss.rb, line 136
# Uploads one buffer chunk to the bucket.
#
# The chunk is compressed/converted into a temporary file by #compress, the
# object key is derived from "#{@oss_path}.#{@store_as}" by
# #process_object_key_format, and the file is sent with
# @bucket.resumable_upload.
#
# chunk - the buffer chunk to upload.
#
# Raises RuntimeError when resumable_upload returns a falsy value.
def write(chunk)
        f = Tempfile.new('oss-')
        # Remember the path now: #compress may unlink the Tempfile (orc mode),
        # after which the Tempfile no longer tracks it.
        output_path = f.path
        compress(chunk, f)
        path = process_object_key_format(chunk, "#{@oss_path}.#{@store_as}")
        raise "Upload #{output_path} failed" unless @bucket.resumable_upload(path, output_path)
ensure
        # f is nil when Tempfile.new itself failed.
        f.close(true) if f
        # If the Tempfile was already unlinked and another process re-created a
        # file at the same path (orc mode), close(true) does not remove it.
        File.unlink(output_path) if output_path && File.exist?(output_path)
end