class Fluent::Plugin::AzureStorageGen2Output

Constants

ABFS_API_VERSION
ACCESS_TOKEN_API_VERSION
AZURE_BLOCK_SIZE_LIMIT
COMPRESSOR_REGISTRY
DEFAULT_FORMAT_TYPE

Public Class Methods

new() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_azurestorage_gen2.rb, line 18
def initialize
    super
    @compressor = nil
end

Private Class Methods

register_compressor(name, compressor) click to toggle source
# File lib/fluent/plugin/out_azurestorage_gen2.rb, line 748
def self.register_compressor(name, compressor)
    COMPRESSOR_REGISTRY.register(name, compressor)
end

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_azurestorage_gen2.rb, line 69
def configure(conf)
    compat_parameters_convert(conf, :buffer, :formatter, :inject)
    super

    if @store_as.nil? || @store_as == "none"
        log.info "azurestorage_gen2: Compression is disabled (store_as: #{@store_as})"
    else
        begin
            @compressor = COMPRESSOR_REGISTRY.lookup(@store_as).new(:buffer_type => @buffer_type, :log => log)
        rescue => e
            log.warn "#{@store_as} not found. Use 'text' instead"
            @compressor = TextCompressor.new
        end
        @compressor.configure(conf)
    end

    @formatter = formatter_create

    if @azure_container.nil?
      raise Fluent::ConfigError, "azure_container is needed"
    end

    @azure_storage_path = ''
    @last_azure_storage_path = ''
    @current_index = 0

    if @store_as.nil? || @store_as == "none"
        @final_file_extension = @file_extension
    else
        @final_file_extension = @compressor.ext
    end
    @values_for_object_chunk = {}
end
format(tag, time, record) click to toggle source
# File lib/fluent/plugin/out_azurestorage_gen2.rb, line 131
def format(tag, time, record)
    r = inject_values_to_record(tag, time, record)
    @formatter.format(tag, time, r)
end
multi_workers_ready?() click to toggle source
# File lib/fluent/plugin/out_azurestorage_gen2.rb, line 103
def multi_workers_ready?
    true
end
start() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_azurestorage_gen2.rb, line 107
def start
    setup_access_token
    if !@skip_container_check
        if @failsafe_container_check
            begin
                if @write_only && @auto_create_container
                    create_container
                else
                    ensure_container
                end
            rescue Exception => e
                log.warn("#{e.message}, container list/create failsafe is enabled. Continue without those operations.")
            end
        else
            if @write_only && @auto_create_container
                create_container
            else
                ensure_container
            end
        end
    end
    super
end
write(chunk) click to toggle source
# File lib/fluent/plugin/out_azurestorage_gen2.rb, line 136
def write(chunk)
    if @store_as.nil? || @store_as == "none"
        generate_log_name(chunk, @current_index)
        if @last_azure_storage_path != @azure_storage_path
            @current_index = 0
            generate_log_name(chunk, @current_index)
        end
        raw_data = chunk.read
        unless raw_data.empty?
            log.debug "azurestorage_gen2: processing raw data", chunk_id: dump_unique_id_hex(chunk.unique_id)
            upload_blob(raw_data, chunk)
        end
        chunk.close rescue nil
        @last_azure_storage_path = @azure_storage_path
    else
        tmp = Tempfile.new("azure-")
        tmp.binmode
        begin
            @compressor.compress(chunk, tmp)
            tmp.rewind
            generate_log_name(chunk, @current_index)
            if @last_azure_storage_path != @azure_storage_path
                @current_index = 0
                generate_log_name(chunk, @current_index)
            end
            log.debug "azurestorage_gen2: Start uploading temp file: #{tmp.path}"
            content = File.open(tmp.path, 'rb') { |file| file.read }
            upload_blob(content, chunk)
            @last_azure_storage_path = @azure_storage_path
        ensure
            tmp.close(true) rescue nil
        end
        @values_for_object_chunk.delete(chunk.unique_id)
    end

end

Private Instance Methods

acquire_access_token() click to toggle source
# File lib/fluent/plugin/out_azurestorage_gen2.rb, line 261
def acquire_access_token
    if !@azure_instance_msi.nil?
        acquire_access_token_msi
    elsif !@azure_oauth_app_id.nil? and !@azure_oauth_secret.nil? and !@azure_oauth_tenant_id.nil?
        acquire_access_token_oauth_app
    elsif @azure_oauth_use_azure_cli
        acquire_access_token_by_az
    else
        raise Fluent::UnrecoverableError, "Using MSI or 'az cli tool' or simple OAuth 2.0 based authentication parameters (azure_oauth_tenant_id, azure_oauth_app_id, azure_oauth_secret) are required."
    end
