class Presto::Metrics::Query

Public Class Methods

new(client) click to toggle source
# File lib/presto/metrics/query.rb, line 13
def initialize(client)
  # Handle used by find/query_list to fetch query JSON via get_query_json.
  @client = client
end

Public Instance Methods

count_total_processed_rows(stage) click to toggle source
# File lib/presto/metrics/query.rb, line 115
def count_total_processed_rows(stage)
  # Sum raw input rows plus output rows for +stage+ and, recursively, for
  # every stage listed in its sub_stages. A nil/false stage contributes 0.
  return 0 unless stage

  stats = stage.stage_stats
  total = stats.raw_input_positions + stats.output_positions
  (stage.sub_stages || []).each do |child|
    total += count_total_processed_rows(child)
  end
  total
end
find(queryId) click to toggle source
# File lib/presto/metrics/query.rb, line 69
def find(queryId)
  # Fetch and parse the JSON detail for one query. The client is called
  # with "{}" as its second argument. Any StandardError from fetching or
  # parsing yields an empty Hash (best effort).
  payload = @client.get_query_json(queryId, "{}")
  JSON.parse(payload)
rescue
  {}
end
find_tasks(sub_stages) click to toggle source
# File lib/presto/metrics/query.rb, line 164
def find_tasks(sub_stages)
  # Walk a list of stage hashes depth-first and return every entry found
  # under 'tasks', in visit order, as one flat Array. nil input yields [].
  return [] unless sub_stages

  per_stage = sub_stages.map do |stage|
    [stage['tasks'] || [], find_tasks(stage['subStages'])]
  end
  per_stage.flatten
end
format_table(tbl, label, align, sep) click to toggle source
# File lib/presto/metrics/query.rb, line 17
# Print +tbl+ to stdout as a fixed-width text table preceded by a header line.
#
# tbl       - Array of rows; each row is an Array of cells. Cells are rendered
#             with #to_s, so nil prints as blank.
# label     - Array of column headers (always left-justified).
# align     - Array of per-column flags: 'r' right-justifies a column; any
#             other value, or a missing entry, left-justifies it.
# sep       - String placed between adjacent columns.
# max_width - Upper bound on a column's width; longer text is truncated to
#             exactly this many characters.
#
# Returns 0.
def format_table(tbl, label, align, sep, max_width = 150)
  # Width of column i = longest rendered header/cell in that column, capped
  # at max_width. Measuring the #to_s form keeps the width consistent with
  # what is printed (e.g. a `false` cell renders as "false").
  widths = []
  [label, *tbl].each do |row|
    row.each_with_index do |cell, i|
      widths[i] = [widths[i] || 0, cell.to_s.size].max
    end
  end
  widths.map! { |w| [w || 0, max_width].min }

  # String#[start, length] returns at most `length` characters, so a
  # truncated cell is exactly as wide as its column.
  header = label.each_with_index.map { |l, i| l.to_s[0, widths[i]].ljust(widths[i]) }
  puts header.join(sep)
  tbl.each do |row|
    cells = row.each_with_index.map do |cell, i|
      text = cell.to_s[0, widths[i]]
      align[i] == 'r' ? text.rjust(widths[i]) : text.ljust(widths[i])
    end
    puts cells.join(sep)
  end
  0
end
list() click to toggle source
# File lib/presto/metrics/query.rb, line 54
# Print a summary table of every query in the listing, newest query id first.
#
# Columns: query id, elapsed time, state, running (r) / completed (f) /
# total (t) drivers, session user/catalog/schema/source, and the SQL text
# collapsed onto a single line.
#
# Returns whatever format_table returns.
def list
  tbl = query_list.map { |q|
    s = q['session'] || {}
    # metrics reads these counters from 'queryStats'; use it as a fallback
    # when an entry does not carry them at the top level.
    stats = q['queryStats'] || {}
    field = lambda { |key| q[key].nil? ? stats[key] : q[key] }
    # to_s: an entry without query text renders blank instead of raising.
    query = q['query'].to_s.gsub(/[\t\r\n]/, ' ').gsub(/ {1,}/, ' ').strip
    [q['queryId'], field.call('elapsedTime'), q['state'], field.call('runningDrivers'),
     field.call('completedDrivers'), field.call('totalDrivers'),
     s['user'], s['catalog'], s['schema'], s['source'], query]
  }
  # to_s so an entry without a queryId cannot make sort_by compare nil with a String.
  tbl = tbl.sort_by { |row| row[0].to_s }.reverse

  format_table(tbl,
      %w|query time state r f t user catalog schema source sql|,
      %w|r     r    r     r r r l    l       r      l      l  |,
      ' '
  )
