class Kapacitor::Client

Attributes

http[R]

@return [HTTPClient] HTTP client instance

url[R]

@return [String] Kapacitor REST API URL, including the API version segment

Public Class Methods

new(url: 'http://localhost:9092/kapacitor', version: 'v1') click to toggle source

Create a new client

@param url [String] Kapacitor REST API's URL (defaults to `http://localhost:9092/kapacitor`) @param version [String] API version (defaults to `v1`)

# File lib/kapacitor/client.rb, line 16
# Create a new client.
#
# @param url [String] Base URL of the Kapacitor REST API
# @param version [String] API version, appended to the base URL as a path segment
def initialize(url: 'http://localhost:9092/kapacitor', version: 'v1')
  # Every request is issued against "<url>/<version>/<endpoint>".
  @url = [url, version].join('/')
  @http = HTTPClient.new
end

Public Instance Methods

define_task(id:, dbrps:, **opts) click to toggle source

Define a Kapacitor task

@param id [String] Task ID @param dbrps [String] List of database retention policy pairs the task is allowed to access. @param **opts [Hash] Any number of task parameters to push into the Hash

# File lib/kapacitor/client.rb, line 98
# Define a Kapacitor task, either from a template or from a script and type.
#
# Recognised keys in +opts+: :template_id, :type, :script, :status, :vars.
#
# @param id [String] Task ID
# @param dbrps [Object] Database/retention policy pairs, sent as-is under 'dbrps'
# @param opts [Hash] Task parameters (see above)
# @return [Object] Whatever api_post returns for the request
# @raise [ArgumentError] if the template/script combination, status or type is invalid
def define_task(id:, dbrps:, **opts)
  template_id = opts[:template_id]
  type = opts[:type]
  script = opts[:script]
  status = opts[:status]

  # A template and an inline script/type are mutually exclusive, and one of them is required.
  nothing_given = template_id.nil? && type.nil? && script.nil?
  template_and_inline = template_id && (type || script)
  if nothing_given || template_and_inline
    raise ArgumentError, "Must specify either a Template ID or a script and type"
  end

  # Without a template, the script and its type must come together.
  if template_id.nil? && (type.nil? || script.nil?)
    raise ArgumentError, "Must specify both task type and script when not using a Template ID"
  end

  if status && !%w[enabled disabled].include?(status)
    raise ArgumentError, "Kapacitor task status can be either 'enabled' or 'disabled'"
  end

  if type && !%w[batch stream].include?(type)
    raise ArgumentError, "Kapacitor task type can be either 'batch' or 'stream'"
  end

  # Tasks are created enabled unless a status is given.
  req = { 'id' => id, 'dbrps' => dbrps, 'status' => status || 'enabled' }
  definition = template_id ? { 'template-id' => template_id } : { 'type' => type, 'script' => script }
  req.merge!(definition)
  req['vars'] = opts[:vars] if opts[:vars]

  api_post(endpoint: 'tasks', data: req)
end
define_template(id:, type:, script:) click to toggle source

Define a Kapacitor template

@param id [String] Template ID @param type [String] Template type. Valid values: `batch`, `stream`. @param script [String] Tick script

# File lib/kapacitor/client.rb, line 27
# Define a Kapacitor template.
#
# @param id [String] Template ID
# @param type [String] Template type; must be 'batch' or 'stream'
# @param script [String] TICKscript source
# @return [Object] Whatever api_post returns for the request
# @raise [ArgumentError] if type is neither 'batch' nor 'stream'
def define_template(id:, type:, script:)
  unless %w[batch stream].include?(type)
    raise ArgumentError, "Kapacitor template type can be either 'batch' or 'stream'"
  end

  api_post(endpoint: 'templates', data: { 'id' => id, 'type' => type, 'script' => script })
end
define_topic_handler(id:, topic:, kind:, match: nil, options: {}) click to toggle source

Define a topic handler

@param id [String] Handler ID @param topic [String] Topic name @param kind [String] Kind of handler @param match [String] Lambda expression @param options [Hash] Handler options

