class Fluent::GoogleCloudStorageOut

Constants

SUPPORTED_COMPRESS
ServiceAccountCredentials
Storage
UNIQUE_PLACE_HOLDER
UNIQUE_STRATEGY

Public Class Methods

new() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_google_cloud_storage_out.rb, line 71
# Runs the superclass initializer, then loads the libraries this plugin
# requires. Loading happens at instantiation time rather than at file load.
def initialize
  super
  %w[zlib net/http time mime-types].each { |library| require library }
end

Public Instance Methods

chunk_unique_id_to_str(unique_id) click to toggle source
# File lib/fluent/plugin/out_google_cloud_storage_out.rb, line 134
# Renders a chunk's unique id as lowercase hexadecimal text, two digits per
# byte.
#
# @param unique_id [String] raw bytes of the id
# @return [String] hex representation; empty when unique_id is empty
def chunk_unique_id_to_str(unique_id)
  # 'H*' emits every byte as two hex digits, high nibble first.
  unique_id.unpack('H*').first
end
configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_google_cloud_storage_out.rb, line 84
# Configures the plugin from +conf+.
#
# Before handing over to the superclass, 'time_slice_format' is overwritten
# to match the finest-grained time token (%S, then %M, then %H) found in
# 'path', if any. Afterwards the formatter and storage client are built and
# the per-path counter is reset when the :increment strategy is active.
#
# @param conf [#[], #[]=] plugin configuration
# @raise [Fluent::ConfigError] when a unique strategy is set but the path
#   lacks the unique placeholder
def configure(conf)
  if (path_template = conf['path'])
    # Ordered from finest to coarsest: the first token present wins.
    _, slice_format = [
      ['%S', '%Y%m%d%H%M%S'],
      ['%M', '%Y%m%d%H%M'],
      ['%H', '%Y%m%d%H']
    ].find { |token, _| path_template.index(token) }
    conf['time_slice_format'] = slice_format if slice_format
  end

  super

  placeholder_missing = @path.index(UNIQUE_PLACE_HOLDER).nil?
  if placeholder_missing && @unique_strategy
    raise Fluent::ConfigError, "Path must contain ${unique}, or you set the unique_strategy to nil."
  end

  @formatter = Plugin.new_formatter(@format)
  @formatter.configure(conf)
  @path_suffix = ".log"
  prepare_client()

  @samepath_counter = 0 if @unique_strategy == :increment
end
format(tag, time, record) click to toggle source
# File lib/fluent/plugin/out_google_cloud_storage_out.rb, line 130
# Serializes one event by delegating to @formatter.
#
# Note: inside this class the method shadows Kernel#format, so an
# implicit-receiver call to format(...) lands here, not on the
# sprintf-style helper.
#
# @param tag the event tag, passed through to the formatter
# @param time the event time, passed through to the formatter
# @param record the event record, passed through to the formatter
# @return whatever @formatter.format returns
def format(tag, time, record)
  @formatter.format(tag, time, record)
end
path_format(chunk) click to toggle source
# File lib/fluent/plugin/out_google_cloud_storage_out.rb, line 159
# Builds the destination object path for +chunk+.
#
# The chunk key is parsed with @time_slice_format and rendered through the
# @path template, the unique-strategy substitution is applied, the suffix is
# appended unless ".log" already occurs anywhere in the path, and ".gz" is
# added when @compress is :gz.
#
# @param chunk [#key, #unique_id] buffer chunk being flushed
# @return [String, nil] the final path; nil when @compress is neither nil
#   nor :gz
def path_format(chunk)
  slice_time = Time.strptime(chunk.key, @time_slice_format)
  gcs_path = path_format_with_unique_strategy(
    slice_time.strftime(@path), @unique_strategy, chunk.key, chunk.unique_id
  )

  # Only add the suffix when the path does not already mention it.
  gcs_path.concat(@path_suffix) unless gcs_path.include?(".log")

  case @compress
  when nil then gcs_path
  when :gz then "#{gcs_path}.gz"
  end
