class Fluent::Plugin::AzureStorageAppendBlobOut
Constants
- AZURE_BLOCK_SIZE_LIMIT
- DEFAULT_FORMAT_TYPE
Attributes
bs[R]
Public Instance Methods
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_azure-storage-append-blob.rb, line 47 def configure(conf) super @formatter = formatter_create @path_slicer = if @localtime proc do |path| Time.now.strftime(path) end else proc do |path| Time.now.utc.strftime(path) end end raise ConfigError, 'azure_storage_account needs to be specified' if @azure_storage_account.nil? raise ConfigError, 'azure_container needs to be specified' if @azure_container.nil? if (@azure_storage_access_key.nil? || @azure_storage_access_key.empty?) && (@azure_storage_sas_token.nil? || @azure_storage_sas_token.empty?) log.info 'Using MSI since neither azure_storage_access_key nor azure_storage_sas_token was provided.' @use_msi = true end end
container_exists?(container)
click to toggle source
# File lib/fluent/plugin/out_azure-storage-append-blob.rb, line 157 def container_exists?(container) begin @bs.get_container_properties(container) rescue Azure::Core::Http::HTTPError => ex if ex.status_code == 404 # container does not exist return false else raise end end return true end
format(tag, time, record)
click to toggle source
# File lib/fluent/plugin/out_azure-storage-append-blob.rb, line 123 def format(tag, time, record) r = inject_values_to_record(tag, time, record) @formatter.format(tag, time, r) end
get_access_token()
click to toggle source
# File lib/fluent/plugin/out_azure-storage-append-blob.rb, line 76 def get_access_token access_key_request = Faraday.new('http://169.254.169.254/metadata/identity/oauth2/token?' \ "api-version=#{@azure_imds_api_version}" \ '&resource=https://storage.azure.com/', headers: { 'Metadata' => 'true' }) .get .body JSON.parse(access_key_request)['access_token'] end
multi_workers_ready?()
click to toggle source
# File lib/fluent/plugin/out_azure-storage-append-blob.rb, line 72 def multi_workers_ready? true end
start()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_azure-storage-append-blob.rb, line 86 def start super if @use_msi token_credential = Azure::Storage::Common::Core::TokenCredential.new get_access_token token_signer = Azure::Storage::Common::Core::Auth::TokenSigner.new token_credential @bs = Azure::Storage::Blob::BlobService.new(storage_account_name: @azure_storage_account, signer: token_signer) refresh_interval = @azure_token_refresh_interval * 60 cancelled = false renew_token = Thread.new do Thread.stop until cancelled sleep(refresh_interval) token_credential.renew_token get_access_token end end sleep 0.1 while renew_token.status != 'sleep' renew_token.run else @bs_params = { storage_account_name: @azure_storage_account } if !@azure_storage_access_key.nil? && !@azure_storage_access_key.empty? @bs_params.merge!({ storage_access_key: @azure_storage_access_key }) elsif !@azure_storage_sas_token.nil? && !@azure_storage_sas_token.empty? @bs_params.merge!({ storage_sas_token: @azure_storage_sas_token }) end @bs = Azure::Storage::Blob::BlobService.create(@bs_params) end ensure_container @azure_storage_path = '' @last_azure_storage_path = '' @current_index = 0 end
write(chunk)
click to toggle source
# File lib/fluent/plugin/out_azure-storage-append-blob.rb, line 128 def write(chunk) metadata = chunk.metadata tmp = Tempfile.new('azure-') begin chunk.write_to(tmp) generate_log_name(metadata, @current_index) if @last_azure_storage_path != @azure_storage_path @current_index = 0 generate_log_name(metadata, @current_index) end content = File.open(tmp.path, 'rb', &:read) if content.length > 0 log.debug "azure_storage_append_blob: append_blob: prepare for writing #{@azure_storage_path} blob." append_blob(content, metadata) end @last_azure_storage_path = @azure_storage_path ensure begin tmp.close(true) rescue StandardError nil end end end
Private Instance Methods
append_blob(content, metadata)
click to toggle source
# File lib/fluent/plugin/out_azure-storage-append-blob.rb, line 203 def append_blob(content, metadata) position = 0 log.debug "azure_storage_append_blob: append_blob.start: Content size: #{content.length}" loop do begin size = [content.length - position, AZURE_BLOCK_SIZE_LIMIT].min log.debug "azure_storage_append_blob: append_blob.chunk: content[#{position}..#{position + size}]" @bs.append_blob_block(@azure_container, @azure_storage_path, content[position..position + size]) position += size break if position >= content.length rescue Azure::Core::Http::HTTPError => e status_code = e.status_code if status_code == 409 # exceeds azure block limit @current_index += 1 old_azure_storage_path = @azure_storage_path generate_log_name(metadata, @current_index) # If index is not a part of format, rethrow exception. if old_azure_storage_path == @azure_storage_path log.warn 'azure_storage_append_blob: append_blob: blocks limit reached, you need to use %{index} for the format.' raise end log.debug "azure_storage_append_blob: append_blob: blocks limit reached, creating new blob #{@azure_storage_path}." @bs.create_append_blob(@azure_container, @azure_storage_path) elsif status_code == 404 # blob not found log.debug "azure_storage_append_blob: append_blob: #{@azure_storage_path} blob doesn't exist, creating new blob." @bs.create_append_blob(@azure_container, @azure_storage_path) else raise end end end log.debug 'azure_storage_append_blob: append_blob.complete' end
ensure_container()
click to toggle source
# File lib/fluent/plugin/out_azure-storage-append-blob.rb, line 172 def ensure_container unless container_exists? @azure_container if @auto_create_container @bs.create_container(@azure_container) else raise "The specified container does not exist: container = #{@azure_container}" end end end
generate_log_name(metadata, index)
click to toggle source
# File lib/fluent/plugin/out_azure-storage-append-blob.rb, line 184 def generate_log_name(metadata, index) time_slice = if metadata.timekey.nil? ''.freeze else Time.at(metadata.timekey).utc.strftime(@time_slice_format) end path = @path_slicer.call(@path) values_for_object_key = { '%{path}' => path, '%{time_slice}' => time_slice, '%{index}' => index } storage_path = @azure_object_key_format.gsub(/%{[^}]+}/, values_for_object_key) @azure_storage_path = extract_placeholders(storage_path, metadata) end