# File lib/kapacitor/client.rb, line 200
# Define a topic handler.
#
# @param id [String] Handler ID
# @param topic [String] Topic name; used in the endpoint path only
# @param kind [String] Kind of handler
# @param match [String, nil] Lambda expression; omitted from the request when nil
# @param options [Hash] Handler options; always sent, even when empty
# @return [Object] Whatever api_post returns for the request
def define_topic_handler(id:, topic:, kind:, match: nil, options: {})
  # String keys throughout: the `'id': id` literal form would create Symbol
  # keys and leave the request hash with a mix of Symbol and String keys.
  req = { 'id' => id, 'kind' => kind }
  req['match'] = match unless match.nil?
  req['options'] = options
  api_post(endpoint: "alerts/topics/#{topic}/handlers", data: req)
end
delete_task(id:) click to toggle source

Delete a Kapacitor task

@param id [String] Task ID

# File lib/kapacitor/client.rb, line 165
# Delete a Kapacitor task.
#
# @param id [String] Task ID
# @return [Object] Whatever api_delete returns for the request
def delete_task(id:)
  endpoint = "tasks/#{id}"
  api_delete(endpoint: endpoint)
end
delete_template(id:) click to toggle source

Delete a Kapacitor template

@param id [String] Template ID

# File lib/kapacitor/client.rb, line 60
# Delete a Kapacitor template.
#
# @param id [String] Template ID
# @return [Object] Whatever api_delete returns for the request
def delete_template(id:)
  endpoint = "templates/#{id}"
  api_delete(endpoint: endpoint)
end
delete_topic_handler(id:, topic:) click to toggle source

Delete a topic handler

@param id [String] Handler ID @param topic [String] Topic name

# File lib/kapacitor/client.rb, line 233
# Delete a topic handler.
#
# @param id [String] Handler ID
# @param topic [String] Topic name
# @return [Object] Whatever api_delete returns for the request
def delete_topic_handler(id:, topic:)
  endpoint = "alerts/topics/#{topic}/handlers/#{id}"
  api_delete(endpoint: endpoint)
end
tasks(offset: 0, limit: 100) click to toggle source

Retrieve Kapacitor tasks

@param offset [Integer] Offset at which to start paginating through tasks @param limit [Integer] Maximum number of tasks to fetch per request (page size); all pages are retrieved @return [Array] List of tasks

# File lib/kapacitor/client.rb, line 175
# Retrieve Kapacitor tasks.
#
# Task IDs are listed page by page; each task is then fetched individually.
# Paging continues until a page comes back empty, so +limit+ acts as the page
# size rather than a cap on the number of tasks returned.
#
# @param offset [Integer] Offset of the first page
# @param limit [Integer] Number of task IDs requested per page
# @return [Array] One api_get result per task, in listing order
def tasks(offset: 0, limit: 100)
  collected = []

  loop do
    page = api_get(endpoint: "tasks?fields=id&offset=#{offset}&limit=#{limit}")['tasks']
    break if page.empty?

    # The listing only asks for IDs, so fetch every task on its own.
    collected.concat(page.map { |summary| api_get(endpoint: "tasks/#{summary['id']}") })
    offset += limit
  end

  collected
end
templates(offset: 0, limit: 100) click to toggle source

Retrieve Kapacitor templates

@param offset [Integer] Offset at which to start paginating through templates @param limit [Integer] Maximum number of templates to fetch per request (page size); all pages are retrieved @return [Array] List of templates

# File lib/kapacitor/client.rb, line 79
# Retrieve Kapacitor templates.
#
# Pages are requested until one comes back empty, so +limit+ acts as the page
# size rather than a cap on the number of templates returned.
#
# @param offset [Integer] Offset of the first page
# @param limit [Integer] Number of templates requested per page
# @return [Array] All templates from every page, in listing order
def templates(offset: 0, limit: 100)
  collected = []

  loop do
    page = api_get(endpoint: "templates?offset=#{offset}&limit=#{limit}")['templates']
    break if page.empty?

    collected.concat(page)
    offset += limit
  end

  collected
end
topic_handlers(topic:) click to toggle source

Retrieve topic's handlers

@param topic [String] Topic name @return [Array] List of handlers

# File lib/kapacitor/client.rb, line 242
# Retrieve the handlers of a topic.
#
# @param topic [String] Topic name
# @return [Object] Value stored under 'handlers' in the API response
def topic_handlers(topic:)
  response = api_get(endpoint: "alerts/topics/#{topic}/handlers")
  response['handlers']
end
topics() click to toggle source

Retrieve Kapacitor topics

