class PulsarAdmin::Api
Constants
- BatchHeader
- PropertyPrefix
- PublishTimeHeader
Public Class Methods
new(opts)
click to toggle source
opts
Keys: endpoint, tenant, persistent
# File lib/pulsar_admin/api.rb, line 11
# Stores the connection settings used by every request helper.
#
# @param opts [Hash] reads :endpoint (handed to URI.parse), :tenant and
#   :persistent; only a literal +false+ for :persistent selects
#   'non-persistent', any other value (including a missing key) selects
#   'persistent'
def initialize(opts)
  @endpoint = URI.parse(opts[:endpoint])
  @tenant = opts[:tenant]
  @persistent =
    if opts[:persistent] == false
      'non-persistent'
    else
      'persistent'
    end
end
Public Instance Methods
create_namespace(name)
click to toggle source
# File lib/pulsar_admin/api.rb, line 23
# Sends a PUT to '/admin/v2/namespaces/:tenant/:namespace'.
#
# @param name [String] value supplied for the :namespace placeholder
# @return the result of +put+ for this request
def create_namespace(name)
  placeholders = { namespace: name }
  put('/admin/v2/namespaces/:tenant/:namespace', placeholders)
end
create_topic(namespace, topic, partitions = 0)
click to toggle source
# File lib/pulsar_admin/api.rb, line 40
# Sends a PUT for a topic, passing +partitions+ as the request body.
#
# @param namespace [String] value supplied for the :namespace placeholder
# @param topic [String] value supplied for the :topic placeholder
# @param partitions [Integer] when zero the plain topic path is used;
#   otherwise '/partitions' is appended to the path
# @return the result of +put+ for this request
def create_topic(namespace, topic, partitions = 0)
  suffix = partitions.zero? ? '' : '/partitions'
  target = "/admin/v2/:persistent/:tenant/:namespace/:topic#{suffix}"
  placeholders = { namespace: namespace, topic: topic }
  put(target, placeholders, partitions)
end
delete_namespace(name)
click to toggle source
# File lib/pulsar_admin/api.rb, line 27
# Sends a DELETE to '/admin/v2/namespaces/:tenant/:namespace'.
#
# @param name [String] value supplied for the :namespace placeholder
# @return the result of +delete+ for this request
def delete_namespace(name)
  placeholders = { namespace: name }
  delete('/admin/v2/namespaces/:tenant/:namespace', placeholders)
end
delete_topic(namespace, topic)
click to toggle source
# File lib/pulsar_admin/api.rb, line 46
# Sends two DELETE requests: one for the plain topic path and one for the
# same path with '/partitions' appended. Both requests are always issued.
#
# @param namespace [String] value supplied for the :namespace placeholder
# @param topic [String] value supplied for the :topic placeholder
# @return the first request's result when it is truthy, otherwise the
#   second request's result
def delete_topic(namespace, topic)
  outcomes = ['', '/partitions'].map do |suffix|
    # A fresh path string and a fresh hash are built for every call.
    delete("/admin/v2/:persistent/:tenant/:namespace/:topic#{suffix}",
           namespace: namespace,
           topic: topic)
  end
  outcomes.first || outcomes.last
end
list_namespaces()
click to toggle source
# File lib/pulsar_admin/api.rb, line 17
# Lists the namespaces returned for '/admin/v2/namespaces/:tenant' with the
# first "<tenant>/" occurrence removed from each entry.
#
# @return [Array<String>] namespace names; an empty array when +get+ does
#   not return an Array (it returns nil for an unsuccessful request and a
#   String for a body that was not decoded as JSON)
def list_namespaces
  listing = get('/admin/v2/namespaces/:tenant')
  # Without this guard a failed request surfaced as NoMethodError on nil.
  return [] unless listing.is_a?(Array)

  prefix = "#{@tenant}/"
  listing.map { |ns| ns.sub(prefix, '') }
end
namespace_topics(namespace)
click to toggle source
# File lib/pulsar_admin/api.rb, line 31
# Fetches the topic listings of a namespace with two GET requests: the
# namespace path itself and the same path with '/partitioned' appended.
#
# @param namespace [String] value supplied for the :namespace placeholder
# @return [Hash] 'non-partitioned' and 'partitioned' keys, each mapped to
#   whatever +get+ returned for the corresponding request
def namespace_topics(namespace)
  suffixes = { 'non-partitioned' => '', 'partitioned' => '/partitioned' }
  suffixes.each_with_object({}) do |(label, suffix), result|
    # A fresh placeholder hash is passed on every call.
    result[label] = get("/admin/v2/:persistent/:tenant/:namespace#{suffix}", namespace: namespace)
  end
