class Fluent::GoogleCloudStorageOut
Constants
- SUPPORTED_COMPRESS
- ServiceAccountCredentials
- Storage
- UNIQUE_PLACE_HOLDER
- UNIQUE_STRATEGY
Public Class Methods
new()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_google_cloud_storage_out.rb, line 71
# Runs the superclass initializer, then loads the libraries this plugin
# requires at construction time (rather than when the file is loaded).
def initialize
  super
  # Loaded in this exact order, one after another.
  %w[zlib net/http time mime-types].each { |library| require library }
end
Public Instance Methods
chunk_unique_id_to_str(unique_id)
click to toggle source
# File lib/fluent/plugin/out_google_cloud_storage_out.rb, line 134
# Renders a chunk's unique id as a lowercase hexadecimal string, two digits
# per byte.
#
# @param unique_id [String] the raw bytes of the id
# @return [String] hex representation; empty when +unique_id+ is empty
def chunk_unique_id_to_str(unique_id)
  # 'H*' emits every byte as two lowercase hex digits (high nibble first),
  # which is what the previous per-byte to_s(16).rjust(2, '0') loop built.
  unique_id.unpack('H*').first
end
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_google_cloud_storage_out.rb, line 84
# Configures the plugin from +conf+.
#
# Before delegating to the superclass, `time_slice_format` is derived from
# the finest strftime token found in `path` (%S, then %M, then %H). After
# `super`, the unique placeholder is validated, the formatter is built and
# configured, the storage client is prepared, and the same-path counter is
# reset when the :increment strategy is active.
#
# @param conf the configuration object, read and written with string keys
# @raise [Fluent::ConfigError] when a unique strategy is set but the path
#   does not contain UNIQUE_PLACE_HOLDER
def configure(conf)
  configured_path = conf['path']
  if configured_path
    # Ordered finest-first: the first token present decides the format.
    slice_formats = [
      ['%S', '%Y%m%d%H%M%S'],
      ['%M', '%Y%m%d%H%M'],
      ['%H', '%Y%m%d%H']
    ]
    matched = slice_formats.find { |token, _| configured_path.index(token) }
    conf['time_slice_format'] = matched.last if matched
  end

  super

  placeholder_missing = @path.index(UNIQUE_PLACE_HOLDER).nil?
  if placeholder_missing && @unique_strategy
    raise Fluent::ConfigError, "Path must contain ${unique}, or you set the unique_strategy to nil."
  end

  @formatter = Plugin.new_formatter(@format)
  @formatter.configure(conf)
  @path_suffix = ".log"
  prepare_client
  @samepath_counter = 0 if @unique_strategy == :increment
end
format(tag, time, record)
click to toggle source
# File lib/fluent/plugin/out_google_cloud_storage_out.rb, line 130
# Formats a single event by handing it to the formatter built in #configure.
#
# @param tag the event tag
# @param time the event time
# @param record the event record
# @return whatever the configured formatter returns for the event
def format(tag, time, record)
  @formatter.format(tag, time, record)
end
path_format(chunk)
click to toggle source
# File lib/fluent/plugin/out_google_cloud_storage_out.rb, line 159
# Builds the destination object path for +chunk+.
#
# The chunk key is parsed with @time_slice_format and used to expand the
# strftime tokens in @path; the unique placeholder is then resolved, the
# ".log" suffix is appended unless ".log" already occurs anywhere in the
# path, and ".gz" is added when @compress is :gz.
#
# @param chunk an object responding to #key and #unique_id
# @return [String, nil] the final path; nil when @compress is neither nil
#   nor :gz, because the case below has no else branch
def path_format(chunk)
  # Expand the time tokens from the chunk key.
  slice_time = Time.strptime(chunk.key, @time_slice_format)
  expanded = slice_time.strftime(@path)

  # Resolve the unique placeholder.
  expanded = path_format_with_unique_strategy(expanded, @unique_strategy, chunk.key, chunk.unique_id)

  # Append the log suffix only when ".log" is not already present.
  expanded.concat(@path_suffix) unless expanded.include?(".log")

  # Append the compression suffix.
  case @compress
  when nil then expanded
  when :gz then "#{expanded}.gz"
  end
