class LogStash::Outputs::Qingstor::Uploader

Constants

DEFAULT_THREADPOOL
TIME_BEFORE_RETRYING_SECONDS

Attributes

bucket[R]
logger[R]
upload_options[R]

Public Class Methods

new(bucket, logger, threadpool = DEFAULT_THREADPOOL) click to toggle source
# File lib/logstash/outputs/qingstor/uploader.rb, line 25
def initialize(bucket, logger, threadpool = DEFAULT_THREADPOOL)
  @bucket = bucket
  @logger = logger
  @workers_pool = threadpool
end

Public Instance Methods

process_encrypt_options(upload_options) click to toggle source
# File lib/logstash/outputs/qingstor/uploader.rb, line 55
def process_encrypt_options(upload_options)
  res = {}

  unless upload_options[:server_side_encryption_algorithm].nil?
    base64_key = Base64.strict_encode64(upload_options[:customer_key])
    key_md5 = Digest::MD5.hexdigest(upload_options[:customer_key])
    base64_key_md5 = Base64.strict_encode64(key_md5)
    res.merge!(
      'x_qs_encryption_customer_algorithm' =>
        upload_options[:server_side_encryption_algorithm],
      'x_qs_encryption_customer_key' => base64_key,
      'x_qs_encryption_customer_key_md5' => base64_key_md5
    )
  end

  res
end
stop() click to toggle source
# File lib/logstash/outputs/qingstor/uploader.rb, line 73
def stop
  @workers_pool.shutdown
  @workers_pool.wait_for_termination(nil)
end
upload(file, options = {}) click to toggle source
# File lib/logstash/outputs/qingstor/uploader.rb, line 37
def upload(file, options = {})
  upload_options = options.fetch(:upload_options, {})
  upload_headers = process_encrypt_options(upload_options)

  if file.size > 50 * 1024 * 1024
    @logger.info('Multipart uploading file', :file => file.key)
    multipart_uploader = MultipartUploader.new(@bucket, @logger, file, upload_headers)
    multipart_uploader.upload
  else
    upload_headers['content_md5'] = Digest::MD5.file(file.path).to_s
    upload_headers['body'] = ::File.read(file.path)
    @logger.info('Uploading file', :file => file.key)
    @bucket.put_object(file.key, upload_headers)
  end

  options[:on_complete].call(file) unless options[:on_complete].nil?
end
upload_async(file, options = {}) click to toggle source
# File lib/logstash/outputs/qingstor/uploader.rb, line 31
def upload_async(file, options = {})
  @workers_pool.post do
    upload(file, options)
  end
end