class Triglav::Agent::Vertica::Monitor

Attributes

connection[R]
periodic_last_epoch[R]
resource[R]
resource_uri_prefix[R]
singular_last_epoch[R]

Public Class Methods

new(connection, resource_uri_prefix, resource) click to toggle source

@param [Triglav::Agent::Vertica::Connection] connection @param [String] resource_uri_prefix @param [TriglavClient::ResourceResponse] resource resource:

uri: vertica://host/database/schema/table
unit: 'daily', 'hourly', 'singular', or their combinations such as 'singular,daily,hourly'
timezone: '+09:00'
span_in_days: 32

View is not supported

# File lib/triglav/agent/vertica/monitor.rb, line 23
def initialize(connection, resource_uri_prefix, resource)
  @connection = connection
  @resource_uri_prefix = resource_uri_prefix
  @resource = resource
  @status = Triglav::Agent::Status.new(resource_uri_prefix, resource.uri)
  @periodic_last_epoch = $setting.debug? ? 0 : get_from_status_file(:periodic_last_epoch)
  @singular_last_epoch = $setting.debug? ? 0 : get_from_status_file(:singular_last_epoch)
end

Public Instance Methods

get_daily_events() click to toggle source
# File lib/triglav/agent/vertica/monitor.rb, line 105
def get_daily_events
  sql = "select " \
    "#{q_date} AS d, 0 AS h, max(epoch) " \
    "from #{q_db}.#{q_schema}.#{q_table} " \
    "where #{q_date} IN ('#{dates.join("','")}') " \
    "#{q_where.empty? ? '' : "AND #{q_where} "}" \
    "group by d having max(epoch) > #{q_periodic_last_epoch} " \
    "order by d"
  query_and_get_events(:daily, sql)
end
get_hourly_events() click to toggle source
# File lib/triglav/agent/vertica/monitor.rb, line 94
def get_hourly_events
  sql = "select " \
    "#{q_date} AS d, DATE_PART('hour', #{q_timestamp}) AS h, max(epoch) " \
    "from #{q_db}.#{q_schema}.#{q_table} " \
    "where #{q_date} IN ('#{dates.join("','")}') " \
    "#{q_where.empty? ? '' : "AND #{q_where} "}" \
    "group by d, h having max(epoch) > #{q_periodic_last_epoch} " \
    "order by d, h"
  query_and_get_events(:hourly, sql)
end
get_periodic_events() click to toggle source
# File lib/triglav/agent/vertica/monitor.rb, line 70
def get_periodic_events
  if hourly?
    events, new_last_epoch, rows = get_hourly_events
    if daily?
      daily_events = build_daily_events_from_hourly(rows)
      events.concat(daily_events)
    end
    [events, new_last_epoch]
  elsif daily?
    get_daily_events
  else
    raise
  end
end
get_singular_events() click to toggle source
# File lib/triglav/agent/vertica/monitor.rb, line 85
def get_singular_events
  sql = "select " \
    "NULL AS d, NULL AS h, max(epoch) " \
    "from #{q_db}.#{q_schema}.#{q_table} " \
    "#{q_where.empty? ? '' : "where #{q_where} "}" \
    "having max(epoch) > #{q_singular_last_epoch}"
  query_and_get_events(:singular, sql)
end
process() { |events| ... } click to toggle source
# File lib/triglav/agent/vertica/monitor.rb, line 32
def process
  unless resource_valid?
    $logger.warn { "Broken resource: #{resource.to_s}" }
    return nil
  end

  $logger.debug {
    msgs = ["Start process #{resource.uri}"]
    msgs << "periodic_last_epoch:#{periodic_last_epoch}" if periodic_last_epoch
    msgs << "singular_last_epoch:#{singular_last_epoch}" if singular_last_epoch
    msgs.join(', ')
  }

  if periodic?
    periodic_events, new_periodic_last_epoch = get_periodic_events
    events = periodic_events || []
  end
  if singular?
    singular_events, new_singular_last_epoch = get_singular_events
    events.nil? ? (events = singular_events) : events.concat(singular_events || [])
  end

  $logger.debug {
    msgs = ["Finish process #{resource.uri}"]
    msgs << "periodic_last_epoch:#{periodic_last_epoch}" if periodic_last_epoch
    msgs << "singular_last_epoch:#{singular_last_epoch}" if singular_last_epoch
    msgs << "new_periodic_last_epoch:#{new_periodic_last_epoch}" if new_periodic_last_epoch
    msgs << "new_singular_last_epoch:#{new_singular_last_epoch}" if new_singular_last_epoch
    msgs.join(', ')
  }

  return nil if events.nil? || events.empty?
  yield(events) if block_given? # send_message
  update_status_file(:periodic_last_epoch, new_periodic_last_epoch) if new_periodic_last_epoch
  update_status_file(:singular_last_epoch, new_singular_last_epoch) if new_singular_last_epoch
  true
