class DatahubHttpClient
@author pholy.ht ## @time 2016-03-24 ##
Attributes
access_id[RW]
access_key[RW]
endpoint[RW]
Public Class Methods
new(endpoint, access_id, access_key)
click to toggle source
# File lib/fluent/plugin/datahub/datahub-http-client.rb, line 19
# Remembers the connection settings on the new client.
#
# @param endpoint   base URL; request paths are appended to it when sending
# @param access_id  identifier embedded in the Authorization header value
# @param access_key secret used as the HMAC key when signing requests
def initialize(endpoint, access_id, access_key)
  @endpoint, @access_id, @access_key = endpoint, access_id, access_key
end
Public Instance Methods
create_project(project_name, comment)
click to toggle source
# File lib/fluent/plugin/datahub/datahub-http-client.rb, line 112
# Sends a POST for "/projects/<project_name>" carrying the comment under
# the "Comment" key.
#
# @param project_name [String] appended to the "/projects/" path
# @param comment      value sent as "Comment"
# @return whatever send_request returns
def create_project(project_name, comment)
  send_request("POST", "/projects/" + project_name, { "Comment" => comment })
end
create_topic(project_name, topic_name, shard_count, lifecycle, record_type, record_schema, comment)
click to toggle source
# File lib/fluent/plugin/datahub/datahub-http-client.rb, line 142
# Sends a POST for "/projects/<project_name>/topics/<topic_name>" describing
# the topic to create.
#
# @param project_name  [String] project segment of the path
# @param topic_name    [String] topic segment of the path
# @param shard_count   value sent as "ShardCount"
# @param lifecycle     value sent as "Lifecycle"
# @param record_type   value sent as "RecordType"
# @param record_schema object serialised with JSON.generate and sent as a
#                      JSON string under "RecordSchema"
# @param comment       value sent as "Comment"
# @return whatever send_request returns
def create_topic(project_name, topic_name, shard_count, lifecycle, record_type, record_schema, comment)
  topic_path = "/projects/" + project_name + "/topics/" + topic_name
  payload = {
    "ShardCount"   => shard_count,
    "Lifecycle"    => lifecycle,
    "RecordType"   => record_type,
    "RecordSchema" => JSON.generate(record_schema),
    "Comment"      => comment
  }
  send_request("POST", topic_path, payload)
end
delete_project(project_name)
click to toggle source
DataHub does not currently implement the update operation (datahub目前未实现update操作), so the following method is disabled: def update_project(project_name, comment)
path = "/projects/" + project_name params = {} params["Comment"] = comment return send_request("PUT", path, params)
end
# File lib/fluent/plugin/datahub/datahub-http-client.rb, line 127
# Sends a DELETE for "/projects/<project_name>".
#
# @param project_name [String] appended to the "/projects/" path
# @return whatever send_request returns
def delete_project(project_name)
  send_request("DELETE", "/projects/" + project_name)
end
delete_topic(project_name, topic_name)
click to toggle source
# File lib/fluent/plugin/datahub/datahub-http-client.rb, line 161
# Sends a DELETE for "/projects/<project_name>/topics/<topic_name>".
#
# @param project_name [String] project segment of the path
# @param topic_name   [String] topic segment of the path
# @return whatever send_request returns
def delete_topic(project_name, topic_name)
  topic_path = "/projects/" + project_name + "/topics/" + topic_name
  send_request("DELETE", topic_path)
end
get_project(project_name)
click to toggle source
# File lib/fluent/plugin/datahub/datahub-http-client.rb, line 107
# Sends a GET for "/projects/<project_name>".
#
# @param project_name [String] appended to the "/projects/" path
# @return whatever send_request returns
def get_project(project_name)
  send_request("GET", "/projects/" + project_name)
end
get_shard_cursor(project_name, topic_name, shard_id, offset=DateTime.now.strftime('%Q'), type="LATEST")
click to toggle source
# File lib/fluent/plugin/datahub/datahub-http-client.rb, line 201
# Sends a POST with Action "cursor" for
# "/projects/<project_name>/topics/<topic_name>/shards/<shard_id>".
#
# @param project_name [String] project segment of the path
# @param topic_name   [String] topic segment of the path
# @param shard_id     [String] shard segment of the path
# @param offset       value sent as "SystemTime"; defaults to the current
#                     time formatted with '%Q' (milliseconds since the epoch)
# @param type         value sent as "Type"; defaults to "LATEST"
# @return whatever send_request returns
def get_shard_cursor(project_name, topic_name, shard_id, offset=DateTime.now.strftime('%Q'), type="LATEST")
  shard_path = "/projects/" + project_name + "/topics/" + topic_name + "/shards/" + shard_id
  payload = {
    "Action"     => "cursor",
    "SystemTime" => offset,
    "Type"       => type
  }
  send_request("POST", shard_path, payload)
