class Embulk::Output::Influxdb

# Public Class Methods

# replaced_measurements()

# Resumes a transaction by yielding the given task hash back to the caller.
#
# @param task [Hash] the task hash to resume
# @param schema [Object] input schema (not used here)
# @param count [Integer] task count (not used here)
# @yieldparam task [Hash] the same task hash
# @return [Hash] the next config diff (always empty)
def self.resume(task, schema, count, &control)
  # The task reports returned by the block are not needed, so they are not kept.
  yield(task)
  {}
end

# File lib/embulk/output/influxdb.rb, line 56
# Class-wide registry of measurement names recorded by
# drop_measurement_if_exist while running in replace mode. It is shared by
# every instance of this class and created lazily on first access.
#
# @return [Hash] series name => true
def self.replaced_measurements
  return @replaced_measurements if @replaced_measurements

  @replaced_measurements = {}
end
# transaction(config, schema, count) { |task| ... }
# File lib/embulk/output/influxdb.rb, line 10
# Reads the plugin configuration into a task hash and yields it to the caller.
#
# Every entry of the task hash is later forwarded, with symbolized keys, as an
# option to InfluxDB::Client.new in #init. Keys that mirror client options must
# therefore use the client's option names.
#
# @param config [Object] config source, read through +config.param+
# @param schema [Object] input schema (not used here)
# @param count [Integer] task count (not used here)
# @yieldparam task [Hash] the task hash built below
# @return [Hash] the next config diff (always empty)
def self.transaction(config, schema, count, &control)
  # configuration code:
  task = {
    "host" => config.param("host", :string, default: nil),
    "hosts" => config.param("hosts", :array, default: nil),
    "port" => config.param("port", :integer, default: 8086),
    "username" => config.param("username", :string, default: "root"),
    "password" => config.param("password", :string, default: "root"),
    "database" => config.param("database", :string),
    "series" => config.param("series", :string, default: nil),
    "series_per_column" => config.param("series_per_column", :bool, default: false),
    "timestamp_column" => config.param("timestamp_column", :string, default: nil),
    "ignore_columns" => config.param("ignore_columns", :array, default: []),
    "tag_columns" => config.param("tag_columns", :array, default: []),
    "default_timezone" => config.param("default_timezone", :string, default: "UTC"),
    "mode" => config.param("mode", :string, default: "insert"),
    "use_ssl" => config.param("use_ssl", :bool, default: false),
    # Stored under the client's option name (verify_ssl). Under the old
    # "verify" key the flag was not picked up by the client.
    # NOTE(review): confirm against the influxdb-ruby version in use.
    "verify_ssl" => config.param("verify_ssl", :bool, default: true),
    "ssl_ca_cert" => config.param("ssl_ca_cert", :string, default: nil),
    "time_precision" => config.param("time_precision", :string, default: "s"),
    "initial_delay" => config.param("initial_delay", :float, default: 0.01),
    "max_delay" => config.param("max_delay", :float, default: 30),
    "open_timeout" => config.param("open_timeout", :integer, default: 5),
    "read_timeout" => config.param("read_timeout", :integer, default: 300),
    "async" => config.param("async", :bool, default: false),
    "udp" => config.param("udp", :bool, default: false),
    "retry" => config.param("retry", :integer, default: nil),
    "denormalize" => config.param("denormalize", :bool, default: true),
  }

  # resumable output:
  # resume(task, schema, count, &control)

  # non-resumable output: the task reports returned by the block are not used.
  yield(task)
  {}
end

# Public Instance Methods

# abort()
# File lib/embulk/output/influxdb.rb, line 100
# Intentionally a no-op. Nothing is rolled back, so points already sent by
# #add remain written.
def abort
  nil
end
# add(page)
# File lib/embulk/output/influxdb.rb, line 88
# Converts one page of records into InfluxDB points and writes them.
#
# Uses build_payload when a series name is configured, and
# build_payload_per_column (one measurement per column) otherwise.
#
# @param page [Enumerable] records of the current page
# @return whatever @connection.write_points returns
def add(page)
  points =
    if @series
      build_payload(page)
    else
      build_payload_per_column(page)
    end

  log = Embulk.logger
  log.info { "embulk-output-influxdb: Writing to #{@database}" }
  log.debug { "embulk-output-influxdb: #{points}" }

  @connection.write_points(points, @time_precision)
end
# close()
# File lib/embulk/output/influxdb.rb, line 85
# Intentionally a no-op. The InfluxDB client created in #init is not closed
# explicitly here.
# NOTE(review): confirm the client needs no explicit shutdown (e.g. when the
# "async" option is enabled).
def close
  nil
end
# commit()
# File lib/embulk/output/influxdb.rb, line 103
# Returns the task report for this output task. No statistics are collected,
# so the report is always empty.
#
# @return [Hash] an empty task report
def commit
  {}