end
peek_messages(options)
click to toggle source
options
Keys: namespace, topic, sub_name, message_position, count
# File lib/pulsar_admin/api.rb, line 64
# Peeks several consecutive messages by calling +peek_message+ once per
# position.
#
# @param options [Hash] the keys accepted by +peek_message+ plus :count
#   (number of messages, 1 when absent). :message_position is converted
#   with +to_i+ and used as the offset; the positions requested are
#   offset + 1 through offset + count, each passed on as a String.
# @return [Array] the non-nil results of +peek_message+, in request order
def peek_messages(options)
  # :count only controls this loop, so it is not forwarded to the
  # per-message request where it would be sent along as request data.
  shared = options.reject { |key, _value| key == :count }
  offset = options[:message_position].to_i

  (options[:count] || 1).times.map do |step|
    # merge builds a fresh hash per call, so the caller's hash is untouched.
    peek_message(shared.merge(message_position: (offset + step + 1).to_s))
  end.compact
end
Private Instance Methods
combine_default_value(opts)
click to toggle source
# File lib/pulsar_admin/api.rb, line 187
# Returns a copy of +opts+ with :tenant and :persistent set from this
# instance. The instance values replace any same-named keys in +opts+.
#
# @param opts [Hash] placeholder values supplied by the caller
# @return [Hash] a new hash; +opts+ itself is not modified
def combine_default_value(opts)
  instance_values = { tenant: @tenant, persistent: @persistent }
  opts.merge(instance_values)
end
delete(path, payload = {})
click to toggle source
# File lib/pulsar_admin/api.rb, line 117
# Sends a DELETE request with a JSON body to the configured endpoint.
#
# @param path [String] path template resolved by +handle_restful_path+
# @param payload [Hash] placeholder values; whatever the helper hands back
#   is serialised with +to_json+ as the request body
# @return [Boolean] true for a 2xx response, false otherwise
def delete(path, payload = {})
  uri = @endpoint.dup
  uri.path, payload = handle_restful_path(path, payload)
  # Tolerate a helper result that carries no payload part.
  payload ||= {}

  req = Net::HTTP::Delete.new(uri)
  req.body = payload.to_json
  req.content_type = 'application/json'

  # TLS has to be requested explicitly; without it an https endpoint is
  # contacted in plain text and the request fails.
  res = Net::HTTP.start(uri.hostname, uri.port, use_ssl: uri.scheme == 'https') do |http|
    http.request(req)
  end

  # Net::HTTPNoContent is a subclass of Net::HTTPSuccess, so one check
  # covers both.
  res.is_a?(Net::HTTPSuccess)
end
get(path, params = {})
click to toggle source
# File lib/pulsar_admin/api.rb, line 112
# Performs +raw_get+ and passes the response through +try_decode_body+.
#
# @param path [String] path template forwarded to +raw_get+
# @param params [Hash] values forwarded to +raw_get+
# @return the result of +try_decode_body+
def get(path, params = {})
  try_decode_body(raw_get(path, params))
end
handle_restful_path(path, options)
click to toggle source
# File lib/pulsar_admin/api.rb, line 173
# Substitutes ":key" placeholders in +path+ using +options+ combined with
# the values from +combine_default_value+.
#
# Keys are processed in reverse sorted order. Neither +path+ nor +options+
# is modified; copies are returned instead, so frozen string literals are
# accepted as templates.
#
# @param path [String] path template, e.g. '/admin/v2/:tenant/:namespace'
# @param options [Hash, nil] placeholder values (must be Strings when they
#   are substituted) plus any extra entries; nil is treated as empty
# @return [Array(String, Hash)] the resolved path and the entries of
#   +options+ that were not consumed by a placeholder. The pair is returned
#   even when +path+ has no placeholder, so callers can always destructure
#   the result.
def handle_restful_path(path, options)
  remaining = (options || {}).dup
  return [path, remaining] unless path.include?(':')

  values = combine_default_value(remaining)
  resolved = path.dup
  values.keys.sort.reverse_each do |key|
    marker = ":#{key}"
    next unless resolved.include?(marker)

    resolved = resolved.gsub(marker, values[key])
    remaining.delete(key)
  end

  [resolved, remaining]
