class Embulk::Output::Influxdb08

Public Class Methods

replaced_series()

# Hands +task+ to the given block and reports an empty config diff.
#
# +schema+ and +count+ are accepted but not read, and the value returned by
# the block is discarded.
#
# @return [Hash] always an empty hash
def self.resume(task, schema, count, &control)
  yield(task)
  {}
end

# File lib/embulk/output/influxdb08.rb, line 55
# Class-level registry hash, created on first access and reused afterwards.
# Instances use it (keyed by series name) to remember which series they
# have already handled in "replace" mode.
#
# @return [Hash]
def self.replaced_series
  @replaced_series = {} unless @replaced_series
  @replaced_series
end
transaction(config, schema, count) { |task| ... }
# File lib/embulk/output/influxdb08.rb, line 10
# Builds the task hash from the plugin configuration, yields it to the given
# block and reports an empty config diff (this output is not resumable).
#
# +schema+ and +count+ are accepted but not read, and the value returned by
# the block is discarded.
#
# @return [Hash] always an empty hash
def self.transaction(config, schema, count, &control)
  # Two-element entries are required options (no default is passed);
  # three-element entries carry the default as their last element.
  option_specs = [
    ["host", :string, nil],
    ["hosts", :array, nil],
    ["port", :integer, 8086],
    ["username", :string, "root"],
    ["password", :string, "root"],
    ["database", :string],
    ["series", :string, nil],
    ["series_per_column", :bool, false],
    ["timestamp_column", :string, nil],
    ["ignore_columns", :array, []],
    ["default_timezone", :string, "UTC"],
    ["mode", :string, "insert"],
    ["use_ssl", :bool, false],
    ["verify_ssl", :bool, true],
    ["ssl_ca_cert", :string, nil],
    ["time_precision", :string, "s"],
    ["initial_delay", :float, 0.01],
    ["max_delay", :float, 30],
    ["open_timeout", :integer, 5],
    ["read_timeout", :integer, 300],
    ["async", :bool, false],
    ["udp", :bool, false],
    ["retry", :integer, nil],
    ["denormalize", :bool, true],
  ]

  task = option_specs.each_with_object({}) do |(name, type, *fallback), options|
    options[name] =
      if fallback.empty?
        config.param(name, type)
      else
        config.param(name, type, default: fallback.first)
      end
  end

  yield(task)
  {}
end

Public Instance Methods

abort()
# File lib/embulk/output/influxdb08.rb, line 97
# Does nothing and returns nil.
def abort
  nil
end
add(page)
# File lib/embulk/output/influxdb08.rb, line 85
# Converts +page+ into points and writes them through the connection.
#
# When @series is set the points come from build_payload, otherwise from
# build_payload_per_column.
#
# @return whatever the connection's write_points returns
def add(page)
  points =
    if @series
      build_payload(page)
    else
      build_payload_per_column(page)
    end

  Embulk.logger.info { "embulk-output-influxdb08: Writing to #{@database}" }
  Embulk.logger.debug { "embulk-output-influxdb08: #{points}" }

  @connection.write_points(points, @async, @time_precision)
end
close()
# File lib/embulk/output/influxdb08.rb, line 82
# Does nothing and returns nil.
def close
  nil
end
commit()
# File lib/embulk/output/influxdb08.rb, line 100
# Reports an empty task report.
#
# @return [Hash] a new empty hash on every call
def commit
  {}
end
finish()
# File lib/embulk/output/influxdb08.rb, line 94
# Does nothing and returns nil.
def finish
  nil
end
init()
# File lib/embulk/output/influxdb08.rb, line 59
# Reads the plugin settings from +task+ into instance state, creates the
# InfluxDB client and makes sure the target database exists.
#
# @raise [RuntimeError] when neither "series" nor "series_per_column" is set
def init
  # Fall back to the single "host" option, then to localhost, when no host
  # list was configured.
  task["hosts"] ||= Array(task["host"] || "localhost")
  @database = task["database"]
  @series = task["series"]
  @series_per_column = task["series_per_column"]
  raise "Need series or series_per_column parameter" if !@series && !@series_per_column

  timestamp_name = task["timestamp_column"]
  @timestamp_column = schema.find { |col| col.name == timestamp_name } if timestamp_name

  @ignore_columns = task["ignore_columns"]
  @time_precision = task["time_precision"]
  # #add forwards @async to write_points, so it has to mirror the "async" option.
  @async = task["async"]
  @replace = task["mode"].downcase == "replace"
  @default_timezone = task["default_timezone"]

  # The client receives every task entry as a symbol-keyed option.
  @connection = InfluxDB::Client.new(@database,
    task.map { |k, v| [k.to_sym, v] }.to_h
  )
  create_database_if_not_exist