end
# finish()
# File lib/embulk/output/influxdb.rb, line 97
# Intentionally a no-op. #add writes each page as it arrives, so nothing is
# left buffered here.
def finish
  nil
end
# init()
# File lib/embulk/output/influxdb.rb, line 60
# Per-task initialization. Does four things:
# - copies the task options into instance variables;
# - validates the series options;
# - opens the InfluxDB client;
# - creates the database when it does not exist yet.
#
# @raise [RuntimeError] in any of these cases:
#   - neither series nor series_per_column is set;
#   - tag_columns is used without series;
#   - timestamp_column names a column that is not in the schema.
def init
  # initialization code:
  task["hosts"] ||= Array(task["host"] || "localhost")
  @database = task["database"]
  @series = task["series"]
  @series_per_column = task["series_per_column"]
  @tag_columns = task["tag_columns"]
  unless @series
    raise "Need series or series_per_column parameter" unless @series_per_column
    raise "Need series parameter when you specify tag_columns" unless @tag_columns.empty?
  end
  if (timestamp_name = task["timestamp_column"])
    @timestamp_column = schema.find { |col| col.name == timestamp_name }
    # Fail fast. Otherwise @timestamp_column stays nil and the misconfiguration
    # only shows up later, while pages are being converted.
    raise "timestamp_column '#{timestamp_name}' is not found in schema" unless @timestamp_column
  end
  @ignore_columns = task["ignore_columns"]
  @time_precision = task["time_precision"]
  @replace = task["mode"].downcase == "replace"
  @default_timezone = task["default_timezone"]

  # Every task entry is forwarded to the client as a symbol-keyed option.
  @connection = InfluxDB::Client.new(@database, task.map { |k, v| [k.to_sym, v] }.to_h)
  create_database_if_not_exist
end

# Private Instance Methods

# build_payload(page)
# File lib/embulk/output/influxdb.rb, line 110
# Builds one point per record for the measurement named by @series. Any
# ${column} placeholders in the name are resolved against the record.
#
# Target columns listed in tag_columns become tags; the other target columns
# become fields. Time values pass through convert_timezone. When a timestamp
# column is configured, its value becomes the point timestamp via unixtime.
#
# @param page [Enumerable] records of the current page
# @return [Array<Hash>] points in the shape handed to write_points
def build_payload(page)
  # The column lists depend only on the schema and task options. Compute them
  # once per page rather than once per record (each is a schema scan).
  value_columns = target_value_columns
  tag_cols = target_tag_columns

  page.map do |record|
    series = resolve_placeholder(record, @series)
    drop_measurement_if_exist(series)
    payload = {
      series: series,
      values: value_columns.map { |col| [col.name, convert_timezone(record[col.index])] }.to_h,
      tags: tag_cols.map { |col| [col.name, convert_timezone(record[col.index])] }.to_h,
    }
    payload[:timestamp] = unixtime(convert_timezone(record[@timestamp_column.index])) if @timestamp_column
    payload
  end
end
# build_payload_per_column(page)
# File lib/embulk/output/influxdb.rb, line 128
# Builds one point per (record, target column) pair. Each column is written
# to a measurement named after the column, with the cell in a field called
# :value.
#
# When a timestamp column is configured, every point from the same record gets
# that record's timestamp via unixtime.
#
# @param page [Enumerable] records of the current page
# @return [Array<Hash>] points in the shape handed to write_points
def build_payload_per_column(page)
  # The target columns depend only on the schema and task options: compute once.
  columns = target_columns
  # No target columns means no points (and no measurement drops).
  return [] if columns.empty?

  page.flat_map do |record|
    # Shared by every point built from this record, so convert it only once.
    timestamp = unixtime(convert_timezone(record[@timestamp_column.index])) if @timestamp_column
    columns.map do |col|
      series = col.name
      drop_measurement_if_exist(series)
      payload = {
        series: series,
        values: { value: record[col.index] },
      }
      payload[:timestamp] = timestamp if @timestamp_column
      payload
    end
  end
end
# convert_timezone(value)
# File lib/embulk/output/influxdb.rb, line 189
# Converts Time values into @default_timezone via Timezone::Zone#time. Any
# other value (including nil) is returned unchanged.
#
# @param value [Object] a cell value from a record
# @return [Object] the converted Time, or +value+ itself
def convert_timezone(value)
  return value unless value.is_a?(Time)

  # Build the zone once per instance instead of once per Time value.
  # @default_timezone is only assigned in #init.
  @timezone ||= Timezone::Zone.new(zone: @default_timezone)
  @timezone.time(value)
end
# create_database_if_not_exist()
# File lib/embulk/output/influxdb.rb, line 158
# Creates @database on the server unless list_databases already reports a
# database with that name.
#
# @return [nil, Object] nil when the database exists, otherwise the result
#   of create_database
def create_database_if_not_exist
  existing_names = @connection.list_databases.map { |entry| entry["name"] }
  return if existing_names.include?(@database)

  @connection.create_database(@database)