end
get_signature(params)
click to toggle source
Signature = base64(HmacSha1(AccessKeySecret, VERB + "\n"
+ CONTENT-TYPE + "\n" + DATE + "\n" + CanonicalizedDatahubHeaders + "\n" + CanonicalizedResource))
# File lib/fluent/plugin/datahub/datahub-http-client.rb, line 31
# Builds the Authorization header value for a request.
#
# The string to sign is the verb, content type, date, the fixed header line
# "x-datahub-client-version:1" and the resource, joined by "\n". For example:
#
#   "GET\napplication/json\nFri, 06 May 2016 06:43:31 GMT\n" \
#   "x-datahub-client-version:1\n/projects/test_project/topics/datahub_fluentd_out_1"
#
# It is signed with HMAC-SHA1 keyed by @access_key and Base64 encoded.
#
# @param params [Hash] must hold String values under the keys "verb",
#   "content-type", "date" and "resource"
# @return [String] "DATAHUB <access_id>:<base64 signature>"
def get_signature(params)
  string_to_sign = params["verb"] + "\n" +
                   params["content-type"] + "\n" +
                   params["date"] + "\n" +
                   "x-datahub-client-version:1\n" +
                   params["resource"]
  digest = OpenSSL::HMAC.digest('sha1', @access_key, string_to_sign)
  # strict_encode64 never inserts line feeds, so the value is always safe to
  # place in a single HTTP header (encode64 wraps and appends "\n").
  "DATAHUB " + @access_id + ":" + Base64.strict_encode64(digest)
end
get_topic(project_name, topic_name)
click to toggle source
# File lib/fluent/plugin/datahub/datahub-http-client.rb, line 137
# Sends a GET for "/projects/<project_name>/topics/<topic_name>".
#
# @param project_name [String] project segment of the path
# @param topic_name   [String] topic segment of the path
# @return whatever send_request returns
def get_topic(project_name, topic_name)
  topic_path = "/projects/" + project_name + "/topics/" + topic_name
  send_request("GET", topic_path)
end
list_projects()
click to toggle source
# File lib/fluent/plugin/datahub/datahub-http-client.rb, line 102
# Sends a GET for "/projects".
#
# @return whatever send_request returns
def list_projects
  send_request("GET", "/projects")
end
list_shards(project_name, topic_name)
click to toggle source
# File lib/fluent/plugin/datahub/datahub-http-client.rb, line 166
# Sends a GET for "/projects/<project_name>/topics/<topic_name>/shards".
#
# @param project_name [String] project segment of the path
# @param topic_name   [String] topic segment of the path
# @return whatever send_request returns
def list_shards(project_name, topic_name)
  shards_path = "/projects/" + project_name + "/topics/" + topic_name + "/shards"
  send_request("GET", shards_path)
end
list_topics(project_name)
click to toggle source
# File lib/fluent/plugin/datahub/datahub-http-client.rb, line 132
# Sends a GET for "/projects/<project_name>/topics".
#
# @param project_name [String] project segment of the path
# @return whatever send_request returns
def list_topics(project_name)
  topics_path = "/projects/" + project_name + "/topics"
  send_request("GET", topics_path)
end
read_data_from_shard(project_name, topic_name, shard_id, count, offset=DateTime.now.strftime('%Q'), type="LATEST")
click to toggle source
# File lib/fluent/plugin/datahub/datahub-http-client.rb, line 219
# Looks up a cursor with get_shard_cursor, then sends a POST with Action
# "sub" for "/projects/<project_name>/topics/<topic_name>/shards/<shard_id>".
#
# @param project_name [String] project segment of the path
# @param topic_name   [String] topic segment of the path
# @param shard_id     [String] shard segment of the path
# @param count        value sent as "Limit"
# @param offset       passed to get_shard_cursor; defaults to the current
#                     time formatted with '%Q' (milliseconds since the epoch)
# @param type         passed to get_shard_cursor; defaults to "LATEST"
# @return whatever send_request returns
def read_data_from_shard(project_name, topic_name, shard_id, count, offset=DateTime.now.strftime('%Q'), type="LATEST")
  shard_path = "/projects/" + project_name + "/topics/" + topic_name + "/shards/" + shard_id
  # The cursor lookup response is converted with to_hash and its "Cursor"
  # entry is forwarded to the read request.
  cursor_response = get_shard_cursor(project_name, topic_name, shard_id, offset, type)
  payload = {
    "Action" => "sub",
    "Cursor" => cursor_response.to_hash["Cursor"],
    "Limit"  => count
  }
  send_request("POST", shard_path, payload)
