class DatahubHttpClient

@author pholy.ht ## @time 2016-03-24 ##

Attributes

access_id[RW]
access_key[RW]
endpoint[RW]

Public Class Methods

new(endpoint, access_id, access_key) click to toggle source
# File lib/fluent/plugin/datahub/datahub-http-client.rb, line 19
# Stores the endpoint and the access id/key pair on the instance; the request
# methods of this class read them back through @endpoint, @access_id and @access_key.
def initialize(endpoint, access_id, access_key)
    @endpoint, @access_id, @access_key = endpoint, access_id, access_key
end

Public Instance Methods

create_project(project_name, comment) click to toggle source
# File lib/fluent/plugin/datahub/datahub-http-client.rb, line 112
# Creates a project: POSTs {"Comment" => comment} to /projects/<project_name>
# and returns whatever send_request returns.
def create_project(project_name, comment)
    resource = "/projects/" + project_name
    send_request("POST", resource, { "Comment" => comment })
end
create_topic(project_name, topic_name, shard_count, lifecycle, record_type, record_schema, comment) click to toggle source
# File lib/fluent/plugin/datahub/datahub-http-client.rb, line 142
# Creates a topic by POSTing its settings to
# /projects/<project_name>/topics/<topic_name>.
#
# record_schema is serialized with JSON.generate and sent as a String under
# "RecordSchema"; the other arguments are passed through unchanged.
# Returns whatever send_request returns.
def create_topic(project_name, topic_name, shard_count, lifecycle, record_type, record_schema, comment)
    resource = "/projects/" + project_name + "/topics/" + topic_name
    settings = {
        "ShardCount" => shard_count,
        "Lifecycle" => lifecycle,
        "RecordType" => record_type,
        "RecordSchema" => JSON.generate(record_schema),
        "Comment" => comment
    }
    send_request("POST", resource, settings)
end
delete_project(project_name) click to toggle source

DataHub does not currently implement the update operation: def update_project(project_name, comment)

path = "/projects/" + project_name
params = {}
params["Comment"] = comment
return send_request("PUT", path, params)

end

# File lib/fluent/plugin/datahub/datahub-http-client.rb, line 127
# Sends a DELETE request for /projects/<project_name> and returns whatever
# send_request returns.
def delete_project(project_name)
    send_request("DELETE", "/projects/" + project_name)
end
delete_topic(project_name, topic_name) click to toggle source
# File lib/fluent/plugin/datahub/datahub-http-client.rb, line 161
# Sends a DELETE request for /projects/<project_name>/topics/<topic_name> and
# returns whatever send_request returns.
def delete_topic(project_name, topic_name)
    send_request("DELETE", "/projects/" + project_name + "/topics/" + topic_name)
end
get_project(project_name) click to toggle source
# File lib/fluent/plugin/datahub/datahub-http-client.rb, line 107
# Sends a GET request for /projects/<project_name> and returns whatever
# send_request returns.
def get_project(project_name)
    send_request("GET", "/projects/" + project_name)
end
get_shard_cursor(project_name, topic_name, shard_id, offset=DateTime.now.strftime('%Q'), type="LATEST") click to toggle source
# File lib/fluent/plugin/datahub/datahub-http-client.rb, line 201
# Requests a cursor for one shard by POSTing a "cursor" action to
# /projects/<project_name>/topics/<topic_name>/shards/<shard_id>.
#
# offset - sent as "SystemTime"; defaults to the current time formatted with
#          '%Q' (milliseconds since the epoch, as a String).
# type   - sent as "Type"; defaults to "LATEST".
#
# shard_id is concatenated onto the path, so it has to be a String.
# Returns whatever send_request returns.
def get_shard_cursor(project_name, topic_name, shard_id, offset=DateTime.now.strftime('%Q'), type="LATEST")
    shard_resource = "/projects/" + project_name + "/topics/" + topic_name + "/shards/" + shard_id
    cursor_request = {
        "Action" => "cursor",
        "SystemTime" => offset,
        "Type" => type
    }
    send_request("POST", shard_resource, cursor_request)
end
get_signature(params) click to toggle source

