class Embulk::Plugin::Vertica::OutputVertica

Public Class Methods

connect(task) { |jv| ... } click to toggle source
# File lib/embulk/output_vertica.rb, line 53
def self.connect(task)
  jv = ::Vertica.connect({
    host: task['host'],
    port: task['port'],
    user: task['username'],
    password: task['password'],
    database: task['database'],
  })

  if block_given?
    begin
      yield jv
    ensure
      jv.close
    end
  end
  jv
end
new(task, schema, index) click to toggle source
Calls superclass method
# File lib/embulk/output_vertica.rb, line 89
def initialize(task, schema, index)
  super
  @jv = self.class.connect(task)
end
to_sql_type(type) click to toggle source
# File lib/embulk/output_vertica.rb, line 78
def self.to_sql_type(type)
  case type
    when :boolean then 'BOOLEAN'
    when :long then 'INT'
    when :double then 'FLOAT'
    when :string then 'VARCHAR'
    when :timestamp then 'TIMESTAMP'
    else fail NotSupportedSchema, "embulk-plugin-output-vertica cannot take column type #{type}"
  end
end
to_vertica_schema(schema) click to toggle source
# File lib/embulk/output_vertica.rb, line 72
def self.to_vertica_schema(schema)
    schema.names.zip(schema.types)
    .map { |name, type| "#{name} #{to_sql_type(type)}" }
    .join(',')
end
transaction(config, schema, processor_count) { |task| ... } click to toggle source
# File lib/embulk/output_vertica.rb, line 11
def self.transaction(config, schema, processor_count, &control)
  task = {
    'host' => config.param('host', :string, :default => 'localhost'),
    'port' => config.param('port', :integer, :default => 5433),
    'username' => config.param('username', :string),
    'password' => config.param('password', :string, :default => ''),
    'database' => config.param('database', :string, :default => 'vdb'),
    'schema' => config.param('schema', :string, :default => 'public'),
    'table' => config.param('table', :string),
  }

  now = Time.now
  unique_name = "%08x%08x" % [now.tv_sec, now.tv_nsec]
  task['temp_table'] = "#{task['table']}_LOAD_TEMP_#{unique_name}"

  sql_schema = self.to_vertica_schema schema

  connect(task) do |jv|
    # drop table if exists "DEST"
    # 'create table if exists "TEMP" ("COL" json)'
    jv.query %[drop table if exists #{task['schema']}.#{task['temp_table']}]
    jv.query %[create table #{task['schema']}.#{task['temp_table']} (#{sql_schema})]
  end

  begin
    yield(task)
    connect(task) do |jv|
      # create table if not exists "DEST" ("COL" json)
      # 'insert into "DEST" ("COL") select "COL" from "TEMP"'
      jv.query %[create table if not exists #{task['schema']}.#{task['table']} (#{sql_schema})]
      jv.query %[insert into #{task['schema']}.#{task['table']} select * from #{task['schema']}.#{task['temp_table']}]
      jv.query %[COMMIT]
    end
  ensure
    connect(task) do |jv|
      # 'drop table if exists TEMP'
      jv.query %[drop table if exists #{task['schema']}.#{task['temp_table']}]
    end
  end
  return {}
end

Public Instance Methods

abort() click to toggle source
# File lib/embulk/output_vertica.rb, line 111
def abort
end
add(page) click to toggle source
# File lib/embulk/output_vertica.rb, line 98
def add(page)
  sql = "COPY #{@task['schema']}.#{@task['temp_table']} FROM STDIN DELIMITER ','"
  @jv.copy(sql) do |stdin|
    page.each_with_index do |record, idx|
      stdin << record.map {|v| ::Vertica.quote(v)}.join(",")
      stdin << "\n" unless record.size-1 == idx
    end
  end
end
close() click to toggle source
# File lib/embulk/output_vertica.rb, line 94
def close
  @jv.close
end
commit() click to toggle source
# File lib/embulk/output_vertica.rb, line 114
def commit
  {}
end
finish() click to toggle source
# File lib/embulk/output_vertica.rb, line 108
def finish
end