class Fluent::Plugin::AzureStorageOutput

Constants

COMPRESSOR_REGISTRY
DEFAULT_FORMAT_TYPE

Attributes

bs[R]

Public Class Methods

new() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_azurestorage.rb, line 14
# Builds the plugin instance.
#
# Runs the superclass constructor first, then resets the compressor slot to
# nil so that no compressor is set until one is assigned explicitly.
def initialize
  super
  @compressor = nil
end

Private Class Methods

register_compressor(name, compressor) click to toggle source
# File lib/fluent/plugin/out_azurestorage.rb, line 269
# Adds +compressor+ to COMPRESSOR_REGISTRY under the key +name+.
#
# name       - identifier handed to the registry as the lookup key.
# compressor - object handed to the registry as the registered value.
#
# Returns whatever the registry's #register returns.
def self.register_compressor(name, compressor)
  registry = COMPRESSOR_REGISTRY
  registry.register(name, compressor)
end

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_azurestorage.rb, line 44
# Applies the plugin configuration.
#
# Steps, in order:
# 1. Converts compat-style parameters (:buffer, :formatter, :inject) and
#    lets the superclass process +conf+.
# 2. Instantiates the compressor registered for @store_as; on any
#    StandardError a warning is logged and TextCompressor is used instead.
# 3. Creates the formatter and the path slicer (a Proc that expands
#    strftime directives in a path using local time or UTC per @localtime).
# 4. Validates @azure_container and @azure_storage_type.
#
# conf - the configuration object; only conf['time_slice_format'] is read
#        directly here.
#
# Raises ConfigError when @azure_container is nil.
# Raises NotImplementedError when @azure_storage_type is 'tables' or 'queues'.
def configure(conf)
  compat_parameters_convert(conf, :buffer, :formatter, :inject)
  super

  @compressor =
    begin
      COMPRESSOR_REGISTRY.lookup(@store_as).new(:buffer_type => @buffer_type, :log => log)
    rescue StandardError
      log.warn("#{@store_as} not found. Use 'text' instead")
      TextCompressor.new
    end
  @compressor.configure(conf)

  @formatter = formatter_create

  # The local/UTC choice is fixed here, at configure time.
  @path_slicer =
    if @localtime
      Proc.new { |template| Time.now.strftime(template) }
    else
      Proc.new { |template| Time.now.utc.strftime(template) }
    end

  raise ConfigError, 'azure_container is needed' if @azure_container.nil?

  # Only blob storage is implemented; every other value falls back to 'blob'.
  raise NotImplementedError if %w[tables queues].include?(@azure_storage_type)
  @storage_type = 'blob'

  # For backward compatibility
  # TODO: Remove time_slice_format when end of support compat_parameters
  @configured_time_slice_format = conf['time_slice_format']
end
format(tag, time, record) click to toggle source
# File lib/fluent/plugin/out_azurestorage.rb, line 104
# Serialises one event.
#
# The record is first passed through inject_values_to_record together with
# +tag+ and +time+; the result is then handed to the configured formatter.
#
# Returns whatever @formatter.format returns.
def format(tag, time, record)
  @formatter.format(tag, time, inject_values_to_record(tag, time, record))
end
multi_workers_ready?() click to toggle source
# File lib/fluent/plugin/out_azurestorage.rb, line 85
# Predicate that unconditionally answers true.
# NOTE(review): presumably signals to the host framework that this plugin
# may run under multiple workers — confirm against the framework docs.
def multi_workers_ready?
  true
end
start() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_azurestorage.rb, line 89
# Starts the plugin.
#
# After the superclass start-up, the Azure client is configured with the
# storage account name and access key, but only when both are set.
# A BlobService is then created, extended with UploadService, stored in
# @bs, and the target container is checked via ensure_container.
#
# Returns the result of ensure_container.
def start
  super

  unless @azure_storage_account.nil? || @azure_storage_access_key.nil?
    Azure.configure do |azure_config|
      azure_config.storage_account_name = @azure_storage_account
      azure_config.storage_access_key   = @azure_storage_access_key
    end
  end

  @bs = Azure::Blob::BlobService.new
  @bs.extend(UploadService)

  ensure_container