Signature = base64(HmacSha1(AccessKeySecret, VERB + "\n"

+ CONTENT-TYPE + "\n"
+ DATE + "\n"
+ CanonicalizedDatahubHeaders + "\n"
+ CanonicalizedResource))
# File lib/fluent/plugin/datahub/datahub-http-client.rb, line 31
# Builds the value of the Authorization header for one request.
#
# params is a Hash with the String keys "verb", "content-type", "date" and
# "resource". The string that gets signed is those values, one per line, with
# the fixed line "x-datahub-client-version:1" inserted before the resource, e.g.
#   "GET\napplication/json\nFri, 06 May 2016 06:43:31 GMT\nx-datahub-client-version:1\n/projects/test_project/topics/datahub_fluentd_out_1"
#
# Returns "DATAHUB <access_id>:<base64 of the HMAC-SHA1 of that string, keyed with @access_key>".
def get_signature(params)
    string_to_sign = params["verb"] + "\n" +
                     params["content-type"] + "\n" +
                     params["date"] + "\n" +
                     "x-datahub-client-version:1\n" +
                     params["resource"]
    digest = OpenSSL::HMAC.digest('sha1', @access_key, string_to_sign)
    # A SHA1 digest encodes to a single 28-character line, so no line breaks are involved.
    "DATAHUB " + @access_id + ":" + Base64.strict_encode64(digest)
end
get_topic(project_name, topic_name) click to toggle source
# File lib/fluent/plugin/datahub/datahub-http-client.rb, line 137
# Sends a GET request for /projects/<project_name>/topics/<topic_name> and
# returns whatever send_request returns.
def get_topic(project_name, topic_name)
    send_request("GET", "/projects/" + project_name + "/topics/" + topic_name)
end
list_projects() click to toggle source
# File lib/fluent/plugin/datahub/datahub-http-client.rb, line 102
# Sends a GET request for /projects and returns whatever send_request returns.
def list_projects
    send_request("GET", "/projects")
end
list_shards(project_name, topic_name) click to toggle source
# File lib/fluent/plugin/datahub/datahub-http-client.rb, line 166
# Sends a GET request for /projects/<project_name>/topics/<topic_name>/shards
# and returns whatever send_request returns.
def list_shards(project_name, topic_name)
    send_request("GET", "/projects/" + project_name + "/topics/" + topic_name + "/shards")
end
list_topics(project_name) click to toggle source
# File lib/fluent/plugin/datahub/datahub-http-client.rb, line 132
# Sends a GET request for /projects/<project_name>/topics and returns whatever
# send_request returns.
def list_topics(project_name)
    send_request("GET", "/projects/" + project_name + "/topics")
end
read_data_from_shard(project_name, topic_name, shard_id, count, offset=DateTime.now.strftime('%Q'), type="LATEST") click to toggle source
# File lib/fluent/plugin/datahub/datahub-http-client.rb, line 219
# Reads up to count records from a shard, starting at a freshly requested cursor.
#
# The cursor comes from get_shard_cursor(project_name, topic_name, shard_id,
# offset, type); the read itself is delegated to read_data_from_shard_with_cursor
# so both read methods build the request in one place.
#
# offset - passed to get_shard_cursor; defaults to the current time formatted
#          with '%Q' (milliseconds since the epoch, as a String).
# type   - passed to get_shard_cursor; defaults to "LATEST".
#
# Returns whatever read_data_from_shard_with_cursor returns.
# Raises RuntimeError when the cursor request comes back with no response.
def read_data_from_shard(project_name, topic_name, shard_id, count, offset=DateTime.now.strftime('%Q'), type="LATEST")
    cursor_resp = get_shard_cursor(project_name, topic_name, shard_id, offset, type)
    # send_request yields nil for an empty body; report that clearly instead of
    # failing with a NoMethodError on nil.
    raise "Get shard cursor failed, empty response" if cursor_resp.nil?

    cursor = cursor_resp.to_hash["Cursor"]
    read_data_from_shard_with_cursor(project_name, topic_name, shard_id, cursor, count)
end
read_data_from_shard_with_cursor(project_name, topic_name, shard_id, cursor, count) click to toggle source
# File lib/fluent/plugin/datahub/datahub-http-client.rb, line 210
# Reads records from a shard at a known cursor by POSTing a "sub" action to
# /projects/<project_name>/topics/<topic_name>/shards/<shard_id>.
#
# cursor is sent as "Cursor" and count as "Limit". shard_id is concatenated
# onto the path, so it has to be a String.
# Returns whatever send_request returns.
def read_data_from_shard_with_cursor(project_name, topic_name, shard_id, cursor, count)
    shard_resource = "/projects/" + project_name + "/topics/" + topic_name + "/shards/" + shard_id
    read_request = {
        "Action" => "sub",
        "Cursor" => cursor,
        "Limit" => count
    }
    send_request("POST", shard_resource, read_request)
