class Fluent::Plugin::FileWithHeaderOutput

Constants

DEFAULT_TIMEKEY
SUPPORTED_COMPRESS
SUPPORTED_COMPRESS_MAP

Attributes

dir_perm[R]
last_written_path[RW]

Public Instance Methods

compression_suffix(compress) click to toggle source
# File lib/fluent/plugin/out_file_with_header.rb, line 266
# Maps a compression method to the file-name suffix appended to output paths.
#
# compress - :gzip, or nil for no compression.
#
# Returns '.gz' for :gzip and '' for nil.
# Raises ArgumentError for any other value.
def compression_suffix(compress)
  return '.gz' if compress == :gzip
  return '' if compress.nil?

  raise ArgumentError, "unknown compression type #{compress}"
end
configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_file_with_header.rb, line 105
# Applies the plugin configuration and derives the state the write path relies on:
# the compression method, the path template, the formatter, optional symlink
# support on the buffer, directory/file permissions and the cross-worker lock flag.
#
# conf - the configuration element for this plugin; it is queried here through
#        [], has_key?, elements and add_element.
#
# Raises Fluent::ConfigError when the path contains '*' without a buffer timekey,
# when the path resolved for a dummy chunk is not writable, or when symlink_path
# is set while this plugin runs as a secondary output. The placeholder validators
# called below may raise as well.
def configure(conf)
  compat_parameters_convert(conf, :formatter, :buffer, :inject, default_chunk_key: "time")

  # Captured before `super` so the raw value can be handed to generate_path_template.
  configured_time_slice_format = conf['time_slice_format']

  # Guarantee a <buffer> section exists; the added one is chunked by 'time'.
  if conf.elements(name: 'buffer').empty?
    conf.add_element('buffer', 'time')
  end
  buffer_conf = conf.elements(name: 'buffer').first
  # Fluent::PluginId#configure is not called yet, so we can't use #plugin_root_dir here.
  if !buffer_conf.has_key?('path') && !(conf['@id'] && system_config.root_dir)
    # v0.14 file buffer handles path as directory if '*' is missing
    # 'dummy_path' is not to raise configuration error for 'path' in file buffer plugin,
    # but raise it in this plugin.
    buffer_conf['path'] = conf['path'] || '/tmp/dummy_path'
  end

  # Only warns; the deprecated parameter itself is left in conf untouched.
  if conf.has_key?('utc') || conf.has_key?('localtime')
    param_name = conf.has_key?('utc') ? 'utc' : 'localtime'
    log.warn "'#{param_name}' is deprecated for output plugin. This parameter is used for formatter plugin in compatibility layer. If you want to use same feature, use timekey_use_utc parameter in <buffer> directive instead"
  end

  super

  # Translate the configured compress value through SUPPORTED_COMPRESS_MAP
  # (presumably to :gzip or nil, the values the writers branch on — confirm against the constant).
  @compress_method = SUPPORTED_COMPRESS_MAP[@compress]

  if @path.include?('*') && !@buffer_config.timekey
    raise Fluent::ConfigError, "path including '*' must be used with buffer chunk key 'time'"
  end

  path_suffix = @add_path_suffix ? @path_suffix : ''
  # When running as a secondary, the timekey is taken from the primary plugin's buffer config.
  path_timekey = if @chunk_key_time
                   @as_secondary ? @primary_instance.buffer_config.timekey : @buffer_config.timekey
                 else
                   nil
                 end
  @path_template = generate_path_template(@path, path_timekey, @append, @compress_method, path_suffix: path_suffix, time_slice_format: configured_time_slice_format)

  if @as_secondary
    # When this plugin is configured as secondary & primary plugin has tag key, but this plugin may not have it.
    # Increment placeholder can make another output file per chunk tag/keys even if original path doesn't include it.
    placeholder_validators(:path, @path_template).select{|v| v.type == :time }.each do |v|
      v.validate!
    end
  else
    placeholder_validate!(:path, @path_template)

    # Build a dummy tag and record that satisfy every placeholder in the template,
    # then resolve the template once to check that the resulting path is writable.
    max_tag_index = get_placeholders_tag(@path_template).max || 1
    max_tag_index = 1 if max_tag_index < 1
    dummy_tag = (['a'] * max_tag_index).join('.')
    dummy_record_keys = get_placeholders_keys(@path_template) || ['message']
    dummy_record = Hash[dummy_record_keys.zip(['data'] * dummy_record_keys.size)]

    test_chunk1 = chunk_for_test(dummy_tag, Fluent::EventTime.now, dummy_record)
    test_path = extract_placeholders(@path_template, test_chunk1)
    unless ::Fluent::FileUtil.writable_p?(test_path)
      raise Fluent::ConfigError, "out_file: `#{test_path}` is not writable"
    end
  end

  @formatter = formatter_create

  # Symlink support is only wired up when the buffer exposes #path.
  if @symlink_path && @buffer.respond_to?(:path)
    if @as_secondary
      raise Fluent::ConfigError, "symlink_path option is unavailable in <secondary>: consider to use secondary_file plugin"
    end
    if Fluent.windows?
      log.warn "symlink_path is unavailable on Windows platform. disabled."
      @symlink_path = nil
    else
      @buffer.extend SymlinkBufferMixin
      @buffer.symlink_path = @symlink_path
      @buffer.output_plugin_for_symlink = self
    end
  end

  @dir_perm = system_config.dir_permission || Fluent::DEFAULT_DIR_PERMISSION
  @file_perm = system_config.file_permission || Fluent::DEFAULT_FILE_PERMISSION
  # With more than one worker, #write asks find_filepath_available to take a lock.
  @need_lock = system_config.workers > 1