end

Private Instance Methods

build_daily_events_from_hourly(rows) click to toggle source
# File lib/triglav/agent/vertica/monitor.rb, line 200
def build_daily_events_from_hourly(rows)
  max_epoch_of = {}
  rows.each do |row|
    date, hour, epoch = row[0], row[1], row[2]
    max_epoch_of[date] = [epoch, max_epoch_of[date] || 0].max
  end
  daily_events = max_epoch_of.map do |date, epoch|
    {
      uuid: SecureRandom.uuid,
      resource_uri: resource.uri,
      resource_unit: 'daily',
      resource_time: date_hour_to_i(date, 0, resource.timezone),
      resource_timezone: resource.timezone,
      payload: {d: date.to_s, h: 0, epoch: epoch}.to_json,
    }
  end
end
build_events(unit, rows) click to toggle source
# File lib/triglav/agent/vertica/monitor.rb, line 186
def build_events(unit, rows)
  rows.map do |row|
    date, hour, epoch = row[0], row[1], row[2]
    {
      uuid: SecureRandom.uuid,
      resource_uri: resource.uri,
      resource_unit: unit.to_s,
      resource_time: date_hour_to_i(date, hour, resource.timezone),
      resource_timezone: resource.timezone,
      payload: (date ? {d: date.to_s, h: hour.to_i} : {}).merge!(epoch: epoch).to_json,
    }
  end
end
build_latest_epoch(rows) click to toggle source
# File lib/triglav/agent/vertica/monitor.rb, line 182
def build_latest_epoch(rows)
  rows.map {|row| row[2] }.max
end
daily?() click to toggle source
# File lib/triglav/agent/vertica/monitor.rb, line 160
def daily?
  return @is_daily unless @is_daily.nil?
  @is_daily = resource.unit.include?('daily')
end
date_column() click to toggle source
# File lib/triglav/agent/vertica/monitor.rb, line 251
def date_column
  parsed_query['date'] || $setting.dig(:vertica, :date_column) || 'd'
end
date_hour_to_i(date, hour, timezone) click to toggle source
# File lib/triglav/agent/vertica/monitor.rb, line 218
def date_hour_to_i(date, hour, timezone)
  return 0 if date.nil?
  Time.strptime("#{date.strftime("%Y-%m-%d")} #{hour.to_i} #{timezone}", '%Y-%m-%d %H %z').to_i
end
dates() click to toggle source
# File lib/triglav/agent/vertica/monitor.rb, line 174
def dates
  return @dates if @dates
  now = Time.now.localtime(resource.timezone)
  @dates = resource.span_in_days.times.map do |i|
    (now - (i * 86400)).strftime('%Y-%m-%d')
  end
end
db() click to toggle source
# File lib/triglav/agent/vertica/monitor.rb, line 239
def db
  @db ||= parsed_uri.path[1..-1].split('/')[0]
end
get_current_epoch() click to toggle source
# File lib/triglav/agent/vertica/monitor.rb, line 140
def get_current_epoch
  connection.query('select GET_CURRENT_EPOCH()').first.first
end
get_from_status_file(key) click to toggle source
# File lib/triglav/agent/vertica/monitor.rb, line 136
def get_from_status_file(key)
  @status.getsetnx(key.to_sym, get_current_epoch)
end
hourly?() click to toggle source
# File lib/triglav/agent/vertica/monitor.rb, line 155
def hourly?
  return @is_hourly unless @is_hourly.nil?
  @is_hourly = resource.unit.include?('hourly')
end
parsed_query() click to toggle source
# File lib/triglav/agent/vertica/monitor.rb, line 235
def parsed_query
  @parsed_query ||= Rack::Utils.parse_nested_query(parsed_uri.query)
end
parsed_uri() click to toggle source
# File lib/triglav/agent/vertica/monitor.rb, line 231
def parsed_uri
  @parsed_uri ||= URI.parse(resource.uri)
end
periodic?() click to toggle source
# File lib/triglav/agent/vertica/monitor.rb, line 170
def periodic?
  hourly? or daily?
