class Fluent::Plugin::AzureStorageAppendBlobOut
Constants
- AZURE_BLOCK_SIZE_LIMIT
- DEFAULT_FORMAT_TYPE
Attributes
bs[R]
Public Class Methods
new()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_azure-storage-append-blob.rb, line 22 def initialize super @use_msi = false # Storage endpoint suffixes for various environments, see https://github.com/Azure/go-autorest/blob/master/autorest/azure/environments.go @storage_endpoint_mapping = { 'AZURECHINACLOUD' => 'core.chinacloudapi.cn', 'AZUREGERMANCLOUD' => 'core.cloudapi.de', 'AZUREPUBLICCLOUD' => 'core.windows.net', 'AZUREUSGOVERNMENTCLOUD' => 'core.usgovcloudapi.net' } end
Public Instance Methods
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_azure-storage-append-blob.rb, line 64 def configure(conf) super @formatter = formatter_create @path_slicer = if @localtime proc do |path| Time.now.strftime(path) end else proc do |path| Time.now.utc.strftime(path) end end if @azure_cloud == 'AZURESTACKCLOUD' if @azure_storage_dns_suffix.nil? raise ConfigError 'azure_storage_dns_suffix invalid, must not be empty for AZURESTACKCLOUD' end else @azure_storage_dns_suffix = @storage_endpoint_mapping[@azure_cloud] if @azure_storage_dns_suffix.nil? raise ConfigError 'azure_cloud invalid, must be either of AZURECHINACLOUD, AZUREGERMANCLOUD, AZUREPUBLICCLOUD, AZUREUSGOVERNMENTCLOUD' end end if (@azure_storage_access_key.nil? || @azure_storage_access_key.empty?) && (@azure_storage_sas_token.nil? || @azure_storage_sas_token.empty?) && (@azure_storage_connection_string.nil? || @azure_storage_connection_string.empty?) log.info 'Using MSI since neither of azure_storage_access_key, azure_storage_sas_token, azure_storage_connection_string was provided.' @use_msi = true elsif @azure_storage_connection_string.nil? || @azure_storage_connection_string.empty? raise ConfigError, 'azure_storage_account needs to be specified' if @azure_storage_account.nil? raise ConfigError, 'azure_container needs to be specified' if @azure_container.nil? end @blob_options = {} if !@compute_checksums @blob_options[:content_md5] = '' end end
container_exists?(container)
click to toggle source
# File lib/fluent/plugin/out_azure-storage-append-blob.rb, line 198 def container_exists?(container) begin @bs.get_container_properties(container) rescue Azure::Core::Http::HTTPError => e if e.status_code == 404 # container does not exist return false else raise end end true end
format(tag, time, record)
click to toggle source
# File lib/fluent/plugin/out_azure-storage-append-blob.rb, line 169 def format(tag, time, record) r = inject_values_to_record(tag, time, record) @formatter.format(tag, time, r) end
get_access_token()
click to toggle source
# File lib/fluent/plugin/out_azure-storage-append-blob.rb, line 111 def get_access_token access_key_request = Faraday.new('http://169.254.169.254/metadata/identity/oauth2/token?' \ "api-version=#{@azure_imds_api_version}" \ '&resource=https://storage.azure.com/' \ "#{!azure_msi_client_id.nil? ? "&client_id=#{azure_msi_client_id}" : ''}", headers: { 'Metadata' => 'true' }).get if access_key_request.status == 200 JSON.parse(access_key_request.body)['access_token'] else raise 'Access token request was not sucssessful. Possibly due to missing azure_msi_client_id config parameter.' end end
multi_workers_ready?()
click to toggle source
# File lib/fluent/plugin/out_azure-storage-append-blob.rb, line 107 def multi_workers_ready? true end
start()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_azure-storage-append-blob.rb, line 125 def start super if @use_msi token_credential = Azure::Storage::Common::Core::TokenCredential.new get_access_token token_signer = Azure::Storage::Common::Core::Auth::TokenSigner.new token_credential @bs = Azure::Storage::Blob::BlobService.new( storage_account_name: @azure_storage_account, storage_dns_suffix: @azure_storage_dns_suffix, signer: token_signer ) refresh_interval = @azure_token_refresh_interval * 60 cancelled = false renew_token = Thread.new do Thread.stop until cancelled sleep(refresh_interval) token_credential.renew_token get_access_token end end sleep 0.1 while renew_token.status != 'sleep' renew_token.run elsif !@azure_storage_connection_string.nil? && !@azure_storage_connection_string.empty? @bs = Azure::Storage::Blob::BlobService.create_from_connection_string(@azure_storage_connection_string) else @bs_params = { storage_account_name: @azure_storage_account, storage_dns_suffix: @azure_storage_dns_suffix } if !@azure_storage_access_key.nil? && !@azure_storage_access_key.empty? @bs_params.merge!({ storage_access_key: @azure_storage_access_key }) elsif !@azure_storage_sas_token.nil? && !@azure_storage_sas_token.empty? @bs_params.merge!({ storage_sas_token: @azure_storage_sas_token }) end @bs = Azure::Storage::Blob::BlobService.create(@bs_params) end ensure_container @azure_storage_path = '' @last_azure_storage_path = '' @current_index = 0 end
write(chunk)
click to toggle source
# File lib/fluent/plugin/out_azure-storage-append-blob.rb, line 174 def write(chunk) tmp = Tempfile.new('azure-') begin chunk.write_to(tmp) generate_log_name(chunk, @current_index) if @last_azure_storage_path != @azure_storage_path @current_index = 0 generate_log_name(chunk, @current_index) end content = File.open(tmp.path, 'rb', &:read) append_blob(content, chunk) @last_azure_storage_path = @azure_storage_path ensure begin tmp.close(true) rescue StandardError nil end end end
Private Instance Methods
append_blob(content, chunk)
click to toggle source
# File lib/fluent/plugin/out_azure-storage-append-blob.rb, line 241 def append_blob(content, chunk) position = 0 log.debug "azure_storage_append_blob: append_blob.start: Content size: #{content.length}" loop do size = [content.length - position, AZURE_BLOCK_SIZE_LIMIT].min log.debug "azure_storage_append_blob: append_blob.chunk: content[#{position}..#{position + size}]" @bs.append_blob_block(@azure_container, @azure_storage_path, content[position..position + size - 1], options=@blob_options) position += size break if position >= content.length rescue Azure::Core::Http::HTTPError => e status_code = e.status_code if status_code == 409 # exceeds azure block limit @current_index += 1 old_azure_storage_path = @azure_storage_path generate_log_name(chunk, @current_index) # If index is not a part of format, rethrow exception. if old_azure_storage_path == @azure_storage_path log.warn 'azure_storage_append_blob: append_blob: blocks limit reached, you need to use %{index} for the format.' raise end log.debug "azure_storage_append_blob: append_blob: blocks limit reached, creating new blob #{@azure_storage_path}." @bs.create_append_blob(@azure_container, @azure_storage_path) elsif status_code == 404 # blob not found log.debug "azure_storage_append_blob: append_blob: #{@azure_storage_path} blob doesn't exist, creating new blob." @bs.create_append_blob(@azure_container, @azure_storage_path) else raise end end log.debug 'azure_storage_append_blob: append_blob.complete' end
ensure_container()
click to toggle source
# File lib/fluent/plugin/out_azure-storage-append-blob.rb, line 213 def ensure_container unless container_exists? @azure_container if @auto_create_container @bs.create_container(@azure_container) else raise "The specified container does not exist: container = #{@azure_container}" end end end
generate_log_name(chunk, index)
click to toggle source
# File lib/fluent/plugin/out_azure-storage-append-blob.rb, line 223 def generate_log_name(chunk, index) metadata = chunk.metadata time_slice = if metadata.timekey.nil? ''.freeze else Time.at(metadata.timekey).utc.strftime(@time_slice_format) end path = @path_slicer.call(@path) values_for_object_key = { '%{path}' => path, '%{time_slice}' => time_slice, '%{index}' => index } storage_path = @azure_object_key_format.gsub(/%{[^}]+}/, values_for_object_key) @azure_storage_path = extract_placeholders(storage_path, chunk) end