end
acquire_access_token_by_az() click to toggle source
# File lib/fluent/plugin/out_azurestorage_gen2.rb, line 330
def acquire_access_token_by_az
    access_token=`az account get-access-token --resource #{@url_storage_resource} --query accessToken -o tsv`
    log.debug "azurestorage_gen2: Token response: #{access_token}"
    @azure_access_token = access_token.chomp
end
acquire_access_token_msi() click to toggle source

Referenced from azure doc. docs.microsoft.com/en-us/azure/active-directory/managed-identities-azure-resources/tutorial-linux-vm-access-storage#get-an-access-token-and-use-it-to-call-azure-storage

# File lib/fluent/plugin/out_azurestorage_gen2.rb, line 276
def acquire_access_token_msi
    params = { :"api-version" => ACCESS_TOKEN_API_VERSION, :resource => "#{@url_storage_resource}" }
    unless @azure_instance_msi.nil?
        params[:msi_res_id] = @azure_instance_msi
    end
    unless @azure_client_id.nil?
        params[:client_id] = @azure_client_id
    end
    unless @azure_object_id.nil?
        params[:object_id] = @azure_object_id
    end
    req_opts = {
        :params => params,
        :headers => { Metadata: "true" }
    }
    add_proxy_options(req_opts)
    request = Typhoeus::Request.new("http://169.254.169.254/metadata/identity/oauth2/token", req_opts)
    request.on_complete do |response|
        if response.success?
          data = JSON.parse(response.body)
          log.debug "azurestorage_gen2: Token response: #{data}"
          @azure_access_token = data["access_token"].chomp
        else
            raise Fluent::UnrecoverableError, "Failed to acquire access token. #{response.code}: #{response.body}"
        end
    end
    request.run
end
acquire_access_token_oauth_app() click to toggle source
# File lib/fluent/plugin/out_azurestorage_gen2.rb, line 306
def acquire_access_token_oauth_app
    params = { :"api-version" => ACCESS_TOKEN_API_VERSION, :resource => "#{@url_storage_resource}"}
    headers = {:"Content-Type" => "application/x-www-form-urlencoded"}
    content = "grant_type=client_credentials&client_id=#{@azure_oauth_app_id}&client_secret=#{@azure_oauth_secret}&resource=#{@url_storage_resource}"
    req_opts = {
        :params => params,
        :body => content, 
        :headers => headers
    }
    add_proxy_options(req_opts)
    request = Typhoeus::Request.new("https://login.microsoftonline.com/#{@azure_oauth_tenant_id}/oauth2/token", req_opts)
    request.on_complete do |response|
        if response.success?
          data = JSON.parse(response.body)
          log.debug "azurestorage_gen2: Token response: #{data}"
          @azure_access_token = data["access_token"].chomp
        else
            raise Fluent::UnrecoverableError, "Failed to acquire access token. #{response.code}: #{response.body}"
        end
    end
    request.run
end
add_proxy_options(req_opts = {}) click to toggle source
# File lib/fluent/plugin/out_azurestorage_gen2.rb, line 566
def add_proxy_options(req_opts = {})
    unless @proxy_url.nil?
        req_opts[:proxy] = @proxy_url
        unless @proxy_username.nil? || @proxy_password.nil?
            req_opts[:proxyuserpwd] = "#{@proxy_username}:#{@proxy_password}"
        end
    end
