class LogStash::Outputs::Qingstor::FileRepository

Constants

DEFAULT_STATE_SWEEPER_INTERVAL_SECS
DEFAULT_STATE_TIME_SECS

Public Class Methods

new(tags, encoding, temporary_directory, stale_time = DEFAULT_STATE_TIME_SECS, sweeper_interval = DEFAULT_STATE_SWEEPER_INTERVAL_SECS)
# File lib/logstash/outputs/qingstor/file_repository.rb, line 63
# Sets up an empty repository and immediately starts the stale sweeper.
#
# tags, encoding, temporary_directory and stale_time are not interpreted
# here; they are forwarded unchanged to FactoryInitializer.
# NOTE(review): stale_time is presumably the age (in seconds) after which
# an unused factory counts as stale -- confirm against FactoryInitializer.
#
# sweeper_interval is kept in @sweeper_interval and later used as the
# execution interval of the sweeper timer task.
def initialize(tags, encoding, temporary_directory,
               stale_time = DEFAULT_STATE_TIME_SECS,
               sweeper_interval = DEFAULT_STATE_SWEEPER_INTERVAL_SECS)
  @sweeper_interval = sweeper_interval
  @prefixed_factories = ConcurrentHashMap.new

  initializer_args = [tags, encoding, temporary_directory, stale_time]
  @factoryinitializer = FactoryInitializer.new(*initializer_args)

  # Must run last: the sweeper reads @sweeper_interval and
  # @prefixed_factories, both assigned above.
  start_stale_sweeper
end

Public Instance Methods

each_files() { |current| ... }
# File lib/logstash/outputs/qingstor/file_repository.rb, line 80
# Walks every entry stored in the repository and yields the `current`
# object of each entry's factory to the caller's block. The yield happens
# inside the entry's `with_lock` block.
def each_files
  stored_entries = @prefixed_factories.elements
  stored_entries.each do |entry|
    entry.with_lock do |factory|
      yield factory.current
    end
  end
end
get_factory(prefix_key) { |factory| ... }
# File lib/logstash/outputs/qingstor/file_repository.rb, line 86
# Looks up the entry for prefix_key, asking the map to create it through
# @factoryinitializer when it is absent (computeIfAbsent), then yields the
# entry's factory from inside the entry's `with_lock` block.
#
# Returns whatever `with_lock` returns.
def get_factory(prefix_key)
  entry = @prefixed_factories.computeIfAbsent(prefix_key, @factoryinitializer)
  entry.with_lock { |factory| yield factory }
end
get_file(prefix_key) { |current| ... }
# File lib/logstash/outputs/qingstor/file_repository.rb, line 91
# Convenience wrapper around get_factory: yields the factory's `current`
# object for prefix_key instead of the factory itself.
#
# Returns whatever get_factory returns.
def get_file(prefix_key)
  get_factory(prefix_key) do |factory|
    yield factory.current
  end
end
keys()
# File lib/logstash/outputs/qingstor/file_repository.rb, line 76
# Returns the result of `keySet` on the underlying factory map, i.e. the
# prefix keys that currently have an entry.
def keys
  @prefixed_factories.keySet
end
remove_stale(k, v)
# File lib/logstash/outputs/qingstor/file_repository.rb, line 103
# Drops a single map entry when it reports itself stale.
#
# k - key of the entry in the factory map
# v - the entry stored under k; must respond to `stale?` and `delete!`
#
# Returns nil when v is not stale, otherwise the result of `v.delete!`.
def remove_stale(k, v)
  return unless v.stale?

  # Two-argument remove: passes both key and value to the map.
  @prefixed_factories.remove(k, v)
  v.delete!
end
shutdown()
# File lib/logstash/outputs/qingstor/file_repository.rb, line 95
# Shuts the repository down. The only work done here is stopping the
# stale sweeper; the stored factories themselves are left untouched.
def shutdown
  stop_stale_sweeper
end
size()
# File lib/logstash/outputs/qingstor/file_repository.rb, line 99
# Returns the number of entries currently held in the factory map.
def size
  @prefixed_factories.size
end
start_stale_sweeper()
# File lib/logstash/outputs/qingstor/file_repository.rb, line 110
# Creates the periodic sweeper task, stores it in @stale_sweeper and
# starts it. The task is a Concurrent::TimerTask whose execution interval
# is @sweeper_interval; on every run it names the current thread and calls
# remove_stale for each entry of the factory map.
#
# Returns the result of `execute` on the timer task.
def start_stale_sweeper
  sweep = proc do
    LogStash::Util.set_thread_name('Qingstor, stale factory sweeper')
    @prefixed_factories.forEach { |k, v| remove_stale(k, v) }
  end

  @stale_sweeper = Concurrent::TimerTask.new(
    :execution_interval => @sweeper_interval, &sweep
  )
  @stale_sweeper.execute
end
stop_stale_sweeper()
# File lib/logstash/outputs/qingstor/file_repository.rb, line 121
# Stops the periodic sweeper by calling `shutdown` on the task stored in
# @stale_sweeper and returns that call's result.
def stop_stale_sweeper
  @stale_sweeper.shutdown
end