class Fluent::Plugin::AzureStorageAppendBlobOut

Constants

AZURE_BLOCK_SIZE_LIMIT
DEFAULT_FORMAT_TYPE

Attributes

bs[R]

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_azure-storage-append-blob.rb, line 47
def configure(conf)
  super

  @formatter = formatter_create

  @path_slicer = if @localtime
                   proc do |path|
                     Time.now.strftime(path)
                   end
                 else
                   proc do |path|
                     Time.now.utc.strftime(path)
                   end
                 end

  raise ConfigError, 'azure_storage_account needs to be specified' if @azure_storage_account.nil?

  raise ConfigError, 'azure_container needs to be specified' if @azure_container.nil?

  if (@azure_storage_access_key.nil? || @azure_storage_access_key.empty?) && (@azure_storage_sas_token.nil? || @azure_storage_sas_token.empty?)
    log.info 'Using MSI since neither azure_storage_access_key nor azure_storage_sas_token was provided.'
    @use_msi = true
  end
end
container_exists?(container) click to toggle source
# File lib/fluent/plugin/out_azure-storage-append-blob.rb, line 153
def container_exists?(container)
  begin
    @bs.get_container_properties(container)
  rescue Azure::Core::Http::HTTPError => ex
    if ex.status_code == 404 # container does not exist
      return false
    else
      raise
    end
  end
  return true
end
format(tag, time, record) click to toggle source
# File lib/fluent/plugin/out_azure-storage-append-blob.rb, line 123
def format(tag, time, record)
  r = inject_values_to_record(tag, time, record)
  @formatter.format(tag, time, r)
end
get_access_token() click to toggle source
# File lib/fluent/plugin/out_azure-storage-append-blob.rb, line 76
def get_access_token
  access_key_request = Faraday.new('http://169.254.169.254/metadata/identity/oauth2/token?' \
                                   "api-version=#{@azure_imds_api_version}" \
                                   '&resource=https://storage.azure.com/',
                                   headers: { 'Metadata' => 'true' })
                              .get
                              .body
  JSON.parse(access_key_request)['access_token']
end
multi_workers_ready?() click to toggle source
# File lib/fluent/plugin/out_azure-storage-append-blob.rb, line 72
def multi_workers_ready?
  true
end
start() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_azure-storage-append-blob.rb, line 86
def start
  super
  if @use_msi
    token_credential = Azure::Storage::Common::Core::TokenCredential.new get_access_token
    token_signer = Azure::Storage::Common::Core::Auth::TokenSigner.new token_credential
    @bs = Azure::Storage::Blob::BlobService.new(storage_account_name: @azure_storage_account, signer: token_signer)

    refresh_interval = @azure_token_refresh_interval * 60
    cancelled = false
    renew_token = Thread.new do
      Thread.stop
      until cancelled
        sleep(refresh_interval)

        token_credential.renew_token get_access_token
      end
    end
    sleep 0.1 while renew_token.status != 'sleep'
    renew_token.run
  else
    @bs_params = { storage_account_name: @azure_storage_account }

    if !@azure_storage_access_key.nil? && !@azure_storage_access_key.empty?
      @bs_params.merge!({ storage_access_key: @azure_storage_access_key })
    elsif !@azure_storage_sas_token.nil? && !@azure_storage_sas_token.empty?
      @bs_params.merge!({ storage_sas_token: @azure_storage_sas_token })
    end

    @bs = Azure::Storage::Blob::BlobService.create(@bs_params)
  end

  ensure_container
  @azure_storage_path = ''
  @last_azure_storage_path = ''
  @current_index = 0
end
write(chunk) click to toggle source
# File lib/fluent/plugin/out_azure-storage-append-blob.rb, line 128
def write(chunk)
  metadata = chunk.metadata
  tmp = Tempfile.new('azure-')
  begin
    chunk.write_to(tmp)

    generate_log_name(metadata, @current_index)
    if @last_azure_storage_path != @azure_storage_path
      @current_index = 0
      generate_log_name(metadata, @current_index)
    end

    content = File.open(tmp.path, 'rb', &:read)

    append_blob(content, metadata)
    @last_azure_storage_path = @azure_storage_path
  ensure
    begin
      tmp.close(true)
    rescue StandardError
      nil
    end
  end
end

Private Instance Methods

append_blob(content, metadata) click to toggle source
# File lib/fluent/plugin/out_azure-storage-append-blob.rb, line 199
def append_blob(content, metadata)
  position = 0
  log.debug "azure_storage_append_blob: append_blob.start: Content size: #{content.length}"
  loop do
    begin
      size = [content.length - position, AZURE_BLOCK_SIZE_LIMIT].min
      log.debug "azure_storage_append_blob: append_blob.chunk: content[#{position}..#{position + size}]"
      @bs.append_blob_block(@azure_container, @azure_storage_path, content[position..position + size])
      position += size
      break if position >= content.length
    rescue Azure::Core::Http::HTTPError => e
      status_code = e.status_code

      if status_code == 409 # exceeds azure block limit
        @current_index += 1
        old_azure_storage_path = @azure_storage_path
        generate_log_name(metadata, @current_index)

        # If index is not a part of format, rethrow exception.
        if old_azure_storage_path == @azure_storage_path
          log.warn 'azure_storage_append_blob: append_blob: blocks limit reached, you need to use %{index} for the format.'
          raise
        end

        log.debug "azure_storage_append_blob: append_blob: blocks limit reached, creating new blob #{@azure_storage_path}."
        @bs.create_append_blob(@azure_container, @azure_storage_path)
      elsif status_code == 404 # blob not found
        log.debug "azure_storage_append_blob: append_blob: #{@azure_storage_path} blob doesn't exist, creating new blob."
        @bs.create_append_blob(@azure_container, @azure_storage_path)
      else
        raise
      end
    end
  end
  log.debug 'azure_storage_append_blob: append_blob.complete'
end
ensure_container() click to toggle source
# File lib/fluent/plugin/out_azure-storage-append-blob.rb, line 168
def ensure_container
  unless container_exists? @azure_container
    if @auto_create_container
      @bs.create_container(@azure_container)
    else
      raise "The specified container does not exist: container = #{@azure_container}"
    end
  end
end
generate_log_name(metadata, index) click to toggle source
# File lib/fluent/plugin/out_azure-storage-append-blob.rb, line 180
def generate_log_name(metadata, index)
  time_slice = if metadata.timekey.nil?
                 ''.freeze
               else
                 Time.at(metadata.timekey).utc.strftime(@time_slice_format)
    end

  path = @path_slicer.call(@path)
  values_for_object_key = {
    '%{path}' => path,
    '%{time_slice}' => time_slice,
    '%{index}' => index
  }
  storage_path = @azure_object_key_format.gsub(/%{[^}]+}/, values_for_object_key)
  @azure_storage_path = extract_placeholders(storage_path, metadata)
end