end
path_format_with_unique_strategy(path, strategy, chunk_key, chunk_unique)
click to toggle source
# File lib/fluent/plugin/out_google_cloud_storage_out.rb, line 138
# Replaces UNIQUE_PLACE_HOLDER in +path+ according to +strategy+.
#
# @param path [String] the path, with time tokens already expanded
# @param strategy [Symbol, nil] nil, :chunk_id, :increment or :timestamp
# @param chunk_key the key of the chunk being written
# @param chunk_unique the chunk's unique id (used by :chunk_id)
# @return [String, nil] the path with the placeholder substituted; +path+
#   itself for a nil strategy; nil for any strategy not listed here
def path_format_with_unique_strategy(path, strategy, chunk_key, chunk_unique)
  case strategy
  when nil
    path
  when :chunk_id
    path.gsub(UNIQUE_PLACE_HOLDER, chunk_unique_id_to_str(chunk_unique))
  when :increment
    # The counter only changes once a previous key has been recorded: it
    # advances while the key repeats and restarts at zero when it changes.
    if @before_chunk_key
      @samepath_counter = @before_chunk_key == chunk_key ? @samepath_counter + 1 : 0
    end
    @before_chunk_key = chunk_key
    path.gsub(UNIQUE_PLACE_HOLDER, @samepath_counter.to_s)
  when :timestamp
    stamp = Time.now.strftime(@unique_format)
    path.gsub(UNIQUE_PLACE_HOLDER, stamp)
  end
end
prepare_client()
click to toggle source
# File lib/fluent/plugin/out_google_cloud_storage_out.rb, line 111
# Creates the storage service client and authorises it with the service
# account key found at @service_account_json_key_path.
#
# The key file is opened with the block form of File.open so the handle is
# closed as soon as the credentials have been built; previously the handle
# was opened inline and never closed.
# NOTE(review): this relies on make_creds reading the key IO before it
# returns — confirm against the googleauth version in use.
#
# @return the credentials object assigned to @storage.authorization
def prepare_client
  @storage = Storage::StorageService.new
  scopes = [Storage::AUTH_CLOUD_PLATFORM, Storage::AUTH_DEVSTORAGE_FULL_CONTROL]
  @storage.authorization = File.open(@service_account_json_key_path) do |key_io|
    ServiceAccountCredentials.make_creds(
      {
        :json_key_io => key_io,
        :scope => scopes
      }
    )
  end
end
send(path, data)
click to toggle source
# File lib/fluent/plugin/out_google_cloud_storage_out.rb, line 180
# Uploads +data+ to @bucket_id under the object name +path+, gzip-compressing
# it first when @compress is set.
#
# NOTE(review): this method shadows Object#send on the plugin instance; code
# needing dynamic dispatch on this object must use __send__ or public_send.
#
# @param path [String] destination object name; also used to guess the
#   content type from its extension
# @param data [String] the payload to upload
# @return whatever @storage.insert_object returns
def send(path, data)
  # The lookup can yield no match for an unrecognised extension; fall back
  # to a generic binary type instead of calling content_type on nil.
  mimetype = MIME::Types.type_for(path).first
  content_type = mimetype ? mimetype.content_type : 'application/octet-stream'

  io =
    if @compress
      buffer = StringIO.new("")
      writer = Zlib::GzipWriter.new(buffer)
      writer.write(data)
      # finish completes the gzip stream without closing the buffer.
      writer.finish
      buffer.rewind
      buffer
    else
      StringIO.new(data)
    end

  @storage.insert_object(@bucket_id, { name: path }, upload_source: io, content_type: content_type)
end
shutdown()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_google_cloud_storage_out.rb, line 126
# Shutdown hook; adds nothing of its own and defers to the superclass.
def shutdown
  super
end
start()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_google_cloud_storage_out.rb, line 122
# Start hook; adds nothing of its own and defers to the superclass.
def start
  super
end
write(chunk)
click to toggle source
# File lib/fluent/plugin/out_google_cloud_storage_out.rb, line 196
# Uploads the contents of +chunk+ to the path computed for it.
#
# @param chunk an object accepted by #path_format that also responds to #read
# @return the object path the chunk was uploaded to
def write(chunk)
  # The path is computed first, then the chunk is read and uploaded; tap
  # hands the path back as the return value.
  path_format(chunk).tap { |destination| send(destination, chunk.read) }
end