end
find_filepath_available(path_with_placeholder, with_lock: false) { |path| ... } click to toggle source
# File lib/fluent/plugin/out_file_with_header.rb, line 307
# Finds the first non-existing file name obtained by replacing the '_**' index
# placeholder with '_0', '_1', ... and yields it to the block.
#
# path_with_placeholder - path containing the '_**' index placeholder.
# with_lock             - when true, a '<path>.lock' directory is created before
#                         yielding and removed afterwards, so that concurrent
#                         callers do not pick the same file name.
#
# Yields the chosen path and returns the block's value.
# Raises RuntimeError when the placeholder is missing, and any SystemCallError
# other than Errno::EEXIST raised while creating the lock directory.
def find_filepath_available(path_with_placeholder, with_lock: false) # for non-append
  raise "BUG: index placeholder not found in path: #{path_with_placeholder}" unless path_with_placeholder.index('_**')
  i = 0
  path = dir_path = nil
  locked = false
  loop do
    path = path_with_placeholder.sub('_**', "_#{i}")
    i += 1
    next if File.exist?(path)
    break unless with_lock

    dir_path = path + '.lock'
    begin
      Dir.mkdir(dir_path)
      locked = true
    rescue Errno::EEXIST
      # Someone else holds the lock for this index: try the next one.
      # Any other failure (ENOENT, EACCES, ...) propagates; retrying could never
      # succeed and would spin forever.
      locked = false
      next
    end

    # ensure that other worker doesn't create a file (and release lock)
    # between previous File.exist? and Dir.mkdir
    break unless File.exist?(path)

    # The file appeared in the meantime: drop the lock we just took before moving
    # on, otherwise its directory would be left behind.
    begin
      Dir.rmdir(dir_path)
    rescue SystemCallError
      nil
    end
    locked = false
  end
  yield path
ensure
  if dir_path && locked && Dir.exist?(dir_path)
    # Best-effort release; a failure here must not mask the block's outcome.
    begin
      Dir.rmdir(dir_path)
    rescue SystemCallError
      nil
    end
  end
end
format(tag, time, record) click to toggle source
# File lib/fluent/plugin/out_file_with_header.rb, line 190
# Serializes one event: the record is first passed through
# inject_values_to_record, and the result is handed to the configured formatter.
#
# Returns whatever @formatter.format returns.
def format(tag, time, record)
  @formatter.format(tag, time, inject_values_to_record(tag, time, record))
end
generate_path_template(original, timekey, append, compress, path_suffix: '', time_slice_format: nil) click to toggle source

/path/to/dir/file.*      -> /path/to/dir/file.%Y%m%d
/path/to/dir/file.*.data -> /path/to/dir/file.%Y%m%d.data
/path/to/dir/file        -> /path/to/dir/file.%Y%m%d.log

%Y%m%d -> %Y%m%d_** (non append)

+ .gz (gzipped)

TODO: remove time_slice_format when end of support of compat_parameters
# File lib/fluent/plugin/out_file_with_header.rb, line 281
# Builds the output path template from the configured path.
#
# original          - configured path; may contain '*' or strftime placeholders.
# timekey           - buffer timekey, or nil when chunks are not keyed by time.
# append            - when falsy, the '_**' index placeholder is added.
# compress          - compression method passed to compression_suffix.
# path_suffix       - suffix added to paths that do not contain '*'.
# time_slice_format - explicit time format overriding the one derived from timekey.
#
# Returns the template string.
# Raises RuntimeError when the path contains '*' but timekey is nil, and
# Fluent::ConfigError when the path holds only some of the time placeholders
# required by timekey.
def generate_path_template(original, timekey, append, compress, path_suffix: '', time_slice_format: nil)
  comp_suffix = compression_suffix(compress)
  index_placeholder = append ? '' : '_**'

  # '*' form: every '*' is replaced by the time part (plus index); path_suffix is not used.
  if original.include?('*')
    raise "BUG: configuration error must be raised for path including '*' without timekey" unless timekey
    time_part = time_slice_format || timekey_to_timeformat(timekey)
    return original.gsub('*', time_part + index_placeholder) + comp_suffix
  end

  tail = "#{index_placeholder}#{path_suffix}#{comp_suffix}"
  return "#{original}#{tail}" unless timekey
  return "#{original}.#{time_slice_format}#{tail}" if time_slice_format

  time_placeholders = timekey_to_timeformat(timekey)
  present, missing = time_placeholders.scan(/../).partition { |ph| original.include?(ph) }

  # No time placeholder in the path at all: append the derived time format.
  return "#{original}.#{time_placeholders}#{tail}" if present.empty?

  # Some but not all placeholders present: files of different periods would collide.
  raise Fluent::ConfigError, "insufficient timestamp placeholders in path" unless missing.empty?
  "#{original}#{tail}"
