class Fluent::Plugin::TreasureDataLogOutput

Constants

IMPORT_RECORDS_LIMIT
IMPORT_SIZE_LIMIT
UPLOAD_EXT

Public Class Methods

new()
Calls superclass method
# File lib/fluent/plugin/out_tdlog.rb, line 44
def initialize
  super
  @key = nil
  @key_num_limit = 512
  @record_size_limit = 32 * 1024 * 1024
  @table_list = {}
  @empty_gz_data = TreasureData::API.create_empty_gz_data
  @user_agent = "fluent-plugin-td: #{TreasureDataPlugin::VERSION}".freeze
end

Public Instance Methods

check_table_exists(key)
# File lib/fluent/plugin/out_tdlog.rb, line 237
def check_table_exists(key)
  unless @table_list.has_key?(key)
    database, table = key.split('.', 2)
    log.debug "checking whether table '#{database}.#{table}' exists on Treasure Data"
    io = StringIO.new(@empty_gz_data)
    begin
      @client.import(database, table, UPLOAD_EXT, io, io.size)
      @table_list[key] = true
    rescue TreasureData::NotFoundError
      raise "Table #{key.inspect} does not exist on Treasure Data. Use 'td table:create #{database} #{table}' to create it."
    rescue => e
      log.warn "failed to check existence of '#{database}.#{table}' table on Treasure Data", :error => e.inspect
      log.debug_backtrace e.backtrace
    end
  end
end
configure(conf)
Calls superclass method
# File lib/fluent/plugin/out_tdlog.rb, line 54
def configure(conf)
  compat_parameters_convert(conf, :buffer, default_chunk_key: 'tag')

  super

  if @use_gzip_command
    require 'open3'

    begin
      Open3.capture3("gzip -V")
    rescue Errno::ENOENT
      raise ConfigError, "'gzip' utility must be in PATH for use_gzip_command parameter"
    end
  end

  FileUtils.mkdir_p(@tmpdir) if @tmpdir

  if @database && @table
    validate_database_and_table_name(@database, @table)
    @key = "#{@database}.#{@table}"
  else
    unless @chunk_key_tag
      raise Fluent::ConfigError, "'tag' must be included in <buffer ARG> when database and table are not specified"
    end
  end
end
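
configure accepts either a fixed destination (database plus table) or a tag-keyed buffer from which the destination is derived per chunk. A minimal configuration sketch, assuming the plugin is registered as 'tdlog'; the apikey, names, and buffer path are placeholders, not values taken from this source:

# Fixed destination: database/table given explicitly, so no tag chunk key is needed.
<match td.fixed>
  @type tdlog
  apikey YOUR_TD_APIKEY
  database my_db
  table my_table
</match>

# Tag-derived destination: 'tag' must be a chunk key in <buffer>.
<match td.*.*>
  @type tdlog
  apikey YOUR_TD_APIKEY
  auto_create_table true
  <buffer tag>
    @type file
    path /var/log/fluentd/buffer/td
  </buffer>
</match>
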
ensure_database_and_table(database, table)
# File lib/fluent/plugin/out_tdlog.rb, line 267
def ensure_database_and_table(database, table)
  log.info "Creating table #{database}.#{table} on TreasureData"
  begin
    @api_client.create_log_table(database, table)
  rescue TreasureData::NotFoundError
    @api_client.create_database(database)
    @api_client.create_log_table(database, table)
  rescue TreasureData::AlreadyExistsError
    # ignored
  end
end
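
The rescue order matters here: a missing database also surfaces as TreasureData::NotFoundError from create_log_table, so the database is created and the table creation is attempted again, while TreasureData::AlreadyExistsError from a concurrent creation is ignored. A rough call sequence with hypothetical names (api_client stands in for the @api_client built in start):

api_client.create_log_table('prod', 'access')   # raises TreasureData::NotFoundError: no such database
api_client.create_database('prod')               # create the missing database first
api_client.create_log_table('prod', 'access')   # second attempt succeeds (AlreadyExistsError is ignored)
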
format(tag, time, record)
# File lib/fluent/plugin/out_tdlog.rb, line 107
def format(tag, time, record)
  begin
    record['time'] = time.to_i
    record.delete(:time) if record.has_key?(:time)

    if record.size > @key_num_limit
      router.emit_error_event(tag, time, record, RuntimeError.new("too many number of keys (#{record.size} keys)"))
      return nil
    end
  rescue => e
    router.emit_error_event(tag, time, {'record' => record}, RuntimeError.new("skipped a broken record: #{e}"))
    return nil
  end

  begin
    result = record.to_msgpack
  rescue RangeError
    result = TreasureData::API.normalized_msgpack(record)
  rescue => e
    router.emit_error_event(tag, time, {'record' => record}, RuntimeError.new("can't convert record to msgpack: #{e}"))
    return nil
  end

  if result.bytesize > @record_size_limit
    # Don't raise error. Large size is not critical for streaming import
    log.warn "Size of a record too large (#{result.bytesize} bytes): #{summarize_record(record)}"
  end

  result
end
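
A minimal sketch of what format buffers for one event, assuming a plain Hash record and the msgpack gem:

require 'msgpack'

record = { 'host' => 'web01', 'status' => 200 }   # hypothetical event
record['time'] = Time.now.to_i                    # the 'time' column is always overwritten with the event time
packed = record.to_msgpack                        # this binary string is what the chunk stores
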
formatted_to_msgpack_binary()
# File lib/fluent/plugin/out_tdlog.rb, line 103
def formatted_to_msgpack_binary
  true
end
gzip_by_command(chunk, tmp)
# File lib/fluent/plugin/out_tdlog.rb, line 173
def gzip_by_command(chunk, tmp)
  chunk_is_file = @buffer_config['@type'] == 'file'
  path = if chunk_is_file
           chunk.path
         else
           w = Tempfile.new("gzip-tdlog-#{chunk.metadata.tag}-", @tmpdir)
           w.binmode
           chunk.write_to(w)
           w.close
           w.path
         end
  res = system "gzip -c #{path} > #{tmp.path}"
  unless res
    log.warn "failed to execute gzip command. Fallback to GzipWriter. status = #{$?}"
    tmp.truncate(0)
    return gzip_by_writer(chunk, tmp)
  end
  File.size(tmp.path)
ensure
  unless chunk_is_file
    w.close(true) rescue nil
  end
end
gzip_by_writer(chunk, tmp)
# File lib/fluent/plugin/out_tdlog.rb, line 197
def gzip_by_writer(chunk, tmp)
  w = Zlib::GzipWriter.new(tmp)
  chunk.write_to(w)
  w.finish
  w = nil
  tmp.pos
ensure
  if w
    w.close rescue nil
  end
end
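
The same Zlib::GzipWriter pattern in isolation, using only the standard library; #finish flushes the gzip trailer without closing the underlying Tempfile, so tmp.pos afterwards is the compressed size that gzip_by_writer returns:

require 'zlib'
require 'tempfile'

tmp = Tempfile.new('gz-sketch')
tmp.binmode
gz = Zlib::GzipWriter.new(tmp)
gz.write('raw msgpack-formatted bytes')   # stands in for chunk.write_to(gz)
gz.finish                                 # flush the trailer, keep tmp open
compressed_size = tmp.pos
tmp.close(true)
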
multi_workers_ready?()
# File lib/fluent/plugin/out_tdlog.rb, line 99
def multi_workers_ready?
  true
end
start()
Calls superclass method
# File lib/fluent/plugin/out_tdlog.rb, line 81
def start
  super

  client_opts = {
    :ssl => @use_ssl, :http_proxy => @http_proxy, :user_agent => @user_agent,
    :connect_timeout => @connect_timeout, :read_timeout => @read_timeout, :send_timeout => @send_timeout
  }
  @client = TreasureData::Client.new(@apikey, client_opts.merge({:endpoint => @import_endpoint}))
  @api_client = TreasureData::Client.new(@apikey, client_opts.merge({:endpoint => @api_endpoint}))
  if @key
    if @auto_create_table
      ensure_database_and_table(@database, @table)
    else
      check_table_exists(@key)
    end
  end
end
summarize_record(record)
# File lib/fluent/plugin/out_tdlog.rb, line 138
def summarize_record(record)
  json = Yajl.dump(record)
  if json.size > 100
    json[0..97] + "..."
  else
    json
  end
end
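
For example, assuming the yajl-ruby gem is available, a long record is reduced to the first 98 characters of its JSON form plus an ellipsis:

require 'yajl'

json = Yajl.dump({ 'msg' => 'x' * 200 })   # hypothetical oversized record
json.size > 100 ? json[0..97] + '...' : json
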
upload(database, table, io, size, unique_id)
# File lib/fluent/plugin/out_tdlog.rb, line 209
def upload(database, table, io, size, unique_id)
  unique_str = unique_id.unpack('C*').map { |x| "%02x" % x }.join
  log.trace { "uploading logs to Treasure Data database=#{database} table=#{table} (#{size}bytes)" }

  start = Time.now
  begin
    begin
      @client.import(database, table, UPLOAD_EXT, io, size, unique_str)
    rescue TreasureData::NotFoundError
      unless @auto_create_table
        raise
      end
      ensure_database_and_table(database, table)
      io.pos = 0
      retry
    end
  rescue TreasureData::TooManyRequestsError
    raise
  rescue TreasureData::ClientError => e
    raise Fluent::UnrecoverableError.new(e.message)
  rescue => e
    elapsed = Time.now - start
    ne = RuntimeError.new("Failed to upload to Treasure Data '#{database}.#{table}' table: #{e.inspect} (#{size} bytes; #{elapsed} seconds)")
    ne.set_backtrace(e.backtrace)
    raise ne
  end
end
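
unique_str is the chunk's binary unique_id rendered as lowercase hex. A small illustration of the encoding, using a hypothetical (shortened) id:

unique_id = "\x01\xab\xff".b
unique_id.unpack('C*').map { |x| "%02x" % x }.join   # => "01abff"
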
validate_database_and_table_name(database, table)
# File lib/fluent/plugin/out_tdlog.rb, line 254
def validate_database_and_table_name(database, table)
  begin
    TreasureData::API.validate_database_name(database)
  rescue => e
    raise ConfigError, "Invalid database name #{database.inspect}: #{e}"
  end
  begin
    TreasureData::API.validate_table_name(table)
  rescue => e
    raise ConfigError, "Invalid table name #{table.inspect}: #{e}"
  end
end
write(chunk)
# File lib/fluent/plugin/out_tdlog.rb, line 147
def write(chunk)
  unique_id = chunk.unique_id

  if @key
    database, table = @database, @table
  else
    database, table = chunk.metadata.tag.split('.')[-2, 2]
    database = TreasureData::API.normalize_database_name(database)
    table = TreasureData::API.normalize_table_name(table)
  end

  FileUtils.mkdir_p(@tmpdir) unless @tmpdir.nil?
  f = Tempfile.new("tdlog-#{chunk.metadata.tag}-", @tmpdir)
  f.binmode

  size = if @use_gzip_command
           gzip_by_command(chunk, f)
         else
           gzip_by_writer(chunk, f)
         end
  f.pos = 0
  upload(database, table, f, size, unique_id)
ensure
  f.close(true) if f
end
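
When no fixed database/table pair is configured, the destination is taken from the last two tag components and normalized before upload. A sketch with a hypothetical tag:

tag = 'td.production.access_log'
database, table = tag.split('.')[-2, 2]   # => ["production", "access_log"]
# Both names then go through TreasureData::API.normalize_database_name / normalize_table_name.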