end

Private Instance Methods

build_payload(page)
# File lib/embulk/output/influxdb08.rb, line 107
# Turns every record of +page+ into one point for the configured series.
#
# The series name is @series with its placeholders resolved against the
# record. Each point holds the target columns keyed by column name, plus a
# :time entry (integer, via #to_i) when a timestamp column is configured.
#
# @return [Array<Hash>] one { name:, data: } hash per record
def build_payload(page)
  page.map do |row|
    series_name = resolve_placeholder(row, @series)
    delete_series_if_exist(series_name)

    fields = target_columns.each_with_object({}) do |column, values|
      values[column.name] = convert_timezone(row[column.index])
    end
    fields[:time] = convert_timezone(row[@timestamp_column.index]).to_i if @timestamp_column

    { name: series_name, data: fields }
  end
end
build_payload_per_column(page)
# File lib/embulk/output/influxdb08.rb, line 122
# Turns every record of +page+ into one point per target column, using the
# column name as the series name and storing the cell under :value.
#
# A :time entry (integer, via #to_i) is added when a timestamp column is
# configured.
#
# @return [Array<Hash>] a flat list of { name:, data: } hashes
def build_payload_per_column(page)
  page.flat_map do |row|
    target_columns.map do |column|
      series_name = column.name
      delete_series_if_exist(series_name)

      point = { value: row[column.index] }
      point[:time] = convert_timezone(row[@timestamp_column.index]).to_i if @timestamp_column

      { name: series_name, data: point }
    end
  end
end
convert_timezone(value)
# File lib/embulk/output/influxdb08.rb, line 170
# Passes Time values through a Timezone::Zone built from @default_timezone;
# every other value is returned untouched.
#
# @return the zone's result for a Time, otherwise +value+ itself
def convert_timezone(value)
  if value.is_a?(Time)
    zone = Timezone::Zone.new(zone: @default_timezone)
    zone.time(value)
  else
    value
  end
end
create_database_if_not_exist()
# File lib/embulk/output/influxdb08.rb, line 151
# Creates @database through the connection unless the connection's database
# list already contains an entry with that name.
#
# @return nil when the database is already listed, otherwise the result of
#   create_database
def create_database_if_not_exist
  databases = @connection.get_database_list
  return if databases.any? { |entry| entry["name"] == @database }

  @connection.create_database(@database)
end
delete_series_if_exist(series)
# File lib/embulk/output/influxdb08.rb, line 137
# In "replace" mode, deletes +series+ the first time it is seen, provided it
# already exists. Does nothing when @replace is falsy or when the series has
# already been handled.
#
# Handled series are tracked in the class-level replaced_series registry.
#
# @param series [String] series name to check
# @return nil when nothing is deleted, otherwise the result of delete_series
def delete_series_if_exist(series)
  return unless @replace

  handled = self.class.replaced_series
  return unless handled[series].nil?

  exists = find_series(series)
  # Record the series even when it did not exist yet. Otherwise the series
  # created by this run's first write would be found and deleted by a later
  # call, and every record until then would trigger another existence query.
  handled[series] = true
  return unless exists

  Embulk.logger.info { "embulk-output-influxdb08: Delete series #{series} from #{@database}" }
  @connection.delete_series(series)
end
find_series(series)
# File lib/embulk/output/influxdb08.rb, line 145
# Runs a 'LIST SERIES' query and looks for an entry named +series+ under the
# "list_series_result" key of the response.
#
# @return the matching entry, or nil when no entry has that name
def find_series(series)
  listing = @connection.query('LIST SERIES')
  listing["list_series_result"].find do |entry|
    entry["name"] == series
  end
end
resolve_placeholder(record, series)
# File lib/embulk/output/influxdb08.rb, line 157
# Replaces every "${column}" placeholder in +series+ with the value +record+
# holds at that column's position in the schema.
#
# @return [String] the series name with all placeholders substituted
def resolve_placeholder(record, series)
  series.gsub(/\$\{(.*?)\}/) do
    column_name = Regexp.last_match(1)
    position = schema.index { |column| column.name == column_name }
    record[position]
  end
end
target_columns()
# File lib/embulk/output/influxdb08.rb, line 164
# Schema columns that are written as data: everything except the timestamp
# column (when one is configured) and the columns named in @ignore_columns.
#
# @return [Array] the remaining schema columns, in schema order
def target_columns
  schema.reject do |col|
    # @timestamp_column is nil when no timestamp column is configured, so it
    # must be checked before its name is read.
    is_timestamp = @timestamp_column && col.name == @timestamp_column.name
    is_timestamp || @ignore_columns.include?(col.name)
  end
end