class Fluent::Plugin::AzureStorageOutput
Constants
- COMPRESSOR_REGISTRY
- DEFAULT_FORMAT_TYPE
Attributes
bs[R]
Public Class Methods
new()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_azurestorage.rb, line 14
# Builds the plugin instance after running the superclass constructor.
# The compressor slot starts out empty; #configure assigns the real one.
def initialize
  super
  @compressor = nil
end
Private Class Methods
register_compressor(name, compressor)
click to toggle source
# File lib/fluent/plugin/out_azurestorage.rb, line 269
# Adds a compressor implementation to COMPRESSOR_REGISTRY under +name+.
#
# name       - key the compressor is registered under; #configure looks
#              compressors up in the same registry using @store_as.
# compressor - object handed to the registry; #configure calls +new+ on
#              whatever the registry returns.
#
# Returns whatever COMPRESSOR_REGISTRY.register returns.
def self.register_compressor(name, compressor)
  COMPRESSOR_REGISTRY.register(name, compressor)
end
Public Instance Methods
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_azurestorage.rb, line 44
# Applies the plugin configuration.
#
# Steps, in order:
# 1. Converts compat parameters for buffer/formatter/inject and runs the
#    superclass configure.
# 2. Instantiates the compressor registered under @store_as; if that raises
#    any StandardError, logs a warning and uses TextCompressor instead.
# 3. Creates the formatter and the path slicer (local time or UTC,
#    depending on @localtime).
# 4. Validates azure_container and the storage type.
#
# conf - the configuration element for this plugin.
#
# Raises ConfigError when azure_container is nil.
# Raises NotImplementedError when azure_storage_type is 'tables' or 'queues'.
def configure(conf)
  compat_parameters_convert(conf, :buffer, :formatter, :inject)
  super

  @compressor =
    begin
      COMPRESSOR_REGISTRY.lookup(@store_as).new(buffer_type: @buffer_type, log: log)
    rescue
      log.warn "#{@store_as} not found. Use 'text' instead"
      TextCompressor.new
    end
  @compressor.configure(conf)

  @formatter = formatter_create

  # The slicer expands strftime directives in the configured path using the
  # current wall-clock time at the moment it is called.
  @path_slicer =
    if @localtime
      proc { |path| Time.now.strftime(path) }
    else
      proc { |path| Time.now.utc.strftime(path) }
    end

  raise ConfigError, 'azure_container is needed' if @azure_container.nil?

  # Only blob storage is supported; every other value falls through to 'blob'.
  @storage_type =
    case @azure_storage_type
    when 'tables', 'queues' then raise NotImplementedError
    else 'blob'
    end

  # For backward compatibility
  # TODO: Remove time_slice_format when end of support compat_parameters
  @configured_time_slice_format = conf['time_slice_format']
end
format(tag, time, record)
click to toggle source
# File lib/fluent/plugin/out_azurestorage.rb, line 104
# Serializes one event for the buffer.
#
# The record is first passed through inject_values_to_record, and the
# result is handed to the formatter created in #configure.
#
# tag    - event tag.
# time   - event time.
# record - event record.
#
# Returns whatever @formatter.format returns.
def format(tag, time, record)
  @formatter.format(tag, time, inject_values_to_record(tag, time, record))
end
multi_workers_ready?()
click to toggle source
# File lib/fluent/plugin/out_azurestorage.rb, line 85
# Reports whether this plugin may run under multiple workers.
#
# Returns true unconditionally.
def multi_workers_ready?
  true
end
start()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_azurestorage.rb, line 89
# Starts the plugin: runs the superclass start, configures Azure
# credentials, builds the blob service client and checks the container.
#
# Credentials are only pushed into Azure.configure when BOTH the account
# name and the access key are non-nil; otherwise the Azure library is left
# with whatever configuration it already has.
#
# May raise from #ensure_container when the container is missing and
# auto-creation is disabled.
def start
  super

  credentials = [@azure_storage_account, @azure_storage_access_key]
  if credentials.none?(&:nil?)
    Azure.configure do |azure|
      azure.storage_account_name = @azure_storage_account
      azure.storage_access_key = @azure_storage_access_key
    end
  end

  @bs = Azure::Blob::BlobService.new
  @bs.extend UploadService
  ensure_container
