class LogStash::Outputs::Qingstor
Constants
- CRASH_RECOVERY_THREADPOOL
- PERIODIC_CHECK_INTERVAL_IN_SECONDS
Public Instance Methods
clean_temporary_file(file)
click to toggle source
# File lib/logstash/outputs/qingstor.rb, line 224
# Logs the path of +file+ at info level, then removes the file through
# +file.delete!+. Elsewhere in this class it is handed to +upload_async+
# as the +:on_complete+ callback.
#
# @param file [#path, #delete!] temporary file to remove
# @return [Object] whatever +file.delete!+ returns
def clean_temporary_file(file)
  doomed_path = file.path
  @logger.info('Callback: removing temporary file', :file => doomed_path)
  file.delete!
end
close()
click to toggle source
# File lib/logstash/outputs/qingstor.rb, line 196
# Shuts the output down: stops the periodic rotation check when the rotation
# policy uses one, queues every non-empty file of the repository for upload,
# then shuts down the repository and stops the uploader(s).
#
# @return [Object, nil] result of stopping the crash uploader, or nil when
#   +@restore+ is falsy
def close
  stop_periodic_check if @rotation.needs_periodic?

  @logger.info('uploading current workspace before closing')
  # Empty files are skipped; only files with content are queued.
  @file_repository.each_files { |pending| upload_file(pending) if pending.size > 0 }

  @file_repository.shutdown
  @uploader.stop
  # The crash uploader is only created when restore is enabled.
  @crash_uploader.stop if @restore
end
directory_valid?(path)
click to toggle source
# File lib/logstash/outputs/qingstor.rb, line 247
# Ensures +path+ exists as a directory (creating it and any missing parents)
# and reports whether it is writable.
#
# @param path [String] directory to check
# @return [Boolean] true when the directory is writable; false when it is not
#   or when any StandardError is raised while creating/checking it
def directory_valid?(path)
  begin
    Dir.exist?(path) || FileUtils.mkdir_p(path)
    ::File.writable?(path)
  rescue StandardError
    # Best effort: any failure simply means the directory is not usable.
    false
  end
end
getbucket()
click to toggle source
# File lib/logstash/outputs/qingstor.rb, line 189
# Builds the QingStor SDK configuration and service from the plugin settings,
# storing them in +@qs_config+ and +@qs_service+, and returns the bucket
# handle for +@bucket+ in +@region+.
#
# @return [Object] the value returned by +@qs_service.bucket+
def getbucket
  @qs_config = QingStor::SDK::Config.init(@access_key_id, @secret_access_key)

  # Host and port are only overridden when a host has been configured.
  custom_endpoint = !@host.nil?
  @qs_config.update(:host => @host, :port => @port) if custom_endpoint

  @qs_service = QingStor::SDK::Service.new(@qs_config)
  @qs_service.bucket(@bucket, @region)
end
log_print_config()
click to toggle source
# File lib/logstash/outputs/qingstor.rb, line 131
# Writes the effective plugin settings (prefix, tmpdir, rotation, tags,
# encoding, restore) to the logger at info level.
#
# @return [Object] whatever +@logger.info+ returns
def log_print_config
  rotation_summary = @rotation.to_s
  @logger.info(
    'Run at setting: ',
    :prefix => @prefix,
    :tmpdir => @tmpdir,
    :rotation => rotation_summary,
    :tags => @tags,
    :encoding => @encoding,
    :restore => @restore
  )
end
multi_receive_encoded(events_and_encoded)
click to toggle source
# File lib/logstash/outputs/qingstor.rb, line 140
# Appends each encoded event to the temporary file selected by the event's
# interpolated prefix, then runs a rotation check over every prefix that was
# written to in this batch.
#
# @param events_and_encoded [Enumerable<Array>] pairs of (event, encoded
#   string); each event must respond to +sprintf+
# @return [Object] whatever +rotate_if_needed+ returns
# @raise [Errno::ENOSPC] re-raised, after logging, when a write hits a full
#   temporary directory
def multi_receive_encoded(events_and_encoded)
  touched_prefixes = Set.new

  events_and_encoded.each do |event, encoded|
    key = event.sprintf(@prefix)
    touched_prefixes.add(key)

    begin
      # Each record is stripped of surrounding whitespace and newline-terminated.
      @file_repository.get_file(key) { |io| io.write(encoded.strip + "\n") }
    rescue Errno::ENOSPC => e
      @logger.error('QingStor: Nospace left in temporary directory', :tmpdir => @tmpdir)
      raise e
    end
  end

  # Rotation is checked once per prefix after all writes, so the per-file
  # checks are grouped instead of being repeated for every event.
  rotate_if_needed(touched_prefixes)
end
register()
click to toggle source
# File lib/logstash/outputs/qingstor.rb, line 106
# Plugin start-up: validates the prefix and temporary directory, builds the
# file repository, rotation policy and upload thread pool, obtains and
# validates the bucket, creates the uploader, logs the configuration, and
# finally starts the periodic rotation check and crash recovery when enabled.
#
# @raise [LogStash::ConfigurationError] when +@tmpdir+ cannot be created or
#   is not writable
def register
  QingstorValidator.prefix_valid?(@prefix) unless @prefix.nil?

  unless directory_valid?(@tmpdir)
    raise LogStash::ConfigurationError,
          "Logstash must have the permissions to write to: #{@tmpdir}"
  end

  @file_repository = FileRepository.new(@tags, @encoding, @tmpdir)
  @rotation = RotationPolicy.new(@rotation_strategy, @file_size, @file_time)

  # With :caller_runs, work that the pool cannot accept is executed by the
  # submitting thread.
  upload_pool = Concurrent::ThreadPoolExecutor.new(
    :min_threads => 1,
    :max_threads => @upload_workers_count,
    :max_queue => @upload_queue_size,
    :fallback_policy => :caller_runs
  )

  @qs_bucket = getbucket
  QingstorValidator.bucket_valid?(@qs_bucket)
  @uploader = Uploader.new(@qs_bucket, @logger, upload_pool)

  log_print_config

  start_periodic_check if @rotation.needs_periodic?
  restore_from_crash if @restore