end
send_request(method, path, params={}, headers={}) click to toggle source
# File lib/fluent/plugin/datahub/datahub-http-client.rb, line 41
# Sends one signed JSON request to @endpoint + path and returns the decoded reply.
#
# method  - "GET", "POST", "PUT" or "DELETE"; any other value raises RuntimeError.
# path    - request path appended to @endpoint; it is also the resource that is signed.
# params  - Hash sent as the JSON body of POST and PUT requests (unused otherwise).
# headers - extra headers, applied after the defaults so they can override them.
#
# Returns the parsed JSON body, or nil when the response carries no body.
# Raises RuntimeError for any status other than 200 or 201; the message is the
# response body, or a generic message when the body is missing or empty.
def send_request(method, path, params={}, headers={})
    uri = URI(@endpoint + path)

    request_class = case method
                    when "GET" then Net::HTTP::Get
                    when "POST" then Net::HTTP::Post
                    when "PUT" then Net::HTTP::Put
                    when "DELETE" then Net::HTTP::Delete
                    else raise "Unsupported method: #{method}"
                    end
    http_req = request_class.new(uri.path)
    # Only POST and PUT carry the parameters, serialized as a JSON body.
    http_req.body = JSON.generate(params) if method == "POST" || method == "PUT"

    sig_params = {
        "verb" => method,
        "content-type" => "application/json",
        "resource" => path,
        "date" => Time.now.utc.strftime("%a, %d %b %Y %T") + " GMT"
    }

    http_req["Authorization"] = get_signature(sig_params)
    http_req["x-datahub-client-version"] = "1"
    # Date and Content-Type must be the exact values that were just signed.
    http_req["Date"] = sig_params["date"]
    http_req["Content-Type"] = sig_params["content-type"]
    # Assigning nil removes the Accept-Encoding header that Net::HTTP sets by default.
    http_req["ACCEPT-ENCODING"] = nil

    headers.each { |name, value| http_req[name] = value }

    # Turn on TLS when the endpoint is an https:// URL; plain HTTP otherwise.
    Net::HTTP.start(uri.host, uri.port, use_ssl: uri.scheme == "https") do |client|
        http_resp = client.request(http_req)
        # The body may be nil rather than "" when the response has no body.
        body = http_resp.body.to_s
        unless http_resp.code == '200' || http_resp.code == '201'
            raise "Send request failed, unknown response " if body.empty?
            raise body
        end
        body.empty? ? nil : JSON.parse(body)
    end
end
update_topic(project_name, topic_name, lifecycle, comment) click to toggle source
# File lib/fluent/plugin/datahub/datahub-http-client.rb, line 153
# Updates a topic by PUTting its new lifecycle and comment to
# /projects/<project_name>/topics/<topic_name>.
# Returns whatever send_request returns.
def update_topic(project_name, topic_name, lifecycle, comment)
    resource = "/projects/" + project_name + "/topics/" + topic_name
    changes = { "Lifecycle" => lifecycle, "Comment" => comment }
    send_request("PUT", resource, changes)
end
write_data_to_topic(project_name, topic_name, record_entities) click to toggle source

def split_shard(project_name, topic_name, shard_id, split_key)

path = "/projects/" + project_name + "/topics/" + topic_name + "/shards"
params = {}
params["Action"] = "split"
params["ShardId"] = shard_id
params["SplitKey"] = split_key
return send_request("POST", path, params)

end

# File lib/fluent/plugin/datahub/datahub-http-client.rb, line 193
# Publishes records to a topic by POSTing a "pub" action, with record_entities
# under "Records", to /projects/<project_name>/topics/<topic_name>/shards.
# Returns whatever send_request returns.
def write_data_to_topic(project_name, topic_name, record_entities)
    resource = "/projects/" + project_name + "/topics/" + topic_name + "/shards"
    publish_request = { "Action" => "pub", "Records" => record_entities }
    send_request("POST", resource, publish_request)
end