end
write(chunk)
click to toggle source
# File lib/fluent/plugin/out_azurestorage.rb, line 109
# Uploads one buffer chunk to the configured container as a single blob.
#
# The blob name is built from @azure_object_key_format by substituting
# %{path}, %{time_slice}, %{file_extension}, %{index} and %{uuid_flush},
# then expanding chunk placeholders via extract_placeholders. %{index}
# starts at 0 and is incremented until a name is found for which
# blob_exists? returns false.
#
# The chunk is compressed into a temporary file, which is uploaded and then
# always closed and removed, whether or not compression or upload succeeded.
#
# chunk - the buffer chunk to upload; its metadata supplies the timekey.
#
# Returns whatever @bs.upload returns.
# Raises RuntimeError when two consecutive attempts produce the same blob
# name (i.e. the key format cannot yield a free name).
def write(chunk)
  metadata = chunk.metadata
  time_slice_format = @configured_time_slice_format || timekey_to_timeformat(@buffer_config['timekey'])
  time_slice =
    if metadata.timekey.nil?
      ''.freeze
    else
      Time.at(metadata.timekey).utc.strftime(time_slice_format)
    end

  index = 0
  previous_path = nil
  storage_path = nil
  loop do
    path = @path_slicer.call(@path)
    values_for_object_key = {
      "%{path}" => path,
      "%{time_slice}" => time_slice,
      "%{file_extension}" => @compressor.ext,
      "%{index}" => index,
      "%{uuid_flush}" => uuid_random
    }
    storage_path = @azure_object_key_format.gsub(%r(%{[^}]+}), values_for_object_key)
    storage_path = extract_placeholders(storage_path, metadata)

    # If bumping the index did not change the name, the format lacks a
    # varying component and the loop would never terminate.
    if index > 0 && storage_path == previous_path
      raise "duplicated path is generated. use %{index} in azure_object_key_format: path = #{storage_path}"
    end

    index += 1
    previous_path = storage_path
    break unless blob_exists?(@azure_container, storage_path)
  end

  tmp = Tempfile.new("azure-")
  begin
    @compressor.compress(chunk, tmp)
    # Close first so all compressed bytes are flushed before the upload
    # reads the file by path.
    tmp.close

    options = {
      content_type: @compressor.content_type,
      container: @azure_container,
      blob: storage_path
    }
    @bs.upload(tmp.path, options)
  ensure
    # Close and unlink the temp file on every exit path; without this the
    # file lingers on disk until the Tempfile object is garbage collected.
    tmp.close!
  end
end
Private Instance Methods
blob_exists?(container, blob)
click to toggle source
# File lib/fluent/plugin/out_azurestorage.rb, line 273
# Checks whether a blob is present by requesting its properties.
#
# container - name of the container to look in.
# blob      - name of the blob.
#
# Returns true when the properties request succeeds, false when the service
# answers with HTTP status 404.
# Raises Azure::Core::Http::HTTPError (unchanged) for any other HTTP status.
# Raises RuntimeError carrying the original message for any other
# StandardError. Non-StandardError exceptions (Interrupt, SystemExit, ...)
# are deliberately NOT intercepted so that shutdown signals pass through.
def blob_exists?(container, blob)
  @bs.get_blob_properties(container, blob)
  true
rescue Azure::Core::Http::HTTPError => ex
  raise if ex.status_code != 404
  false
rescue StandardError => e
  raise e.message
end
ensure_container()
click to toggle source
# File lib/fluent/plugin/out_azurestorage.rb, line 154
# Makes sure the configured container is available before writing.
#
# Looks for @azure_container among the containers returned by
# @bs.list_containers. When it is missing, the container is created if
# @auto_create_container is set; otherwise startup is aborted.
#
# Returns nil when the container already exists, or the result of
# @bs.create_container when it had to be created.
# Raises RuntimeError when the container is missing and auto-creation is off.
def ensure_container
  existing = @bs.list_containers.find { |container| container.name == @azure_container }
  return if existing

  unless @auto_create_container
    raise "The specified container does not exist: container = #{@azure_container}"
  end

  @bs.create_container(@azure_container)
end
timekey_to_timeformat(timekey)
click to toggle source
This is borrowed from Fluentd: it maps a buffer timekey to the strftime format of matching granularity.
# File lib/fluent/plugin/out_azurestorage.rb, line 170
# Picks the strftime pattern whose granularity matches a timekey.
#
# timekey - the buffer timekey, or nil (the thresholds 60 / 3600 / 86400
#           suggest the unit is seconds).
#
# Returns '' for nil, a pattern down to seconds for 0...60, down to minutes
# for 60...3600, down to hours for 3600...86400, and a date-only pattern
# for every other value. All upper bounds are exclusive.
def timekey_to_timeformat(timekey)
  return '' if timekey.nil?

  formats_by_span = {
    (0...60) => '%Y%m%d%H%M%S',
    (60...3600) => '%Y%m%d%H%M',
    (3600...86400) => '%Y%m%d%H'
  }
  _span, pattern = formats_by_span.find { |span, _| span === timekey }
  pattern || '%Y%m%d'
end
uuid_random()
click to toggle source
# File lib/fluent/plugin/out_azurestorage.rb, line 164
# Produces a random UUID string; #write substitutes it for the
# %{uuid_flush} placeholder.
#
# Returns the String form of ::UUIDTools::UUID.random_create.
def uuid_random
  # Required here rather than at file load, so the gem is only loaded once
  # this method is actually called.
  require 'uuidtools'
  generated = ::UUIDTools::UUID.random_create
  generated.to_s
end