end
read_data_from_shard_with_cursor(project_name, topic_name, shard_id, cursor, count)
click to toggle source
# File lib/fluent/plugin/datahub/datahub-http-client.rb, line 210
# Sends a POST with Action "sub" for
# "/projects/<project_name>/topics/<topic_name>/shards/<shard_id>", using a
# cursor supplied by the caller.
#
# @param project_name [String] project segment of the path
# @param topic_name   [String] topic segment of the path
# @param shard_id     [String] shard segment of the path
# @param cursor       value sent as "Cursor"
# @param count        value sent as "Limit"
# @return whatever send_request returns
def read_data_from_shard_with_cursor(project_name, topic_name, shard_id, cursor, count)
  shard_path = "/projects/" + project_name + "/topics/" + topic_name + "/shards/" + shard_id
  payload = {
    "Action" => "sub",
    "Cursor" => cursor,
    "Limit"  => count
  }
  send_request("POST", shard_path, payload)
end
send_request(method, path, params={}, headers={})
click to toggle source
# File lib/fluent/plugin/datahub/datahub-http-client.rb, line 41
# Builds, signs and sends a single HTTP request to @endpoint + path.
#
# POST and PUT requests carry params serialised as a JSON body; GET and
# DELETE requests send no body. The Authorization header comes from
# get_signature, and caller-supplied headers are applied last so they can
# override the defaults.
#
# @param method  [String] one of "GET", "POST", "PUT" or "DELETE"
# @param path    [String] resource path; also used as the signed resource
# @param params  [Hash] request parameters (used for POST and PUT only)
# @param headers [Hash] extra request headers
# @return [Object, nil] the parsed JSON response, or nil if the body is
#   missing or empty
# @raise [RuntimeError] for an unsupported method, or when the response
#   code is neither '200' nor '201' (the message is the response body, or a
#   fixed message when there is no body)
def send_request(method, path, params={}, headers={})
  uri = URI(@endpoint + path)

  http_req =
    case method
    when "GET"    then Net::HTTP::Get.new(uri.path)
    when "POST"   then Net::HTTP::Post.new(uri.path)
    when "PUT"    then Net::HTTP::Put.new(uri.path)
    when "DELETE" then Net::HTTP::Delete.new(uri.path)
    else raise "Unsupported method: #{method}"
    end
  http_req.body = JSON.generate(params) if method == "POST" || method == "PUT"

  sig_params = {
    "verb"         => method,
    "content-type" => "application/json",
    "resource"     => path,
    "date"         => Time.now.utc.strftime("%a, %d %b %Y %T") + " GMT"
  }

  http_req["Authorization"] = get_signature(sig_params)
  http_req["x-datahub-client-version"] = "1"
  http_req["Date"] = sig_params["date"]
  http_req["Content-Type"] = sig_params["content-type"]
  # Assigning nil removes the Accept-Encoding header that Net::HTTP adds by
  # default.
  http_req["ACCEPT-ENCODING"] = nil
  headers.each { |name, value| http_req[name] = value }

  # Enable TLS for https endpoints; without use_ssl the connection would be
  # attempted in plain text and fail.
  http_resp = Net::HTTP.start(uri.host, uri.port, use_ssl: uri.scheme == "https") do |client|
    client.request(http_req)
  end

  # The body may be nil, so check for that before calling empty?.
  body = http_resp.body
  body_missing = body.nil? || body.empty?

  unless ['200', '201'].include?(http_resp.code)
    raise "Send request failed, unknown response " if body_missing
    raise body
  end

  return nil if body_missing
  JSON.parse(body)
end
update_topic(project_name, topic_name, lifecycle, comment)
click to toggle source
# File lib/fluent/plugin/datahub/datahub-http-client.rb, line 153
# Sends a PUT for "/projects/<project_name>/topics/<topic_name>" with the
# new lifecycle and comment.
#
# @param project_name [String] project segment of the path
# @param topic_name   [String] topic segment of the path
# @param lifecycle    value sent as "Lifecycle"
# @param comment      value sent as "Comment"
# @return whatever send_request returns
def update_topic(project_name, topic_name, lifecycle, comment)
  topic_path = "/projects/" + project_name + "/topics/" + topic_name
  payload = {
    "Lifecycle" => lifecycle,
    "Comment"   => comment
  }
  send_request("PUT", topic_path, payload)
end
write_data_to_topic(project_name, topic_name, record_entities)
click to toggle source
def split_shard(project_name, topic_name, shard_id, split_key)
path = "/projects/" + project_name + "/topics/" + topic_name + "/shards" params = {} params["Action"] = "split" params["ShardId"] = shard_id params["SplitKey"] = split_key return send_request("POST", path, params)
end
# File lib/fluent/plugin/datahub/datahub-http-client.rb, line 193
# Sends a POST with Action "pub" for
# "/projects/<project_name>/topics/<topic_name>/shards".
#
# @param project_name    [String] project segment of the path
# @param topic_name      [String] topic segment of the path
# @param record_entities value sent as "Records"
# @return whatever send_request returns
def write_data_to_topic(project_name, topic_name, record_entities)
  shards_path = "/projects/" + project_name + "/topics/" + topic_name + "/shards"
  payload = {
    "Action"  => "pub",
    "Records" => record_entities
  }
  send_request("POST", shards_path, payload)
end