class S3Downloader
Public Class Methods
new(logger, stop_semaphore, options)
click to toggle source
# File lib/logstash/inputs/s3/downloader.rb, line 7 def initialize(logger, stop_semaphore, options) @logger = logger @stopped = stop_semaphore @factory = options[:s3_client_factory] @delete_on_success = options[:delete_on_success] @include_object_properties = options[:include_object_properties] end
Public Instance Methods
cleanup_local_object(record)
click to toggle source
# File lib/logstash/inputs/s3/downloader.rb, line 36 def cleanup_local_object(record) FileUtils.remove_entry_secure(record[:local_file], true) if ::File.exists?(record[:local_file]) rescue Exception => e @logger.warn("Could not delete file", :file => record[:local_file], :error => e) end
cleanup_s3object(record)
click to toggle source
# File lib/logstash/inputs/s3/downloader.rb, line 42 def cleanup_s3object(record) return unless @delete_on_success begin @factory.get_s3_client(record[:bucket]) do |s3| s3.delete_object(bucket: record[:bucket], key: record[:key]) end rescue Exception => e @logger.warn("Failed to delete s3 object", :record => record, :error => e) end end
copy_s3object_to_disk(record)
click to toggle source
# File lib/logstash/inputs/s3/downloader.rb, line 15 def copy_s3object_to_disk(record) # (from docs) WARNING: # yielding data to a block disables retries of networking errors! begin @factory.get_s3_client(record[:bucket]) do |s3| response = s3.get_object( bucket: record[:bucket], key: record[:key], response_target: record[:local_file] ) record[:s3_data] = response.to_h.keep_if { |key| @include_object_properties.include?(key) } end rescue Aws::S3::Errors::ServiceError => e @logger.error("Unable to download file. Requeuing the message", :error => e, :record => record) # prevent sqs message deletion throw :skip_delete end throw :skip_delete if stop? return true end
stop?()
click to toggle source
# File lib/logstash/inputs/s3/downloader.rb, line 53 def stop? @stopped.value end