end
peek_message(options)
click to toggle source
options
Keys: namespace, topic, sub_name, message_position
# File lib/pulsar_admin/api.rb, line 142
# Fetches one message at a given position of a subscription via +raw_get+.
#
# @param options [Hash] placeholder values: :namespace, :topic, :sub_name
#   and :message_position (converted with +to_s+ before the request). The
#   hash passed in is left unchanged.
# @return [Array, nil] [message id, properties hash, response body], or nil
#   (after printing the response body) when +request_ok?+ rejects the
#   response
def peek_message(options)
  params = options.merge(message_position: options[:message_position].to_s)
  resp = raw_get('/admin/v2/:persistent/:tenant/:namespace/:topic/subscription/:sub_name/position/:message_position', params)
  unless request_ok?(resp)
    puts resp.body
    return
  end

  payload = resp.body
  msg_id = nil
  properties = {}
  # each_header yields the name together with its value, so the value is
  # taken from the block instead of the obsolete Net::HTTPResponse#header.
  resp.each_header do |name, value|
    case name
    when PublishTimeHeader
      properties['publish-time'] = value
    when BatchHeader
      properties['pulsar-num-batch-message'] = value
    when PropertyPrefix
      properties[name] = value
    when /^X-Pulsar-Message-ID$/i
      msg_id = value
    end
  end

  [msg_id, properties, payload]
end
put(path, payload = {}, body = nil)
click to toggle source
# File lib/pulsar_admin/api.rb, line 73
# Sends a PUT request to the configured endpoint.
#
# When the payload left over after path substitution is empty, +body+ is
# sent via +to_s+ ('text/plain' when +body+ is given, 'application/json'
# when it is nil); otherwise the leftover payload is sent as JSON.
#
# @param path [String] path template resolved by +handle_restful_path+
# @param payload [Hash] placeholder values plus any extra JSON fields
# @param body [Object, nil] raw body used when no payload entries remain
# @return [Boolean] true for a 2xx response; false otherwise, after
#   printing the status and body
def put(path, payload = {}, body = nil)
  uri = @endpoint.dup
  uri.path, payload = handle_restful_path(path, payload)
  # Tolerate a helper result that carries no payload part.
  payload ||= {}

  req = Net::HTTP::Put.new(uri)
  if payload.empty?
    req.body = body.to_s
    req.content_type = body.nil? ? 'application/json' : 'text/plain'
  else
    req.body = payload.to_json
    req.content_type = 'application/json'
  end

  # TLS has to be requested explicitly; without it an https endpoint is
  # contacted in plain text and the request fails.
  res = Net::HTTP.start(uri.hostname, uri.port, use_ssl: uri.scheme == 'https') do |http|
    http.request(req)
  end

  # Net::HTTPNoContent is a subclass of Net::HTTPSuccess.
  return true if res.is_a?(Net::HTTPSuccess)

  puts "status: #{res.code} - body: #{res.body} - #{res.inspect}"
  false
end
raw_get(path, params = {})
click to toggle source
# File lib/pulsar_admin/api.rb, line 100
# Sends a GET request to the configured endpoint and returns the raw
# response object.
#
# @param path [String] path template resolved by +handle_restful_path+
# @param params [Hash] placeholder values; entries not consumed by the
#   path are sent as query-string parameters
# @return [Net::HTTPResponse] the response returned by Net::HTTP
def raw_get(path, params = {})
  uri = @endpoint.dup
  uri.path, params = handle_restful_path(path, params)
  # Tolerate a helper result that carries no params part.
  params ||= {}

  unless params.empty?
    # GET parameters belong in the query string; set_form_data would put
    # them in a form-encoded request body instead.
    extra = URI.encode_www_form(params)
    uri.query = [uri.query, extra].compact.join('&')
  end

  req = Net::HTTP::Get.new(uri)
  # TLS has to be requested explicitly; without it an https endpoint is
  # contacted in plain text and the request fails.
  Net::HTTP.start(uri.hostname, uri.port, use_ssl: uri.scheme == 'https') do |http|
    http.request(req)
  end
end
request_ok?(resp)
click to toggle source
# File lib/pulsar_admin/api.rb, line 194
# Tells whether a response counts as successful.
#
# @param resp [Object] normally a Net::HTTPResponse
# @return [Boolean] true only for a Net::HTTPSuccess instance; redirects
#   and everything else yield false
def request_ok?(resp)
  resp.is_a?(Net::HTTPSuccess)
end
try_decode_body(resp)
click to toggle source
# File lib/pulsar_admin/api.rb, line 205
# Turns a response into a Ruby value.
#
# @param resp [Net::HTTPResponse] response to decode
# @return [Object, String, nil] nil (after printing the body) when
#   +request_ok?+ rejects the response; the raw body when the content type
#   does not mention application/json or the body cannot be parsed;
#   otherwise the parsed JSON
def try_decode_body(resp)
  unless request_ok?(resp)
    puts resp.body
    return
  end

  body = resp.body
  return body unless resp.content_type =~ %r{application/json}

  begin
    JSON.parse(body)
  rescue JSON::ParserError, TypeError
    # Only parse failures fall back to the raw body (TypeError covers a
    # nil body); unrelated errors are no longer swallowed.
    body
  end
end