end
q_date() click to toggle source
# File lib/triglav/agent/vertica/monitor.rb, line 275
def q_date
  @q_date ||= ::Vertica.quote_identifier(date_column)
end
q_db() click to toggle source
# File lib/triglav/agent/vertica/monitor.rb, line 263
def q_db
  @q_db ||= ::Vertica.quote_identifier(db)
end
q_periodic_last_epoch() click to toggle source
# File lib/triglav/agent/vertica/monitor.rb, line 223
def q_periodic_last_epoch
  @q_periodic_last_epoch ||= ::Vertica.quote(periodic_last_epoch)
end
q_schema() click to toggle source
# File lib/triglav/agent/vertica/monitor.rb, line 267
def q_schema
  @q_schema ||= ::Vertica.quote_identifier(schema)
end
q_singular_last_epoch() click to toggle source
# File lib/triglav/agent/vertica/monitor.rb, line 227
def q_singular_last_epoch
  @q_singular_last_epoch ||= ::Vertica.quote(singular_last_epoch)
end
q_table() click to toggle source
# File lib/triglav/agent/vertica/monitor.rb, line 271
def q_table
  @q_table ||= ::Vertica.quote_identifier(table)
end
q_timestamp() click to toggle source
# File lib/triglav/agent/vertica/monitor.rb, line 279
def q_timestamp
  @q_timestamp ||= ::Vertica.quote_identifier(timestamp_column)
end
q_where() click to toggle source

Value specification:

  • A value looks like an integer string is treated as an integer.

  • If you want to treat it as as string, surround with double quote or single quote.

  • A value does not look like an integer is treated as a string.

Operator specification:

  • Only equality is supported now

# File lib/triglav/agent/vertica/monitor.rb, line 289
def q_where
  @q_where ||= where.map do |col, val|
    begin
      val = Integer(val)
    rescue => e
      if val.start_with?("'") and val.end_with?("'")
        val = val[1..-2]
      elsif val.start_with?('"') and val.end_with?('"')
        val = val[1..-2]
      end
    end
    "#{::Vertica.quote_identifier(col)} = #{::Vertica.quote(val)}"
  end.join(' AND ')
end
query_and_get_events(unit, sql) click to toggle source
# File lib/triglav/agent/vertica/monitor.rb, line 118
def query_and_get_events(unit, sql)
  $logger.debug { "Query: #{sql}" }
  rows = connection.query(sql)
  events = build_events(unit, rows)
  new_last_epoch = build_latest_epoch(rows)
  [events, new_last_epoch, rows]
rescue ::Vertica::Error::QueryError => e
  $logger.warn { "#{e.class} #{e.message}" } # e.message includes sql
  nil
rescue ::Vertica::Error::TimedOutError => e
  $logger.warn { "#{e.class} #{e.message} SQL:#{sql}" }
  nil
end
resource_unit_valid?() click to toggle source
# File lib/triglav/agent/vertica/monitor.rb, line 148
def resource_unit_valid?
  resource.unit.split(',').each do |item|
    return false unless %w[singular daily hourly].include?(item)
  end
  true
end
resource_valid?() click to toggle source
# File lib/triglav/agent/vertica/monitor.rb, line 144
def resource_valid?
  resource_unit_valid? && !resource.timezone.nil? && !resource.span_in_days.nil?
end
schema() click to toggle source
# File lib/triglav/agent/vertica/monitor.rb, line 243
def schema
  @schema ||= parsed_uri.path[1..-1].split('/')[1]
end
singular?() click to toggle source
# File lib/triglav/agent/vertica/monitor.rb, line 165
def singular?
  return @is_singular unless @is_singular.nil?
  @is_singular = resource.unit.include?('singular')
end
table() click to toggle source
# File lib/triglav/agent/vertica/monitor.rb, line 247
def table
  @table ||= parsed_uri.path[1..-1].split('/')[2]
end
timestamp_column() click to toggle source
# File lib/triglav/agent/vertica/monitor.rb, line 255
def timestamp_column
  parsed_query['timestamp'] || $setting.dig(:vertica, :timestamp_column) || 't'
end
update_status_file(key, last_epoch) click to toggle source
# File lib/triglav/agent/vertica/monitor.rb, line 132
def update_status_file(key, last_epoch)
  @status.set(key.to_sym, last_epoch)
end
where() click to toggle source
# File lib/triglav/agent/vertica/monitor.rb, line 259
def where
  parsed_query['where'] || {}
end