class S3Downloader

Public Class Methods

new(logger, stop_semaphore, options)
# File lib/logstash/inputs/s3/downloader.rb, line 7
# Stores the collaborators and settings the other methods rely on.
#
# @param logger [#warn, #error] receives the warnings and errors reported by this class
# @param stop_semaphore [#value] polled by #stop? to find out whether to stop
# @param options [#[]] settings read by symbol key: :s3_client_factory,
#   :delete_on_success and :include_object_properties
def initialize(logger, stop_semaphore, options)
  @logger, @stopped = logger, stop_semaphore
  @factory, @delete_on_success, @include_object_properties =
    %i[s3_client_factory delete_on_success include_object_properties].map { |name| options[name] }
end

Public Instance Methods

cleanup_local_object(record)
# File lib/logstash/inputs/s3/downloader.rb, line 36
# Removes the local copy of a downloaded object if it is still on disk.
#
# Deletion is best-effort: an ordinary failure is logged as a warning and not
# re-raised.
#
# @param record [Hash] carries the path of the local copy under :local_file
def cleanup_local_object(record)
  # File.exist? is used because File.exists? was deprecated and later removed
  # from Ruby; with the old name the call failed and nothing was ever deleted.
  FileUtils.remove_entry_secure(record[:local_file], true) if ::File.exist?(record[:local_file])
rescue StandardError => e
  # StandardError (not Exception) so interrupts and exit requests still propagate.
  @logger.warn("Could not delete file", :file => record[:local_file], :error => e)
end
cleanup_s3object(record)
# File lib/logstash/inputs/s3/downloader.rb, line 42
# Deletes the object named by the record from its bucket, but only when the
# downloader was configured with :delete_on_success.
#
# Deletion is best-effort: an ordinary failure is logged as a warning and not
# re-raised.
#
# @param record [Hash] reads :bucket and :key
# @return [nil] immediately when :delete_on_success is not set
def cleanup_s3object(record)
  return unless @delete_on_success

  @factory.get_s3_client(record[:bucket]) do |s3|
    s3.delete_object(bucket: record[:bucket], key: record[:key])
  end
rescue StandardError => e
  # StandardError (not Exception) so interrupts and exit requests still propagate.
  @logger.warn("Failed to delete s3 object", :record => record, :error => e)
end
copy_s3object_to_disk(record)
# File lib/logstash/inputs/s3/downloader.rb, line 15
# Downloads the object named by the record, targeting record[:local_file], and
# stores the selected response properties on the record as :s3_data.
#
# Uses throw for control flow: :skip_delete is thrown when the download fails
# with an Aws::S3::Errors::ServiceError (after logging it), or when #stop? is
# truthy once the download has finished. The caller needs a matching catch.
#
# @param record [Hash] reads :bucket, :key and :local_file; writes :s3_data
# @return [true] when the download finished and no stop was requested
def copy_s3object_to_disk(record)
  bucket = record[:bucket]
  begin
    @factory.get_s3_client(bucket) do |s3|
      # The SDK docs warn that yielding data to a block disables retries of
      # networking errors, so the destination is passed as response_target.
      download = s3.get_object(bucket: bucket, key: record[:key], response_target: record[:local_file])
      properties = download.to_h
      record[:s3_data] = properties.keep_if { |name| @include_object_properties.include?(name) }
    end
  rescue Aws::S3::Errors::ServiceError => e
    @logger.error("Unable to download file. Requeuing the message", :error => e, :record => record)
    # Leaving via throw prevents the SQS message from being deleted.
    throw :skip_delete
  end
  throw :skip_delete if stop?
  true
end
stop?()
# File lib/logstash/inputs/s3/downloader.rb, line 53
# Reports whether a stop has been requested by asking the stop semaphore that
# was handed to the constructor.
#
# @return [Object] whatever @stopped.value returns; used as a boolean by
#   #copy_s3object_to_disk (presumably an atomic boolean - confirm against the
#   code that builds the semaphore)
def stop?
  @stopped.value
end