end
append_blob(content, chunk, existing_content_length) click to toggle source
# File lib/fluent/plugin/out_azurestorage_gen2.rb, line 515
def append_blob(content, chunk, existing_content_length)
  position = 0
  log.debug "azurestorage_gen2: append_blob.start: Content size: #{content.length}"
  loop do
    begin
      size = [content.length - position, AZURE_BLOCK_SIZE_LIMIT].min
      log.debug "azurestorage_gen2: append_blob.chunk: content[#{position}..#{position + size}]"
      append_blob_block(@azure_storage_path, content[position..position + size], existing_content_length)

      position += size
      existing_content_length += size
      break if position >= content.length
    rescue AppendBlobResponseError => ex
      status_code = ex.status_code
  
      if status_code == 409 # exceeds azure block limit
        @current_index += 1
        old_azure_storage_path = @azure_storage_path
        generate_log_name(metadata, @current_index)
  
        # If index is not a part of format, rethrow exception.
        if old_azure_storage_path == @azure_storage_path
          log.warn "azurestorage_gen2: append_blob: blocks limit reached, you need to use %{index} for the format."
          raise
        end
        flush(old_azure_storage_path, existing_content_length)
  
        log.info "azurestorage_gen2: append_blob: blocks limit reached, creating new blob #{@azure_storage_path}."
        create_blob(@azure_storage_path)
      elsif status_code == 404 # blob not found
        log.debug "azurestorage_gen2: append_blob: #{@azure_storage_path} blob doesn't exist, creating new blob."
        create_blob(@azure_storage_path)
      else
        raise
      end
    end
  end
  flush(@azure_storage_path, existing_content_length)
  log.debug "azurestorage_gen2: append_blob.complete"
end
append_blob_block(blob_path, content, position) click to toggle source
# File lib/fluent/plugin/out_azurestorage_gen2.rb, line 424
def append_blob_block(blob_path, content, position)
    log.debug "azurestorage_gen2: append_blob_block.start: Append blob ('#{blob_path}') called with position #{position} (content length: #{content.length}, end position: #{position + content.length})"
    datestamp = create_request_date
    headers = {:"x-ms-version" =>  ABFS_API_VERSION,  :"x-ms-date" => datestamp, :"Content-Length" => content.length}
    params = {:action => "append", :position => "#{position}"}
    auth_header = create_auth_header("patch", datestamp, "#{@azure_container}#{blob_path}", headers, params)
    headers[:Authorization] = auth_header
    req_opts = {
        :method => :patch,
        :params => params,
        :headers => headers,
        :body => content
    }
    add_proxy_options(req_opts)
    request = Typhoeus::Request.new("https://#{azure_storage_account}#{@url_domain_suffix}/#{@azure_container}#{blob_path}", req_opts)
    request.on_complete do |response|
        if response.success?
            log.debug "azurestorage_gen2: Blob '#{blob_path}' has been appended, response code: #{response.code}"
        elsif response.timed_out?
            raise_error  "Appending blob #{blob_path}' request timed out."
        elsif response.code == 404
            raise AppendBlobResponseError.new("Blob '#{blob_path}' has not found. Error code: #{response.code}", 404)
        elsif response.code == 409
            raise AppendBlobResponseError.new("Blob '#{blob_path}' has conflict. Error code: #{response.code}", 409)
        else
            raise_error "Appending blob '#{blob_path}' request failed - code: #{response.code}, body: #{response.body}, headers: #{response.headers}"
        end
    end
    request.run
end
create_auth_header(method, datestamp, resource, headers, params) click to toggle source
# File lib/fluent/plugin/out_azurestorage_gen2.rb, line 557
def create_auth_header(method, datestamp, resource, headers, params)
    if @azure_storage_access_key.nil?
        "Bearer #{@azure_access_token}"
    else
        "SharedKey #{@azure_storage_account}:#{signed(method, datestamp, resource, headers, params)}"
    end
end
create_blob(blob_path) click to toggle source
# File lib/fluent/plugin/out_azurestorage_gen2.rb, line 396
def create_blob(blob_path)
    datestamp = create_request_date
    headers = {:"x-ms-version" =>  ABFS_API_VERSION, :"x-ms-date" => datestamp,:"Content-Length" => "0", :"Content-Type" => "text/plain"}
    params = {:resource => "file", :recursive => "false"}
    auth_header = create_auth_header("put", datestamp, "#{@azure_container}#{blob_path}", headers, params)
    headers[:Authorization] = auth_header
    req_opts = {
        :method => :put,
        :params => params,
        :headers => headers
    }
    add_proxy_options(req_opts)
    request = Typhoeus::Request.new("https://#{azure_storage_account}#{@url_domain_suffix}/#{@azure_container}#{blob_path}", req_opts)
    request.on_complete do |response|
        if response.success?
            log.debug "azurestorage_gen2: Blob '#{blob_path}' has been created, response code: #{response.code}"
        elsif response.timed_out?
            raise_error  "Creating blob '#{blob_path}' request timed out."
        elsif response.code == 409
            log.debug "azurestorage_gen2: Blob already exists: #{blob_path}"
        else
            raise_error "Creating blob '#{blob_path}' request failed - code: #{response.code}, body: #{response.body}, headers: #{response.headers}"
        end
    end
    request.run