end
metrics() click to toggle source
# File lib/presto/metrics/query.rb, line 140
# Summarise every query in the listing as a flat Hash with snake_case keys.
#
# Missing fields fall back to defaults: '' for query id/state/source, 0 for
# driver counts, '0.0m' for elapsed time. Without a session user, the source
# with non-alphanumeric characters removed stands in for it. 'running_time'
# holds the query's endTime, or the current UTC time (ISO 8601, millisecond
# precision) when endTime is absent.
#
# Returns an Array of Hashes, one per listed query.
def metrics
  query_list.map do |qi|
    session = qi['session'] || {}
    stats = qi['queryStats'] || {}
    source = session['source'] || ''
    {
      'query_id'          => qi['queryId'] || '',
      'state'             => qi['state'] || '',
      'source'            => source,
      'user'              => session['user'] || source.gsub(/[^a-zA-Z0-9]/, ''),
      'running_drivers'   => stats['runningDrivers'] || 0,
      'queued_drivers'    => stats['queuedDrivers'] || 0,
      'completed_drivers' => stats['completedDrivers'] || 0,
      'total_drivers'     => stats['totalDrivers'] || 0,
      'elapsed_time'      => stats['elapsedTime'] || '0.0m',
      'create_time'       => stats['createTime'],
      # NOTE(review): key says running_time but the value is endTime — confirm intent.
      'running_time'      => stats['endTime'] || Time.now.utc.iso8601(3)
    }
    # An errorCode lookup via find() for FAILED queries was left disabled here.
  end
end
processed_rows(query_id) click to toggle source
# File lib/presto/metrics/query.rb, line 121
def processed_rows(query_id)
  # Total processed rows (see count_total_processed_rows) for one query,
  # starting from the output stage of its decoded detail JSON.
  count_total_processed_rows(QueryInfo.decode(find(query_id)).output_stage)
end
query_list(path="") click to toggle source
# File lib/presto/metrics/query.rb, line 99
def query_list(path="")
  # Fetch the query listing (the client receives +path+) and parse it.
  # Any StandardError from fetching or parsing yields [] (best effort).
  JSON.parse(@client.get_query_json(path))
rescue
  []
end
query_progress() click to toggle source
# File lib/presto/metrics/query.rb, line 126
def query_progress
  # Map query_id => total processed rows for every query whose decoded
  # state is :running. The detail JSON for all running queries is fetched
  # first; rows are counted afterwards.
  details = query_list
    .map { |summary| QueryInfo.decode(summary) }
    .select { |summary| summary.state == :running }
    .map { |summary| QueryInfo.decode(find(summary.query_id)) }

  details.each_with_object({}) do |detail, progress|
    progress[detail.query_id] = count_total_processed_rows(detail.output_stage)
  end
end
running_list() click to toggle source
# File lib/presto/metrics/query.rb, line 107
# Print the task table (via tasks) for every RUNNING query, newest query id first.
#
# Returns the number of running queries.
def running_list
  running = query_list.select { |q| q['state'] == 'RUNNING' }
  # Listing entries are Hashes keyed by field name, so sort on 'queryId'
  # explicitly. to_s keeps a missing id from breaking the comparison.
  running = running.sort_by { |q| q['queryId'].to_s }.reverse
  running.each { |q| tasks(q['queryId']) }
  running.size
end
task_list(queryId) click to toggle source
# File lib/presto/metrics/query.rb, line 77
# Collect every task of +queryId+: the output stage's own 'tasks' followed
# by those found depth-first in its 'subStages' (see find_tasks).
#
# Returns a flat Array of task entries; [] when the query detail is unavailable.
def task_list(queryId)
  qj = find(queryId) || {}
  root_stage = qj['outputStage'] || {}
  # Build a new Array instead of appending to root_stage['tasks'], which
  # would mutate the structure returned by find.
  [root_stage['tasks'] || [], find_tasks(root_stage['subStages'] || [])].flatten
end
tasks(queryId) click to toggle source
# File lib/presto/metrics/query.rb, line 85
# Print one row per task of +queryId+: task id, worker host:port, state,
# processed rows, input data size, and queued/running/completed driver counts.
#
# Returns whatever format_table returns.
def tasks(queryId)
  rows = task_list(queryId).map { |t|
    # A task without a 'stats' entry prints blank counters instead of raising.
    s = t['stats'] || {}
    # Reduce the task's 'self' URI (http or https) to its "host:port" part;
    # non-matching values are left unchanged.
    host = (t['self'] || '').sub(%r{https?://([a-z0-9\-.]+[/:][0-9]+)/.*}, '\1')
    [t['taskId'], host, t['state'], s['rawInputPositions'], s['rawInputDataSize'],
     s['queuedDrivers'], s['runningDrivers'], s['completedDrivers']]
  }
  format_table(rows,
      %w|task_id host state processed_rows size queued running completed|,
      %w|l       l    l     r              r    r      r       r|,
      ' '
  )
end