end
restore_from_crash()
click to toggle source
# File lib/logstash/outputs/qingstor.rb, line 254
# Scans +@tmpdir+ recursively for regular files left over from a previous
# run. Non-empty files are re-keyed under "Restored/<date>/" and queued on a
# dedicated crash uploader; empty files are deleted immediately.
#
# @return [Array<String>] the paths of the regular files that were found
def restore_from_crash
  @crash_uploader = Uploader.new(@qs_bucket, @logger, CRASH_RECOVERY_THREADPOOL)

  workspace = Pathname.new(@tmpdir)
  leftovers = Dir.glob(::File.join(@tmpdir, '**/*')).select { |entry| ::File.file?(entry) }

  leftovers.each do |entry|
    temp_file = TemporaryFile.create_from_existing_file(entry, workspace)

    # Now multipart uploader supports file size up to 500GB
    if temp_file.size > 0
      date_folder = Time.new.strftime('%Y-%m-%d/')
      temp_file.key = 'Restored/' + date_folder + temp_file.key
      @logger.info('Recoving from crash and uploading', :file => temp_file.key)
      @crash_uploader.upload_async(
        temp_file,
        :on_complete => method(:clean_temporary_file),
        :upload_options => upload_options
      )
    elsif temp_file.size == 0
      @logger.info('Recoving from crash, delete empty files', :file => temp_file.path)
      temp_file.delete!
    end
  end
end
rotate_if_needed(prefixs)
click to toggle source
# File lib/logstash/outputs/qingstor.rb, line 164
# For every given prefix, asks the rotation policy whether the prefix's
# current temporary file should be rotated; if so the file is queued for
# upload and the factory is rotated.
#
# The debug entry reports the rotation policy under +:strategy+ and the
# file's object key under +:key+ (previously the key was logged under the
# +:strategy+ field).
#
# @param prefixs [Enumerable<String>] prefixes whose current files to check
# @return [Object] whatever +prefixs.each+ returns
def rotate_if_needed(prefixs)
  prefixs.each do |prefix|
    @file_repository.get_factory(prefix) do |factory|
      tmp_file = factory.current
      next unless @rotation.rotate?(tmp_file)

      @logger.debug('Rotate file',
                    :strategy => @rotation.to_s,
                    :key => tmp_file.key,
                    :path => tmp_file.path)
      upload_file(tmp_file)
      factory.rotate!
    end
  end
end
start_periodic_check()
click to toggle source
# File lib/logstash/outputs/qingstor.rb, line 229
# Creates and starts a timer task, stored in +@periodic_check+, that runs a
# rotation check over every key of the file repository once per
# PERIODIC_CHECK_INTERVAL_IN_SECONDS.
#
# @return [Object] whatever +@periodic_check.execute+ returns
def start_periodic_check
  @logger.info('Start periodic rotation check')

  @periodic_check = Concurrent::TimerTask.new(:execution_interval => PERIODIC_CHECK_INTERVAL_IN_SECONDS) do
    @logger.debug('Periodic check for stale files')
    stale_candidates = @file_repository.keys
    rotate_if_needed(stale_candidates)
  end

  @periodic_check.execute
end
stop_periodic_check()
click to toggle source
# File lib/logstash/outputs/qingstor.rb, line 243
# Shuts down the timer task held in +@periodic_check+.
#
# @return [Object] whatever the task's +shutdown+ returns
def stop_periodic_check
  timer = @periodic_check
  timer.shutdown
end
upload_file(file)
click to toggle source
# File lib/logstash/outputs/qingstor.rb, line 180
# Closes +file+ and queues it on the uploader; +clean_temporary_file+ is
# registered as the completion callback.
#
# The upload options are built once and reused. The copy written to the
# debug log has +:customer_key+ replaced by a placeholder so the encryption
# key never reaches the log output; the uploader still receives the real
# value.
#
# @param file [#key, #close] temporary file to upload
# @return [Object] whatever +@uploader.upload_async+ returns
def upload_file(file)
  @logger.debug('Add file to uploading queue', :key => file.key)
  file.close

  options = upload_options
  # Never log the customer-supplied encryption key.
  loggable = options.key?(:customer_key) ? options.merge(:customer_key => '[REDACTED]') : options
  @logger.debug('upload options', :upload_options => loggable)

  @uploader.upload_async(file,
                         :on_complete => method(:clean_temporary_file),
                         :upload_options => options)
end
upload_options()
click to toggle source
# File lib/logstash/outputs/qingstor.rb, line 211
# Builds the option hash passed along with every upload.
#
# +:content_encoding+ is 'gzip' when +@encoding+ is 'gzip' and nil otherwise.
# The encryption entries are added only when the algorithm is 'AES256' and a
# customer key is configured.
#
# @return [Hash] upload options
def upload_options
  content_encoding = @encoding == 'gzip' ? 'gzip' : nil
  encrypted = @server_side_encryption_algorithm == 'AES256' && !@customer_key.nil?

  return { :content_encoding => content_encoding } unless encrypted

  {
    :content_encoding => content_encoding,
    :server_side_encryption_algorithm => @server_side_encryption_algorithm,
    :customer_key => @customer_key
  }
end