end
path_format_with_unique_strategy(path, strategy, chunk_key, chunk_unique) click to toggle source
# File lib/fluent/plugin/out_google_cloud_storage_out.rb, line 138
# Replaces every occurrence of the unique placeholder in +path+ according to
# +strategy+.
#
# * nil        - path is returned untouched
# * :chunk_id  - hex form of +chunk_unique+
# * :increment - a counter that increases while consecutive calls share the
#                same +chunk_key+ and restarts at 0 when the key changes
# * :timestamp - the current time rendered with @unique_format
#
# @return [String, nil] the substituted path; nil for any other strategy
def path_format_with_unique_strategy(path, strategy, chunk_key, chunk_unique)
  return path if strategy.nil?

  unique_part =
    case strategy
    when :chunk_id
      chunk_unique_id_to_str(chunk_unique)
    when :increment
      # The counter is only touched once a previous key has been recorded.
      if @before_chunk_key
        @samepath_counter = @before_chunk_key == chunk_key ? @samepath_counter + 1 : 0
      end
      @before_chunk_key = chunk_key
      @samepath_counter.to_s
    when :timestamp
      Time.now.strftime(@unique_format)
    else
      return nil
    end

  path.gsub(UNIQUE_PLACE_HOLDER, unique_part)
end
prepare_client() click to toggle source
# File lib/fluent/plugin/out_google_cloud_storage_out.rb, line 111
# Creates the storage service client and authorizes it with the service
# account key stored at @service_account_json_key_path.
#
# The key file is opened in block form so its descriptor is closed as soon
# as the credentials have been built, instead of staying open until the IO
# object is garbage collected.
# NOTE(review): this relies on make_creds reading the key IO during the call
# rather than retaining it for later use - confirm against the credentials
# library in use.
#
# @return the credentials object assigned to @storage.authorization
# @raise [SystemCallError] when the key file cannot be opened
def prepare_client
  @storage = Storage::StorageService.new
  scopes = [Storage::AUTH_CLOUD_PLATFORM, Storage::AUTH_DEVSTORAGE_FULL_CONTROL]
  File.open(@service_account_json_key_path) do |key_io|
    @storage.authorization = ServiceAccountCredentials.make_creds(
      {
        :json_key_io => key_io,
        :scope => scopes
      }
    )
  end
end
send(path, data) click to toggle source
# File lib/fluent/plugin/out_google_cloud_storage_out.rb, line 180
# Uploads +data+ to @bucket_id under the object name +path+, gzip-compressing
# the payload first when @compress is set.
#
# The content type is looked up from the path's extension. When the registry
# has no entry for it the lookup yields nil, so a generic
# 'application/octet-stream' is used instead of failing on nil.
#
# Note: inside this class the method shadows Object#send; use __send__ for
# dynamic dispatch.
#
# @param path [String] destination object name
# @param data [String] payload to upload
# @return whatever @storage.insert_object returns
def send(path, data)
  mimetype = MIME::Types.type_for(path).first
  content_type = mimetype ? mimetype.content_type : 'application/octet-stream'

  io =
    if @compress
      buffer = StringIO.new(String.new)
      writer = Zlib::GzipWriter.new(buffer)
      writer.write(data)
      # finish flushes the gzip trailer but leaves the buffer open for reading.
      writer.finish
      buffer.rewind
      buffer
    else
      StringIO.new(data)
    end

  @storage.insert_object(@bucket_id, {name: path}, upload_source: io, content_type: content_type)
end
shutdown() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_google_cloud_storage_out.rb, line 126
# Shutdown hook. Adds no behavior of its own; it only invokes the
# superclass implementation.
def shutdown
  super
end
start() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_google_cloud_storage_out.rb, line 122
# Start hook. Adds no behavior of its own; it only invokes the superclass
# implementation.
def start
  super
end
write(chunk) click to toggle source
# File lib/fluent/plugin/out_google_cloud_storage_out.rb, line 196
# Flushes one buffer chunk: derives its destination path, uploads the
# chunk's contents there, and reports the path that was used.
#
# @param chunk [#read] buffer chunk to upload
# @return the path the chunk was uploaded to
def write(chunk)
  path_format(chunk).tap { |gcs_path| send(gcs_path, chunk.read) }
end