@return [Array] List of topic IDs

# File lib/kapacitor/client.rb, line 68
# Retrieve the IDs of all Kapacitor topics.
#
# @return [Array] The 'id' of every entry under 'topics' in the API response
def topics
  api_get(endpoint: "alerts/topics")['topics'].map { |topic| topic['id'] }
end
update_task(id:, **opts) click to toggle source

Update a Kapacitor task

@param id [String] Task ID @param **opts [Hash] Any number of task parameters to push into the Hash

# File lib/kapacitor/client.rb, line 136
# Update a Kapacitor task.
#
# Recognised keys in +opts+: :template_id, :type, :dbrps, :script, :vars, :status.
#
# The task is always patched with status 'disabled' first. It is switched back
# to 'enabled' by a second request only when +status: 'enabled'+ is passed, so
# omitting :status leaves the task disabled.
# NOTE(review): presumably the disable/enable cycle makes Kapacitor reload the
# updated definition — confirm against the Kapacitor API documentation.
#
# @param id [String] Task ID
# @param opts [Hash] Task parameters (see above)
# @return [Object, nil] Result of the enabling api_patch call, or nil when the
#   task is not re-enabled
# @raise [ArgumentError] if :type or :status holds an unsupported value
def update_task(id:, **opts)
  # Validate before building or sending anything.
  if opts[:type] && !%w[batch stream].include?(opts[:type])
    raise ArgumentError, "Kapacitor task type can be either 'batch' or 'stream'"
  end

  # kwargs are keyed by Symbol, so the lookup must use :status (not 'status').
  if opts[:status] && !%w[enabled disabled].include?(opts[:status])
    raise ArgumentError, "Kapacitor task status can be either 'enabled' or 'disabled'"
  end

  req = {}
  req['template-id'] = opts[:template_id] if opts[:template_id]
  req['type'] = opts[:type] if opts[:type]
  req['dbrps'] = opts[:dbrps] if opts[:dbrps]
  req['script'] = opts[:script] if opts[:script]
  req['status'] = 'disabled'
  req['vars'] = opts[:vars] if opts[:vars]

  endpoint = "tasks/#{id}"
  api_patch(endpoint: endpoint, data: req)

  return unless opts[:status] == 'enabled'

  req['status'] = 'enabled'
  api_patch(endpoint: endpoint, data: req)
end
update_template(id:, **opts) click to toggle source

Update a Kapacitor template

@param id [String] Template ID @param **opts [Hash] Any number of template parameters to push into the Hash

# File lib/kapacitor/client.rb, line 44
# Update a Kapacitor template.
#
# Recognised keys in +opts+: :type and :script. No request is sent when
# neither is given.
#
# @param id [String] Template ID
# @param opts [Hash] Template parameters (see above)
# @return [Object, nil] Result of api_patch, or nil when there is nothing to update
# @raise [ArgumentError] if :type is neither 'batch' nor 'stream'
def update_template(id:, **opts)
  type = opts[:type]
  if type && !%w[batch stream].include?(type)
    raise ArgumentError, "Kapacitor template type can be either 'batch' or 'stream'"
  end

  req = {}
  req['type'] = type if type
  req['script'] = opts[:script] if opts[:script]
  return if req.empty?

  api_patch(endpoint: "templates/#{id}", data: req)
end
update_topic_handler(id:, topic:, kind:, match: nil, options: nil) click to toggle source

Update a topic handler

@param id [String] Handler ID @param topic [String] Topic name @param kind [String] Kind of handler @param match [String] Lambda expression @param options [Hash] Handler options

# File lib/kapacitor/client.rb, line 218
# Update a topic handler.
#
# @param id [String] Handler ID
# @param topic [String] Topic name
# @param kind [String] Kind of handler
# @param match [String, nil] Lambda expression; omitted from the request when nil
# @param options [Hash, nil] Handler options; omitted from the request when nil
# @return [Object] Whatever api_put returns for the request
def update_topic_handler(id:, topic:, kind:, match: nil, options: nil)
  # String keys throughout: the `'id': id` literal form would create Symbol
  # keys and leave the request hash with a mix of Symbol and String keys.
  req = { 'id' => id, 'kind' => kind }
  req['match'] = match unless match.nil?
  req['options'] = options unless options.nil?

  # id and kind are always present, so the request is never empty.
  api_put(endpoint: "alerts/topics/#{topic}/handlers/#{id}", data: req)
