class Fluent::Plugin::TreasureDataLogOutput
Constants
- IMPORT_RECORDS_LIMIT
- IMPORT_SIZE_LIMIT
- UPLOAD_EXT
Public Class Methods
new()
Calls superclass method
# File lib/fluent/plugin/out_tdlog.rb, line 44
def initialize
  super
  @key = nil
  @key_num_limit = 512
  @record_size_limit = 32 * 1024 * 1024
  @table_list = {}
  @empty_gz_data = TreasureData::API.create_empty_gz_data
  @user_agent = "fluent-plugin-td: #{TreasureDataPlugin::VERSION}".freeze
end
Public Instance Methods
check_table_exists(key)
# File lib/fluent/plugin/out_tdlog.rb, line 237
def check_table_exists(key)
  unless @table_list.has_key?(key)
    database, table = key.split('.', 2)
    log.debug "checking whether table '#{database}.#{table}' exists on Treasure Data"
    io = StringIO.new(@empty_gz_data)
    begin
      @client.import(database, table, UPLOAD_EXT, io, io.size)
      @table_list[key] = true
    rescue TreasureData::NotFoundError
      raise "Table #{key.inspect} does not exist on Treasure Data. Use 'td table:create #{database} #{table}' to create it."
    rescue => e
      log.warn "failed to check existence of '#{database}.#{table}' table on Treasure Data", :error => e.inspect
      log.debug_backtrace e.backtrace
    end
  end
end
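The probe works by importing a zero-record gzip payload, which either succeeds (the table exists) or raises TreasureData::NotFoundError. A standalone sketch of the same idea, assuming td-client-ruby and placeholder names; 'msgpack.gz' is assumed here to be the value of UPLOAD_EXT:

require 'td-client'
require 'stringio'

# Placeholder key; the plugin uses "#{database}.#{table}".
database, table = 'my_db.my_table'.split('.', 2)
client = TreasureData::Client.new(ENV['TD_API_KEY'])
io = StringIO.new(TreasureData::API.create_empty_gz_data)
begin
  client.import(database, table, 'msgpack.gz', io, io.size)  # zero-record import
  puts 'table exists'
rescue TreasureData::NotFoundError
  warn 'my_db.my_table does not exist'
end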
configure(conf)
Calls superclass method
# File lib/fluent/plugin/out_tdlog.rb, line 54
def configure(conf)
  compat_parameters_convert(conf, :buffer, default_chunk_key: 'tag')

  super

  if @use_gzip_command
    require 'open3'

    begin
      Open3.capture3("gzip -V")
    rescue Errno::ENOENT
      raise ConfigError, "'gzip' utility must be in PATH for use_gzip_command parameter"
    end
  end

  FileUtils.mkdir_p(@tmpdir) if @tmpdir

  if @database && @table
    validate_database_and_table_name(@database, @table)
    @key = "#{@database}.#{@table}"
  else
    unless @chunk_key_tag
      raise Fluent::ConfigError, "'tag' must be included in <buffer ARG> when database and table are not specified"
    end
  end
end
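For context, a minimal Fluentd configuration exercising this path might look like the sketch below; the apikey and buffer path are placeholders. The <buffer tag> line satisfies the @chunk_key_tag requirement when database and table are not fixed in the config:

<match td.*.*>
  @type tdlog
  apikey YOUR_TD_API_KEY               # placeholder
  auto_create_table true
  <buffer tag>
    @type file
    path /var/log/td-agent/buffer/td   # placeholder
  </buffer>
</match>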
ensure_database_and_table(database, table)
# File lib/fluent/plugin/out_tdlog.rb, line 267
def ensure_database_and_table(database, table)
  log.info "Creating table #{database}.#{table} on TreasureData"
  begin
    @api_client.create_log_table(database, table)
  rescue TreasureData::NotFoundError
    @api_client.create_database(database)
    @api_client.create_log_table(database, table)
  rescue TreasureData::AlreadyExistsError
    # ignored
  end
end
format(tag, time, record)
# File lib/fluent/plugin/out_tdlog.rb, line 107
def format(tag, time, record)
  begin
    record['time'] = time.to_i
    record.delete(:time) if record.has_key?(:time)

    if record.size > @key_num_limit
      router.emit_error_event(tag, time, record, RuntimeError.new("too many number of keys (#{record.size} keys)"))
      return nil
    end
  rescue => e
    router.emit_error_event(tag, time, {'record' => record}, RuntimeError.new("skipped a broken record: #{e}"))
    return nil
  end

  begin
    result = record.to_msgpack
  rescue RangeError
    result = TreasureData::API.normalized_msgpack(record)
  rescue => e
    router.emit_error_event(tag, time, {'record' => record}, RuntimeError.new("can't convert record to msgpack: #{e}"))
    return nil
  end

  if result.bytesize > @record_size_limit
    # Don't raise error. Large size is not critical for streaming import
    log.warn "Size of a record too large (#{result.bytesize} bytes): #{summarize_record(record)}"
  end

  result
end
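A minimal sketch of what format produces for an ordinary Hash record, assuming the msgpack gem: each event becomes one msgpack-encoded map carrying an integer 'time' key.

require 'msgpack'

record = { 'host' => 'web01', 'status' => 200 }
record['time'] = Time.now.to_i   # the plugin injects the event time
packed = record.to_msgpack
MessagePack.unpack(packed)       # => {"host"=>"web01", "status"=>200, "time"=>...}

Note the asymmetric handling of oversized records: exceeding @key_num_limit routes the record to the error stream, while exceeding @record_size_limit only logs a warning, since one large record is not fatal to streaming import.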
formatted_to_msgpack_binary()
# File lib/fluent/plugin/out_tdlog.rb, line 103
def formatted_to_msgpack_binary
  true
end
gzip_by_command(chunk, tmp)
# File lib/fluent/plugin/out_tdlog.rb, line 173
def gzip_by_command(chunk, tmp)
  chunk_is_file = @buffer_config['@type'] == 'file'
  path = if chunk_is_file
           chunk.path
         else
           w = Tempfile.new("gzip-tdlog-#{chunk.metadata.tag}-", @tmpdir)
           w.binmode
           chunk.write_to(w)
           w.close
           w.path
         end
  res = system "gzip -c #{path} > #{tmp.path}"
  unless res
    log.warn "failed to execute gzip command. Fallback to GzipWriter. status = #{$?}"
    tmp.truncate(0)
    return gzip_by_writer(chunk, tmp)
  end
  File.size(tmp.path)
ensure
  unless chunk_is_file
    w.close(true) rescue nil
  end
end
gzip_by_writer(chunk, tmp)
# File lib/fluent/plugin/out_tdlog.rb, line 197
def gzip_by_writer(chunk, tmp)
  w = Zlib::GzipWriter.new(tmp)
  chunk.write_to(w)
  w.finish
  w = nil
  tmp.pos
ensure
  if w
    w.close rescue nil
  end
end
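A standalone sketch of the GzipWriter path: Zlib::GzipWriter#finish flushes the gzip trailer without closing the underlying file, so the tempfile's position afterwards equals the compressed size, which is what gzip_by_writer returns.

require 'zlib'
require 'tempfile'

tmp = Tempfile.new('tdlog-example')
gz = Zlib::GzipWriter.new(tmp)
gz.write('...msgpack rows would go here...')
gz.finish   # flush the trailer; the Tempfile itself stays open
tmp.pos     # => compressed size in bytes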
multi_workers_ready?()
# File lib/fluent/plugin/out_tdlog.rb, line 99
def multi_workers_ready?
  true
end
start()
Calls superclass method
# File lib/fluent/plugin/out_tdlog.rb, line 81
def start
  super

  client_opts = {
    :ssl => @use_ssl, :http_proxy => @http_proxy, :user_agent => @user_agent,
    :connect_timeout => @connect_timeout, :read_timeout => @read_timeout, :send_timeout => @send_timeout
  }
  @client = TreasureData::Client.new(@apikey, client_opts.merge({:endpoint => @import_endpoint}))
  @api_client = TreasureData::Client.new(@apikey, client_opts.merge({:endpoint => @api_endpoint}))

  if @key
    if @auto_create_table
      ensure_database_and_table(@database, @table)
    else
      check_table_exists(@key)
    end
  end
end
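Two clients are built from the same options because record import and metadata calls (table creation and existence checks) go to different endpoints. A sketch with illustrative values; the endpoint strings below are assumptions, not taken from this file, and the real values come from the @import_endpoint and @api_endpoint parameters:

opts = { :ssl => true, :user_agent => 'fluent-plugin-td: x.y.z' }  # placeholder version string
import_client = TreasureData::Client.new(ENV['TD_API_KEY'], opts.merge(:endpoint => 'api-import.treasuredata.com'))
api_client    = TreasureData::Client.new(ENV['TD_API_KEY'], opts.merge(:endpoint => 'api.treasuredata.com'))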
summarize_record(record)
# File lib/fluent/plugin/out_tdlog.rb, line 138
def summarize_record(record)
  json = Yajl.dump(record)
  if json.size > 100
    json[0..97] + "..."
  else
    json
  end
end
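A quick illustration of the truncation: JSON longer than 100 characters is cut to its first 98 characters plus a trailing "...".

require 'yajl'

json = Yajl.dump({ 'message' => 'x' * 200 })
summary = json.size > 100 ? json[0..97] + '...' : json
summary.size   # => 101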
upload(database, table, io, size, unique_id)
# File lib/fluent/plugin/out_tdlog.rb, line 209
def upload(database, table, io, size, unique_id)
  unique_str = unique_id.unpack('C*').map { |x| "%02x" % x }.join
  log.trace { "uploading logs to Treasure Data database=#{database} table=#{table} (#{size}bytes)" }

  start = Time.now
  begin
    begin
      @client.import(database, table, UPLOAD_EXT, io, size, unique_str)
    rescue TreasureData::NotFoundError
      unless @auto_create_table
        raise
      end
      ensure_database_and_table(database, table)
      io.pos = 0
      retry
    end
  rescue TreasureData::TooManyRequestsError
    raise
  rescue TreasureData::ClientError => e
    raise Fluent::UnrecoverableError.new(e.message)
  rescue => e
    elapsed = Time.now - start
    ne = RuntimeError.new("Failed to upload to Treasure Data '#{database}.#{table}' table: #{e.inspect} (#{size} bytes; #{elapsed} seconds)")
    ne.set_backtrace(e.backtrace)
    raise ne
  end
end
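The unique_str is the chunk's binary unique_id rendered as lowercase hex, sent along with the import so that retried uploads of the same chunk can be deduplicated server-side. For example:

unique_id = "\x01\x02\xAB".b                          # stand-in for chunk.unique_id
unique_id.unpack('C*').map { |x| '%02x' % x }.join    # => "0102ab"
unique_id.unpack1('H*')                               # equivalent one-liner (Ruby >= 2.4)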
validate_database_and_table_name(database, table)
# File lib/fluent/plugin/out_tdlog.rb, line 254
def validate_database_and_table_name(database, table)
  begin
    TreasureData::API.validate_database_name(database)
  rescue => e
    raise ConfigError, "Invalid database name #{database.inspect}: #{e}"
  end
  begin
    TreasureData::API.validate_table_name(table)
  rescue => e
    raise ConfigError, "Invalid table name #{table.inspect}: #{e}"
  end
end
write(chunk)
# File lib/fluent/plugin/out_tdlog.rb, line 147
def write(chunk)
  unique_id = chunk.unique_id

  if @key
    database, table = @database, @table
  else
    database, table = chunk.metadata.tag.split('.')[-2, 2]
    database = TreasureData::API.normalize_database_name(database)
    table = TreasureData::API.normalize_table_name(table)
  end

  FileUtils.mkdir_p(@tmpdir) unless @tmpdir.nil?
  f = Tempfile.new("tdlog-#{chunk.metadata.tag}-", @tmpdir)
  f.binmode

  size = if @use_gzip_command
           gzip_by_command(chunk, f)
         else
           gzip_by_writer(chunk, f)
         end
  f.pos = 0
  upload(database, table, f, size, unique_id)
ensure
  f.close(true) if f
end
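When no fixed destination is configured, the last two tag components select the database and table, which are then normalized. For example, with a hypothetical tag:

tag = 'td.my_db.access_logs'
database, table = tag.split('.')[-2, 2]   # => ["my_db", "access_logs"]
# then TreasureData::API.normalize_database_name(database) and
# TreasureData::API.normalize_table_name(table) are applied, as above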