end
# drop_measurement_if_exist(series)
# File lib/embulk/output/influxdb.rb, line 143
# In replace mode, drops +series+ from the database the first time this class
# sees it, but only if the measurement already exists at that moment.
#
# Every series is recorded in the class-wide replaced_measurements registry
# on its first check, whether or not it existed. Without that, a series that
# did not exist yet would be found, and dropped, on the next page after this
# run had written to it. It also avoids re-running SHOW MEASUREMENTS for every
# record.
#
# @param series [String] measurement name
def drop_measurement_if_exist(series)
  return unless @replace

  registry = self.class.replaced_measurements
  return unless registry[series].nil?

  registry[series] = true
  return unless find_measurement(series)

  Embulk.logger.info { "embulk-output-influxdb: Drop measurement #{series} from #{@database}" }
  # The name may come from record data (resolved placeholders) or column names.
  # Send it as a double-quoted InfluxQL identifier with backslashes and quotes
  # escaped, so names with spaces, dots or dashes cannot break the statement.
  escaped = series.to_s.gsub(/["\\]/) { |char| "\\#{char}" }
  @connection.query(%(DROP MEASUREMENT "#{escaped}"))
end
# find_measurement(series)
# File lib/embulk/output/influxdb.rb, line 151
# Looks +series+ up in the result of SHOW MEASUREMENTS.
#
# @param series [String] measurement name
# @return [Hash, nil] the matching entry of the first result's "values", or
#   nil when the query returns no result or no entry matches
def find_measurement(series)
  listing = @connection.query("SHOW MEASUREMENTS").first
  return nil unless listing

  listing["values"].find { |entry| entry["name"] == series }
end
# resolve_placeholder(record, series)
# File lib/embulk/output/influxdb.rb, line 164
# Replaces each ${column} placeholder in +series+ with that column's value in
# +record+. Values are stringified by gsub, so nil becomes "".
#
# @param record [Array] one record, indexed by schema position
# @param series [String] measurement name template
# @return [String] the resolved measurement name
# @raise [RuntimeError] when a placeholder names a column missing from the
#   schema (previously this surfaced as an opaque TypeError from record[nil])
def resolve_placeholder(record, series)
  series.gsub(/\$\{(.*?)\}/) do
    column_name = Regexp.last_match(1)
    index = schema.index { |col| col.name == column_name }
    raise "Unknown column in series placeholder: ${#{column_name}}" unless index

    record[index]
  end
end
# target_columns()
# File lib/embulk/output/influxdb.rb, line 171
# Schema columns that carry point data: every column except the timestamp
# column (when one is configured) and those listed in ignore_columns.
#
# @return [Array] the remaining schema columns, in schema order
def target_columns
  # @timestamp_column is nil when no timestamp_column is configured, so it
  # must not be dereferenced unconditionally.
  timestamp_name = @timestamp_column && @timestamp_column.name
  schema.reject do |col|
    (timestamp_name && col.name == timestamp_name) || @ignore_columns.include?(col.name)
  end
end
# target_tag_columns()
# File lib/embulk/output/influxdb.rb, line 183
# Target columns whose names appear in tag_columns. They are written as
# point tags.
#
# @return [Array] matching columns, in target_columns order
def target_tag_columns
  tag_names = @tag_columns
  target_columns.select { |column| tag_names.include?(column.name) }
end
# target_value_columns()
# File lib/embulk/output/influxdb.rb, line 177
# Target columns that are not listed in tag_columns. They are written as
# point fields.
#
# @return [Array] remaining columns, in target_columns order
def target_value_columns
  _tag_cols, value_cols = target_columns.partition { |column| @tag_columns.include?(column.name) }
  value_cols
end
# unixtime(time)
# File lib/embulk/output/influxdb.rb, line 196
# Converts +time+ to an integer epoch timestamp in the unit named by
# @time_precision. That is the same precision passed to write_points.
#
# Supported units: "n" (ns), "u" (us), "ms", "s", "m" (minutes), "h" (hours).
# Any other value falls back to seconds, as before. Previously every unit
# except "u" produced seconds, which is wrong by orders of magnitude for "ms"
# and "n".
#
# Integer arithmetic is used throughout. Time#to_i floors and the sub-second
# part is non-negative, so pre-epoch times come out right too. The former
# "%d%06d" string trick did not.
#
# @param time [Time]
# @return [Integer]
def unixtime(time)
  seconds = time.to_i
  case @time_precision
  when 'n'  then seconds * 1_000_000_000 + time.nsec
  when 'u'  then seconds * 1_000_000 + time.usec
  when 'ms' then seconds * 1_000 + time.usec / 1_000
  when 'm'  then seconds / 60
  when 'h'  then seconds / 3_600
  else seconds
  end
end