end

Private Instance Methods

api_delete(endpoint:) click to toggle source

Perform a HTTP DELETE request

@param endpoint [String] HTTP API endpoint

# File lib/kapacitor/client.rb, line 300
# Perform a HTTP DELETE request.
#
# Failures raised by the HTTP call itself are wrapped in a "Failed to execute
# DELETE request" Exception. Decode and status failures are raised as plain
# Exception, which the StandardError rescue below does not trap, so they
# propagate with their own message.
#
# @param endpoint [String] HTTP API endpoint, appended to the client URL
# @return [Array, Hash, nil] Decoded JSON response, or nil when the body is empty
# @raise [Exception] if the request fails, the body is not valid JSON, or the
#   status is not 204
def api_delete(endpoint:)
  headers = { 'Content-type' => 'application/json', 'Accept' => 'application/json' }
  data = nil

  begin
    # NOTE(review): the headers Hash is passed as the only extra positional
    # argument; depending on the HTTPClient version this may be interpreted as
    # the request body rather than as headers — confirm against the httpclient docs.
    resp = http.delete([url, endpoint].join('/'), headers)

    begin
      data = JSON.parse(resp.body) unless resp.body.empty?
    rescue JSON::ParserError
      raise Exception, "Failed to decode response message"
    end

    if resp.status != 204
      error = (data.include?('error') ? data['error'] : data.inspect) if data
      detail = error ? ", Error: #{error}" : ''
      raise Exception, "Query returned a non successful HTTP code (Status: #{resp.status}, Reason: #{resp.reason}#{detail})"
    end
  rescue StandardError => e
    raise Exception, "Failed to execute DELETE request to Kapacitor REST API (#{e})"
  end

  data
end
api_get(endpoint:, query: nil) click to toggle source

Perform a HTTP GET request

@param endpoint [String] HTTP API endpoint @param query [String] HTTP query @return [Array, Hash] API response

# File lib/kapacitor/client.rb, line 253
# Perform a HTTP GET request.
#
# Failures raised by the HTTP call itself are wrapped in a "Failed to execute
# GET request" Exception. Decode and status failures are raised as plain
# Exception, which the StandardError rescue below does not trap, so they
# propagate with their own message.
#
# @param endpoint [String] HTTP API endpoint, appended to the client URL
# @param query [String, nil] HTTP query, passed through to the HTTP client
# @return [Array, Hash, nil] Decoded JSON response, or nil when the body is empty
# @raise [Exception] if the request fails, the body is not valid JSON, or the
#   status is not 200
def api_get(endpoint:, query: nil)
  headers = { 'Content-type' => 'application/json', 'Accept' => 'application/json' }
  data = nil

  begin
    resp = http.get([url, endpoint].join('/'), query, headers)

    begin
      data = JSON.parse(resp.body) unless resp.body.empty?
    rescue JSON::ParserError
      raise Exception, "Failed to decode response message"
    end

    if resp.status != 200
      error = (data.include?('error') ? data['error'] : data.inspect) if data
      detail = error ? ", Error: #{error}" : ''
      raise Exception, "Query returned a non successful HTTP code (Status: #{resp.status}, Reason: #{resp.reason}#{detail})"
    end
  rescue StandardError => e
    raise Exception, "Failed to execute GET request to Kapacitor REST API (#{e})"
  end

  data
end
api_patch(endpoint:, data:) click to toggle source

Perform a HTTP PATCH request

@param endpoint [String] HTTP API endpoint @param data [Hash] Request data