end
create_container() click to toggle source
# File lib/fluent/plugin/out_azurestorage_gen2.rb, line 370
def create_container
    datestamp = create_request_date
    headers = {:"x-ms-version" =>  ABFS_API_VERSION, :"x-ms-date" => datestamp, :"Content-Length" => "0"}
    params = {:resource => "filesystem" }
    auth_header = create_auth_header("put", datestamp, "#{@azure_container}", headers, params)
    headers[:Authorization] = auth_header
    req_opts = {
        :method => :put,
        :params => params,
        :headers => headers
    }
    add_proxy_options(req_opts)
    request = Typhoeus::Request.new("https://#{azure_storage_account}#{@url_domain_suffix}/#{@azure_container}", req_opts)
    request.on_complete do |response|
        if response.success?
            log.debug "azurestorage_gen2: Container '#{@azure_container}' created, response code: #{response.code}"
        elsif response.timed_out?
            raise Fluent::UnrecoverableError, "Creating container '#{@azure_container}' request timed out."
        else
            raise Fluent::UnrecoverableError, "Creating container request failed - code: #{response.code}, body: #{response.body}, headers: #{response.headers}"
        end
    end
    request.run
end
create_request_date() click to toggle source
# File lib/fluent/plugin/out_azurestorage_gen2.rb, line 626
def create_request_date
    Time.now.strftime('%a, %e %b %y %H:%M:%S %Z')
end
ensure_container() click to toggle source
# File lib/fluent/plugin/out_azurestorage_gen2.rb, line 337
def ensure_container
    datestamp = create_request_date
    headers = {:"x-ms-version" =>  ABFS_API_VERSION, :"x-ms-date" => datestamp,:"Content-Length" => "0"}
    params = {:resource => "filesystem" }
    auth_header = create_auth_header("head", datestamp, "#{@azure_container}", headers, params)
    headers[:Authorization] = auth_header
    req_opts = {
        :method => :head,
        :params => params,
        :headers => headers
    }
    add_proxy_options(req_opts)
    request = Typhoeus::Request.new("https://#{azure_storage_account}#{@url_domain_suffix}/#{@azure_container}", req_opts)
    request.on_complete do |response|
        if response.success?
          log.info "azurestorage_gen2: Container '#{@azure_container}' exists."
        elsif response.timed_out?
            raise Fluent::UnrecoverableError,  "Get container '#{@azure_container}' request timed out."
        elsif response.code == 404
            log.info "azurestorage_gen2: Container '#{@azure_container}' does not exist. Creating it if needed..."
            if @auto_create_container
                create_container
            else
                raise Fluent::ConfigError, "The specified container does not exist: container = #{@azure_container}"
            end
        else
            raise Fluent::UnrecoverableError, "Get container request failed - code: #{response.code}, headers: #{response.headers}"
        end
    end
    request.run
end
flush(blob_path, position) click to toggle source
# File lib/fluent/plugin/out_azurestorage_gen2.rb, line 456
def flush(blob_path, position)
    log.debug "azurestorage_gen2: flush_blob.start: Flush blob ('#{blob_path}') called with position #{position}"
    datestamp = create_request_date
    headers = {:"x-ms-version" => ABFS_API_VERSION, :"x-ms-date" => datestamp, :"Content-Length" => "0"}
    params = {:action => "flush", :position => "#{position}"}
    auth_header = create_auth_header("patch", datestamp, "#{@azure_container}#{blob_path}",headers, params)
    headers[:Authorization] = auth_header
    req_opts = {
        :method => :patch,
        :params => params,
        :headers => headers
    }
    add_proxy_options(req_opts)
    request = Typhoeus::Request.new("https://#{azure_storage_account}#{@url_domain_suffix}/#{@azure_container}#{blob_path}", req_opts)
    request.on_complete do |response|
        if response.success?
            log.debug "azurestorage_gen2: Blob '#{blob_path}' flush was successful, response code: #{response.code}"
        elsif response.timed_out?
            raise_error  "Bloub '#{blob_path}' flush request timed out."
        else
            raise_error "Blob flush request failed - code: #{response.code}, body: #{response.body}, headers: #{response.headers}"
        end
    end
    request.run
