class PulsarAdmin::Api

Constants

BatchHeader
PropertyPrefix
PublishTimeHeader

Public Class Methods

new(opts) click to toggle source

opts

endpoint
tenant
persistent
# File lib/pulsar_admin/api.rb, line 11
def initialize(opts)
  @endpoint = URI.parse(opts[:endpoint])
  @tenant = opts[:tenant]
  @persistent = opts[:persistent] == false ? 'non-persistent' : 'persistent'
end

Public Instance Methods

create_namespace(name) click to toggle source
# File lib/pulsar_admin/api.rb, line 23
def create_namespace(name)
  put('/admin/v2/namespaces/:tenant/:namespace', namespace: name)
end
create_topic(namespace, topic, partitions = 0) click to toggle source
# File lib/pulsar_admin/api.rb, line 40
def create_topic(namespace, topic, partitions = 0)
  put("/admin/v2/:persistent/:tenant/:namespace/:topic#{partitions.zero? ? '' : '/partitions'}",
        {namespace: namespace, topic: topic}, partitions
      )
end
delete_namespace(name) click to toggle source
# File lib/pulsar_admin/api.rb, line 27
def delete_namespace(name)
  delete('/admin/v2/namespaces/:tenant/:namespace', namespace: name)
end
delete_topic(namespace, topic) click to toggle source
# File lib/pulsar_admin/api.rb, line 46
def delete_topic(namespace, topic)
  res1 = delete('/admin/v2/:persistent/:tenant/:namespace/:topic',
          namespace: namespace, topic: topic
        )

  res2 = delete('/admin/v2/:persistent/:tenant/:namespace/:topic/partitions',
    namespace: namespace, topic: topic
  )

  res1 || res2
end
list_namespaces() click to toggle source
# File lib/pulsar_admin/api.rb, line 17
def list_namespaces
  get('/admin/v2/namespaces/:tenant').map do |ns|
    ns.sub("#{@tenant}/", '')
  end
end
namespace_topics(namespace) click to toggle source
# File lib/pulsar_admin/api.rb, line 31
def namespace_topics(namespace)
  result = {}
  ['', '/partitioned'].flat_map do |pd|
    resp = get("/admin/v2/:persistent/:tenant/:namespace#{pd}", namespace: namespace)
    result[pd.empty? ? 'non-partitioned' : 'partitioned'] = resp
  end
  result
end
peek_messages(options) click to toggle source

options

namespace
topic
sub_name
message_position
count
# File lib/pulsar_admin/api.rb, line 64
def peek_messages(options)
  (options[:count] || 1).times.map do |x|
    opts = options.dup
    opts[:message_position] = (opts[:message_position].to_i + x + 1).to_s
    peek_message(opts)
  end.compact
end

Private Instance Methods

combine_default_value(opts) click to toggle source
# File lib/pulsar_admin/api.rb, line 187
def combine_default_value(opts)
  opts.merge(
    tenant: @tenant,
    persistent: @persistent
  )
end
delete(path, payload = {}) click to toggle source
# File lib/pulsar_admin/api.rb, line 117
def delete(path, payload = {})
  uri = @endpoint.dup
  uri.path, payload = handle_restful_path(path, payload)

  req = Net::HTTP::Delete.new(uri)
  req.body = payload.to_json
  req.content_type = 'application/json'

  res = Net::HTTP.start(uri.hostname, uri.port) do |http|
    http.request(req)
  end

  case res
  when Net::HTTPSuccess, Net::HTTPNoContent
    return true
  else
    return false
  end
end
get(path, params = {}) click to toggle source
# File lib/pulsar_admin/api.rb, line 112
def get(path, params = {})
  resp = raw_get(path, params)
  try_decode_body(resp)
end
handle_restful_path(path, options) click to toggle source
# File lib/pulsar_admin/api.rb, line 173
def handle_restful_path(path, options)
  return path unless path.include?(':')

  opts = combine_default_value(options || {})
  opts.keys.sort.reverse.each do |k|
    remark = ":#{k}"
    next unless path.include?(remark)
    path.gsub!(remark, opts[k])
    options.delete(k)
  end

  return [path, options]
end
peek_message(options) click to toggle source

options

namespace
topic
sub_name
message_position
# File lib/pulsar_admin/api.rb, line 142
def peek_message(options)
  options[:message_position] = options[:message_position].to_s
  resp = raw_get('/admin/v2/:persistent/:tenant/:namespace/:topic/subscription/:sub_name/position/:message_position', options)
  unless request_ok?(resp)
    puts resp.body
    return
  end

  payload = resp.body

  msg_id = nil
  properties = {}
  resp.each_header do |header|
    case header
    when PublishTimeHeader
      properties['publish-time'] = resp.header[header]
    when BatchHeader
      properties['pulsar-num-batch-message'] = resp.header[header]
    when PropertyPrefix
      properties[header] = resp.header[header]
    when /^X-Pulsar-Message-ID$/i
      msg_id = resp.header[header]
    end
  end
  [
    msg_id,
    properties,
    payload
  ]
end
put(path, payload = {}, body = nil) click to toggle source
# File lib/pulsar_admin/api.rb, line 73
def put(path, payload = {}, body = nil)
  uri = @endpoint.dup
  uri.path, payload = handle_restful_path(path, payload)

  req = Net::HTTP::Put.new(uri)

  if payload.empty?
    req.body = body.to_s
    req.content_type = body.nil? ? 'application/json' : 'text/plain'
  else
    req.body = payload.to_json
    req.content_type = 'application/json'
  end

  res = Net::HTTP.start(uri.hostname, uri.port) do |http|
    http.request(req)
  end

  case res
  when Net::HTTPSuccess, Net::HTTPNoContent
    return true
  else
    puts "status: #{res.code} - body: #{res.body} - #{res.inspect}"
    return false
  end
end
raw_get(path, params = {}) click to toggle source
# File lib/pulsar_admin/api.rb, line 100
def raw_get(path, params = {})
  uri = @endpoint.dup
  uri.path, params = handle_restful_path(path, params)

  req = Net::HTTP::Get.new(uri)
  req.set_form_data(params) unless params.empty?

  Net::HTTP.start(uri.hostname, uri.port) do |http|
    http.request(req)
  end
end
request_ok?(resp) click to toggle source
# File lib/pulsar_admin/api.rb, line 194
def request_ok?(resp)
  case resp
  when Net::HTTPSuccess
    true
  when Net::HTTPRedirection
    false
  else
    false
  end
end
try_decode_body(resp) click to toggle source
# File lib/pulsar_admin/api.rb, line 205
def try_decode_body(resp)
  unless request_ok?(resp)
    puts resp.body
    return
  end
  return resp.body unless resp.content_type =~ /application\/json/

  JSON.parse(resp.body) rescue resp.body
end