# File lib/kapacitor/client.rb, line 324
# Perform a HTTP PATCH request.
#
# Failures raised by the HTTP call itself are wrapped in a "Failed to execute
# PATCH request" Exception. Decode and status failures are raised as plain
# Exception, which the StandardError rescue below does not trap, so they
# propagate with their own message.
#
# @param endpoint [String] HTTP API endpoint, appended to the client URL
# @param data [Hash] Request data, sent as JSON
# @return [Array, Hash, nil] Decoded JSON response, or nil when the body is empty
# @raise [Exception] if the request fails, the body is not valid JSON, or the
#   status is not 200
def api_patch(endpoint:, data:)
  headers = { 'Content-Type' => 'application/json', 'Accept' => 'application/json' }
  # Kept separate from the +data+ parameter so an empty response body can never
  # make the request payload show up as the result or in the error detail.
  parsed = nil

  begin
    resp = http.patch([url, endpoint].join('/'), data.to_json, headers)

    begin
      parsed = JSON.parse(resp.body) unless resp.body.empty?
    rescue JSON::ParserError
      raise Exception, "Failed to decode response message"
    end

    if resp.status != 200
      error = (parsed.include?('error') ? parsed['error'] : parsed.inspect) if parsed
      detail = error ? ", Error: #{error}" : ''
      raise Exception, "Query returned a non successful HTTP code (Status: #{resp.status}, Reason: #{resp.reason}#{detail})"
    end
  rescue StandardError => e
    raise Exception, "Failed to execute PATCH request to Kapacitor REST API (#{e})"
  end

  parsed
end
api_post(endpoint:, data:) click to toggle source

Perform a HTTP POST request

@param endpoint [String] HTTP API endpoint @param data [Hash] Request data

# File lib/kapacitor/client.rb, line 277
# Perform a HTTP POST request.
#
# Failures raised by the HTTP call itself are wrapped in a "Failed to execute
# POST request" Exception. Decode and status failures are raised as plain
# Exception, which the StandardError rescue below does not trap, so they
# propagate with their own message.
#
# @param endpoint [String] HTTP API endpoint, appended to the client URL
# @param data [Hash] Request data, sent as JSON
# @return [Array, Hash, nil] Decoded JSON response, or nil when the body is empty
# @raise [Exception] if the request fails, the body is not valid JSON, or the
#   status is not 200
def api_post(endpoint:, data:)
  headers = { 'Content-Type' => 'application/json', 'Accept' => 'application/json' }
  # Kept separate from the +data+ parameter so an empty response body can never
  # make the request payload show up as the result or in the error detail.
  parsed = nil

  begin
    resp = http.post([url, endpoint].join('/'), data.to_json, headers)

    begin
      parsed = JSON.parse(resp.body) unless resp.body.empty?
    rescue JSON::ParserError
      raise Exception, "Failed to decode response message"
    end

    if resp.status != 200
      error = (parsed.include?('error') ? parsed['error'] : parsed.inspect) if parsed
      detail = error ? ", Error: #{error}" : ''
      raise Exception, "Query returned a non successful HTTP code (Status: #{resp.status}, Reason: #{resp.reason}#{detail})"
    end
  rescue StandardError => e
    raise Exception, "Failed to execute POST request to Kapacitor REST API (#{e})"
  end

  parsed
end
api_put(endpoint:, data:) click to toggle source

Perform a HTTP PUT request

@param endpoint [String] HTTP API endpoint @param data [Hash] Request data

# File lib/kapacitor/client.rb, line 348
# Perform a HTTP PUT request.
#
# Failures raised by the HTTP call itself are wrapped in a "Failed to execute
# PUT request" Exception. Decode and status failures are raised as plain
# Exception, which the StandardError rescue below does not trap, so they
# propagate with their own message.
#
# @param endpoint [String] HTTP API endpoint, appended to the client URL
# @param data [Hash] Request data, sent as JSON
# @return [Array, Hash, nil] Decoded JSON response, or nil when the body is empty
# @raise [Exception] if the request fails, the body is not valid JSON, or the
#   status is not 200
def api_put(endpoint:, data:)
  headers = { 'Content-Type' => 'application/json', 'Accept' => 'application/json' }
  # Kept separate from the +data+ parameter so an empty response body can never
  # make the request payload show up as the result or in the error detail.
  parsed = nil

  begin
    resp = http.put([url, endpoint].join('/'), data.to_json, headers)

    begin
      parsed = JSON.parse(resp.body) unless resp.body.empty?
    rescue JSON::ParserError
      raise Exception, "Failed to decode response message"
    end

    if resp.status != 200
      error = (parsed.include?('error') ? parsed['error'] : parsed.inspect) if parsed
      detail = error ? ", Error: #{error}" : ''
      raise Exception, "Query returned a non successful HTTP code (Status: #{resp.status}, Reason: #{resp.reason}#{detail})"
    end
  rescue StandardError => e
    raise Exception, "Failed to execute PUT request to Kapacitor REST API (#{e})"
  end

  parsed
end