end
generate_log_name(chunk, index) click to toggle source
# File lib/fluent/plugin/out_azurestorage_gen2.rb, line 189
def generate_log_name(chunk, index)
    metadata = chunk.metadata
    time_slice = if metadata.timekey.nil?
               ''.freeze
             else
               Time.at(metadata.timekey).utc.strftime(@time_slice_format)
             end
    if @localtime
        hms_slicer = Time.now.strftime("%H%M%S")
        upload_timestamp = Time.now.strftime(@upload_timestamp_format)
    else
        hms_slicer = Time.now.utc.strftime("%H%M%S")
        upload_timestamp = Time.now.utc.strftime(@upload_timestamp_format)
    end

    @values_for_object_chunk[chunk.unique_id] ||= {
        "%{hex_random}" => hex_random(chunk),
    }
    values_for_object_key_pre = {
        "%{path}" => @path,
        "%{index}" => index,
        "%{uuid_flush}" => uuid_random,
        "%{file_extension}" => @final_file_extension,
        "%{upload_timestamp}" => upload_timestamp,
    }
    values_for_object_key_post = {
        "%{date_slice}" => time_slice,
        "%{time_slice}" => time_slice,
        "%{hms_slice}" => hms_slicer,
    }.merge!(@values_for_object_chunk[chunk.unique_id])
    storage_path = @azure_object_key_format.gsub(%r(%{[^}]+})) do |matched_key|
        values_for_object_key_pre.fetch(matched_key, matched_key)
    end
    storage_path = extract_placeholders(storage_path, chunk)
    storage_path = storage_path.gsub(%r(%{[^}]+}), values_for_object_key_post)
    storage_path = "/" + storage_path unless storage_path.start_with?("/")
    @azure_storage_path = storage_path
end
get_blob_properties(blob_path) click to toggle source
# File lib/fluent/plugin/out_azurestorage_gen2.rb, line 483
def get_blob_properties(blob_path)
    datestamp = create_request_date
    headers = {:"x-ms-version" =>  ABFS_API_VERSION, :"x-ms-date" => datestamp, :"Content-Length" => "0"}
    params = {}
    content_length = -1
    auth_header = create_auth_header("head", datestamp, "#{@azure_container}#{blob_path}", headers, params)
    headers[:Authorization] = auth_header
    req_opts = {
        :method => :head,
        :params => params,
        :headers => headers
    }
    add_proxy_options(req_opts)
    request = Typhoeus::Request.new("https://#{azure_storage_account}#{@url_domain_suffix}/#{@azure_container}#{blob_path}", req_opts)
    request.on_complete do |response|
        if response.success?
          log.debug "azurestorage_gen2: Get blob properties for '#{blob_path}', response headers: #{response.headers}"
          content_length = response.headers['Content-Length'].to_i
        elsif response.timed_out?
            raise_error  "Get blob properties '#{blob_path}' request timed out."
        elsif response.code == 404
            log.debug "azurestorage_gen2: Blob '#{blob_path}' does not exist. Creating it if needed..."
            content_length = 0
        else
            raise_error "Get blob properties '#{blob_path}' request failed - code: #{response.code}, body: #{response.body}, headers: #{response.headers}"
        end
    end
    request.run
    content_length
end
get_canonicalized_resource(resource, params) click to toggle source
# File lib/fluent/plugin/out_azurestorage_gen2.rb, line 608
def get_canonicalized_resource(resource, params)
    if params.empty?
        canonicalized_resource="/#{@azure_storage_account}"
    else
        canonicalized_params = params
        .map{|paramKey, paramValue| "#{paramKey.to_s.downcase}:#{paramValue}"}
        .join("\n")
        canonicalized_resource="/#{@azure_storage_account}/#{resource}\n#{canonicalized_params}"
    end
end
hex_random(chunk) click to toggle source
# File lib/fluent/plugin/out_azurestorage_gen2.rb, line 644
def hex_random(chunk)
    unique_hex = Fluent::UniqueId.hex(chunk.unique_id)
    unique_hex.reverse!
    unique_hex[0...@hex_random_length]
end
hex_to_bin(hex) click to toggle source
# File lib/fluent/plugin/out_azurestorage_gen2.rb, line 620
def hex_to_bin(hex)
    hex = '0' << hex unless (hex.length % 2) == 0
    hex.scan(/[A-Fa-f0-9]{2}/).inject('') { |encoded, byte| encoded << [byte].pack('H*') }