end
multi_workers_ready?() click to toggle source
# File lib/fluent/plugin/out_file_with_header.rb, line 186
# Always returns true.
# NOTE(review): presumably queried by Fluentd to decide whether this plugin may
# run when several workers are configured — confirm against the Output base class.
def multi_workers_ready?
  true
end
timekey_to_timeformat(timekey) click to toggle source
# File lib/fluent/plugin/out_file_with_header.rb, line 256
# Picks the strftime pattern whose resolution matches the given timekey.
#
# timekey - chunk time span (the ranges below suggest seconds), or nil.
#
# Returns '' for nil; a second-, minute- or hour-resolution pattern for values
# in 0...60, 60...3600 and 3600...86400 respectively; '%Y%m%d' for anything else.
def timekey_to_timeformat(timekey)
  return '' if timekey.nil?

  # Upper bounds are exclusive: 60 already selects the minute pattern.
  resolutions = [
    [0...60,       '%Y%m%d%H%M%S'],
    [60...3600,    '%Y%m%d%H%M'],
    [3600...86400, '%Y%m%d%H']
  ]
  _range, pattern = resolutions.find { |range, _pattern| range === timekey }
  pattern || '%Y%m%d'
end
write(chunk) click to toggle source
# File lib/fluent/plugin/out_file_with_header.rb, line 195
# Writes one buffer chunk to the file resolved from @path_template.
#
# The parent directory is created with @dir_perm. The writer is chosen from
# @compress_method; for :gzip the chunk is (re)compressed here unless the buffer
# already holds gzip data and @recompress is falsy. In append mode the resolved
# path is written directly; otherwise find_filepath_available picks an unused
# indexed file name, taking a lock when @need_lock is set.
#
# Records the path actually written in @last_written_path and returns it.
# Raises RuntimeError for an unknown @compress_method.
def write(chunk)
  path = extract_placeholders(@path_template, chunk)
  FileUtils.mkdir_p File.dirname(path), mode: @dir_perm

  writer_name =
    if @compress_method.nil?
      :write_without_compression
    elsif @compress_method == :gzip
      needs_compression = @buffer.compress != :gzip || @recompress
      needs_compression ? :write_gzip_with_compression : :write_gzip_from_gzipped_chunk
    else
      raise "BUG: unknown compression method #{@compress_method}"
    end
  writer = method(writer_name)

  written_path = path
  if @append
    writer.call(path, chunk)
  else
    find_filepath_available(path, with_lock: @need_lock) do |actual_path|
      writer.call(actual_path, chunk)
      written_path = actual_path
    end
  end

  @last_written_path = written_path
end
write_gzip_from_gzipped_chunk(path, chunk) click to toggle source
# File lib/fluent/plugin/out_file_with_header.rb, line 246
# Appends a chunk that is already gzip-compressed to the file at +path+,
# writing the header line first when the file is empty.
#
# path  - destination file; opened in binary append mode with @file_perm.
# chunk - object responding to write_to(io, compressed: :gzip).
#
# Returns the value of chunk.write_to.
def write_gzip_from_gzipped_chunk(path, chunk)
  File.open(path, "ab", @file_perm) do |f|
    if f.stat.size == 0
      # The chunk is copied as raw gzip bytes, so the header has to be a gzip
      # member of its own; concatenated members decompress as one stream.
      gz = Zlib::GzipWriter.new(f)
      gz.write(headers)
      gz.write("\n")
      gz.finish # ends the member without closing f
    end
    chunk.write_to(f, compressed: :gzip)
  end
end
write_gzip_with_compression(path, chunk) click to toggle source
# File lib/fluent/plugin/out_file_with_header.rb, line 234
# Gzip-compresses a chunk and appends it to the file at +path+, writing the
# header line first when the file is empty.
#
# path  - destination file; opened in binary append mode with @file_perm.
# chunk - object responding to write_to(io, compressed: :text).
#
# Returns the value of gz.close.
def write_gzip_with_compression(path, chunk)
  File.open(path, "ab", @file_perm) do |f|
    needs_header = f.stat.size == 0
    gz = Zlib::GzipWriter.new(f)
    if needs_header
      # Goes through the gzip writer: writing it to f directly would put
      # uncompressed text in front of the gzip stream and make the file unreadable.
      gz.write(headers)
      gz.write("\n")
    end
    chunk.write_to(gz, compressed: :text)
    gz.close
  end
end
write_without_compression(path, chunk) click to toggle source
# File lib/fluent/plugin/out_file_with_header.rb, line 224
# Appends a chunk to the file at +path+ without compression, preceded by the
# header line when the file is still empty.
#
# path  - destination file; opened in binary append mode with @file_perm.
# chunk - object responding to write_to(io).
#
# Returns the value of chunk.write_to.
def write_without_compression(path, chunk)
  File.open(path, "ab", @file_perm) do |io|
    io << headers << "\n" if io.stat.size.zero?
    chunk.write_to(io)
  end
end