end
write(chunk) click to toggle source
# File lib/fluent/plugin/out_azurestorage.rb, line 109
# Uploads one buffer chunk as a blob.
#
# The blob name is built from @azure_object_key_format by replacing the
# %{path}, %{time_slice}, %{file_extension}, %{index} and %{uuid_flush}
# placeholders and then running extract_placeholders on the result. While
# blob_exists? reports the name as taken, the index is incremented and the
# name rebuilt. The chunk is then compressed into a temporary file and
# uploaded through @bs.upload.
#
# chunk - buffer chunk; must respond to #metadata (whose #timekey may be
#         nil) and be accepted by @compressor.compress.
#
# Returns the result of @bs.upload.
# Raises RuntimeError when two consecutive attempts generate the same blob
# name (i.e. the key format cannot produce a fresh name).
def write(chunk)
  metadata = chunk.metadata
  time_slice_format = @configured_time_slice_format || timekey_to_timeformat(@buffer_config['timekey'])
  time_slice = if metadata.timekey.nil?
                 ''.freeze
               else
                 Time.at(metadata.timekey).utc.strftime(time_slice_format)
               end

  index = 0
  previous_path = nil
  storage_path = nil
  # Always build at least one candidate name; keep going while it is taken.
  loop do
    path = @path_slicer.call(@path)
    values_for_object_key = {
      "%{path}" => path,
      "%{time_slice}" => time_slice,
      "%{file_extension}" => @compressor.ext,
      "%{index}" => index,
      "%{uuid_flush}" => uuid_random
    }
    storage_path = @azure_object_key_format.gsub(%r(%{[^}]+}), values_for_object_key)
    storage_path = extract_placeholders(storage_path, metadata)
    # An unchanged name after bumping the index means the loop can never end.
    if index > 0 && storage_path == previous_path
      raise "duplicated path is generated. use %{index} in azure_object_key_format: path = #{storage_path}"
    end

    index += 1
    previous_path = storage_path
    break unless blob_exists?(@azure_container, storage_path)
  end

  tmp = Tempfile.new("azure-")
  begin
    @compressor.compress(chunk, tmp)
    # Close before uploading so buffered data is flushed to disk.
    tmp.close

    options = {
      content_type: @compressor.content_type,
      container: @azure_container,
      blob: storage_path
    }

    @bs.upload(tmp.path, options)
  ensure
    # Close and delete the temp file on success and on failure alike, rather
    # than leaving it on disk until the Tempfile object is garbage collected.
    # NOTE(review): assumes @bs.upload has finished reading the file when it
    # returns — confirm against UploadService.
    tmp.close!
  end
end

Private Instance Methods

blob_exists?(container, blob) click to toggle source
# File lib/fluent/plugin/out_azurestorage.rb, line 273
# Tells whether +blob+ already exists in +container+.
#
# Existence is probed with @bs.get_blob_properties: success means the blob
# exists, an HTTP 404 means it does not.
#
# Returns true or false.
# Raises Azure::Core::Http::HTTPError unchanged for any status other than 404.
# Raises RuntimeError (carrying the original message) for any other
# StandardError. Exceptions outside StandardError (e.g. Interrupt,
# SystemExit) are deliberately not intercepted and propagate untouched.
def blob_exists?(container, blob)
  @bs.get_blob_properties(container, blob)
  true
rescue Azure::Core::Http::HTTPError => ex
  raise if ex.status_code != 404
  false
rescue StandardError => e
  # Kept as a RuntimeError for backward compatibility with existing callers.
  raise e.message
end
ensure_container() click to toggle source
# File lib/fluent/plugin/out_azurestorage.rb, line 154
# Makes sure the configured container is available.
#
# Looks for a container named @azure_container in @bs.list_containers.
# When it is missing, it is created if @auto_create_container is truthy.
#
# Returns nil when the container already exists, otherwise the result of
# @bs.create_container.
# Raises RuntimeError when the container is missing and auto-creation is off.
def ensure_container
  existing = @bs.list_containers.find { |entry| entry.name == @azure_container }
  return if existing

  unless @auto_create_container
    raise "The specified container does not exist: container = #{@azure_container}"
  end

  @bs.create_container(@azure_container)
end
timekey_to_timeformat(timekey) click to toggle source

This mapping is borrowed from Fluentd.

# File lib/fluent/plugin/out_azurestorage.rb, line 170
# Picks a strftime pattern whose resolution matches +timekey+.
#
# timekey - nil, or a number compared against the ranges below
#           (upper bounds are exclusive).
#
# Returns '' for nil; a second-, minute- or hour-resolution pattern when
# timekey falls in 0...60, 60...3600 or 3600...86400 respectively; and the
# day-resolution pattern for everything else.
def timekey_to_timeformat(timekey)
  return '' if timekey.nil?

  patterns_by_span = [
    [0...60,       '%Y%m%d%H%M%S'],
    [60...3600,    '%Y%m%d%H%M'],
    [3600...86400, '%Y%m%d%H']
  ]
  hit = patterns_by_span.find { |span, _pattern| span === timekey }
  hit ? hit.last : '%Y%m%d'
end
uuid_random() click to toggle source
# File lib/fluent/plugin/out_azurestorage.rb, line 164
# Generates a random (version 4) UUID string in the canonical
# 8-4-4-4-12 lowercase hexadecimal form.
#
# Uses the standard library's SecureRandom, so no third-party gem is needed
# at runtime. The require stays inside the method so the block is
# self-contained.
#
# Returns a 36-character String.
def uuid_random
  require 'securerandom'
  ::SecureRandom.uuid
end