end
raise_error(error_message) click to toggle source
# File lib/fluent/plugin/out_azurestorage_gen2.rb, line 631
def raise_error(error_message)
    if @enable_retry
        raise BlobOperationError, error_message
    else
        raise Fluent::UnrecoverableError,  error_message
    end
end
setup_access_token() click to toggle source
# File lib/fluent/plugin/out_azurestorage_gen2.rb, line 228
def setup_access_token
    if @azure_storage_access_key.nil?
        @get_token_lock = Concurrent::ReadWriteLock.new
        if @startup_fail_on_error
            acquire_access_token
        else
            while true
                begin
                    acquire_access_token
                    break
                rescue Exception => e
                    log.warn("#{e.message}, acquired token failed, wait 20 seconds until next retry.")
                    sleep 20
                end
            end
        end
        if @azure_oauth_refresh_interval > 0
            log.info("azurestorage_gen2: Start getting access token every #{@azure_oauth_refresh_interval} seconds.")
            @get_token_task = Concurrent::TimerTask.new(
                execution_interval: @azure_oauth_refresh_interval) {
                begin
                    acquire_access_token
                rescue Exception => e
                    log.warn("#{e.message}, continue with previous credentials.")
                end
            }
            @get_token_task.execute
        end
    else
        log.info "azurestorage_gen2: Access storage key is configured, MSI support is disabled."
    end
end
sign_request(key, signable_string) click to toggle source
# File lib/fluent/plugin/out_azurestorage_gen2.rb, line 582
def sign_request(key, signable_string)
    signed = OpenSSL::HMAC.digest('sha256', key, signable_string)
    Base64.strict_encode64(signed)
end
signable_string(method, resource, params, headers, datestamp) click to toggle source
# File lib/fluent/plugin/out_azurestorage_gen2.rb, line 588
def signable_string(method, resource, params, headers, datestamp)
    [
      method.to_s.upcase,
      headers.fetch("Content-Encoding", ""),
      headers.fetch("Content-Language", ""),
      headers.fetch("Content-Length", "").sub(/^0+/, ""),
      headers.fetch("Content-MD5", ""),
      headers.fetch("Content-Type", ""),
      headers.fetch("Date", ""),
      headers.fetch("If-Modified-Since", ""),
      headers.fetch("If-Match", ""),
      headers.fetch("If-None-Match", ""),
      headers.fetch("If-Unmodified-Since", ""),
      headers.fetch("Range", ""),
      "x-ms-date:#{datestamp}\nx-ms-version:#{ABFS_API_VERSION}",
      get_canonicalized_resource(resource, params)
    ].join("\n")
end
signed(method, datestamp, resource, headers, params) click to toggle source
# File lib/fluent/plugin/out_azurestorage_gen2.rb, line 576
def signed(method, datestamp, resource, headers, params)
    decoded_access_key=Base64.strict_decode64(@azure_storage_access_key).unpack("H*").first
    sign_request(decoded_access_key, signable_string(method, resource, params, headers, datestamp))
end
timekey_to_timeformat(timekey) click to toggle source
# File lib/fluent/plugin/out_azurestorage_gen2.rb, line 650
def timekey_to_timeformat(timekey)
    case timekey
    when nil          then ''
    when 0...60       then '%Y%m%d%H%M%S' # 60 exclusive
    when 60...3600    then '%Y%m%d%H%M'
    when 3600...86400 then '%Y%m%d%H'
    else                   '%Y%m%d'
    end
end
upload_blob(content, chunk) click to toggle source
# File lib/fluent/plugin/out_azurestorage_gen2.rb, line 174
def upload_blob(content, chunk)
    log.debug "azurestorage_gen2: Uploading blob: #{@azure_storage_path}"
    if @write_only
        create_blob(@azure_storage_path)
        append_blob(content, chunk, 0)
    else
        existing_content_length = get_blob_properties(@azure_storage_path)
        if existing_content_length == 0
            create_blob(@azure_storage_path)
        end
        append_blob(content, chunk, existing_content_length)
    end
end
uuid_random() click to toggle source
# File lib/fluent/plugin/out_azurestorage_gen2.rb, line 639
def uuid_random
    require 'uuidtools'
    ::UUIDTools::UUID.random_create.to_s
end