class LogStash::Outputs::LogstashAzureBlobOutput::Uploader

a sub class of LogstashAzureBlobOutput this class uploads the files to Azure cloud

Constants

DEFAULT_THREADPOOL
TIME_BEFORE_RETRYING_SECONDS

Attributes

blob_account[RW]
container_name[RW]
logger[RW]

Public Class Methods

new(blob_account, container_name, logger, threadpool = DEFAULT_THREADPOOL) click to toggle source

Initializes the class @param blob_account [Object] endpoint to azure gem @param container_name [String] name of the container in azure blob, at this point, if it doesn't exist, it was already created

# File lib/logstash/outputs/blob/uploader.rb, line 21
def initialize(blob_account, container_name, logger, threadpool = DEFAULT_THREADPOOL)
  @blob_account = blob_account
  @workers_pool = threadpool
  @logger = logger
  @container_name = container_name
end

Public Instance Methods

stop() click to toggle source

stop threads

# File lib/logstash/outputs/blob/uploader.rb, line 67
def stop
  @workers_pool.shutdown
  @workers_pool.wait_for_termination(nil) # block until its done
end
upload(file, options = {}) click to toggle source

Uploads the file to the container

# File lib/logstash/outputs/blob/uploader.rb, line 37
def upload(file, options = {})

  begin
    content = Object::File.open(file.path, 'rb').read
    filename = Object::File.basename file.path
    puts filename
    blob = blob_account.create_block_blob(container_name, filename, content)
    puts blob.name
  rescue => e
    # When we get here it usually mean that LogstashAzureBlobOutput tried to do some retry by himself (default is 3)
    # When the retry limit is reached or another error happen we will wait and retry.
    #
    # Thread might be stuck here, but I think its better than losing anything
    # its either a transient errors or something bad really happened.
    logger.error('Uploading failed, retrying', exception: e.class, message: e.message, path: file.path, backtrace: e.backtrace)
    retry
  end

  options[:on_complete].call(file) unless options[:on_complete].nil?
  blob
rescue => e
  logger.error('An error occured in the `on_complete` uploader',
               exception: e.class,
               message: e.message,
               path: file.path,
               backtrace: e.backtrace)
  raise e # reraise it since we don't deal with it now
end
upload_async(file, options = {}) click to toggle source

Create threads to upload the file to the container

# File lib/logstash/outputs/blob/uploader.rb, line 29
def upload_async(file, options = {})
  @workers_pool.post do
    LogStash::Util.set_thread_name("LogstashAzureBlobOutput output uploader, file: #{file.path}")
    upload(file, options)
  end
end