class DatahubTopic

Attributes

comment[RW]
create_time[RW]
last_modify_time[RW]
lifecycle[RW]
record_schema[RW]
record_type[RW]
shard_count[RW]

Public Class Methods

new(datahub_http_client, project_name, topic_name) click to toggle source
# File lib/fluent/plugin/datahub/datahub-topic.rb, line 15
def initialize(datahub_http_client, project_name, topic_name)
    @client = datahub_http_client
    @project_name = project_name
    @topic_name = topic_name
end

Public Instance Methods

get_cursor(shard_id, offset=DateTime.now.strftime('%Q'), type="OLDEST") click to toggle source
# File lib/fluent/plugin/datahub/datahub-topic.rb, line 45
def get_cursor(shard_id, offset=DateTime.now.strftime('%Q'), type="OLDEST")
    result_map = @client.get_shard_cursor(@project_name, @topic_name, shard_id, offset, type)
    return result_map["Cursor"]
end
list_shards() click to toggle source
# File lib/fluent/plugin/datahub/datahub-topic.rb, line 21
def list_shards()
    result_map = @client.list_shards(@project_name, @topic_name)
    shard_array = result_map["Shards"]
    
    shards = []
    
    for i in 0...shard_array.size
        shard = DatahubShard.new

        shard_map = shard_array[i]
        shard.begin_key = shard_map["BeginKey"]
        shard.end_key = shard_map["EndKey"]
        shard.left_shard_id = shard_map["LeftShardId"]
        shard.parent_shard_ids = shard_map["ParentShardIds"]
        shard.right_shard_id = shard_map["RightShardId"]
        shard.shard_id = shard_map["ShardId"]
        shard.state = shard_map["State"]
        
        shards.push(shard)
    end
    
    return shards
end
read_data(shard_id, cursor, count) click to toggle source
# File lib/fluent/plugin/datahub/datahub-topic.rb, line 69
def read_data(shard_id, cursor, count)
    @client.read_data_from_shard_with_cursor(@project_name, @topic_name, shard_id, cursor, count)
end
write_data(record_entities) click to toggle source
# File lib/fluent/plugin/datahub/datahub-topic.rb, line 50
def write_data(record_entities)
    put_record_result = PutRecordResult.new
    result_map = @client.write_data_to_topic(@project_name, @topic_name, record_entities)

    if result_map["FailedRecordCount"] > 0
        put_record_result.failed_record_count = result_map["FailedRecordCount"]
        for i in 0...result_map["FailedRecords"].size
            result_error = result_map["FailedRecords"][i]
            put_record_result.failed_record_index.push(result_error["Index"])
            error_entity = {}
            error_entity["error_code"] = result_error["ErrorCode"]
            error_entity["error_message"] = result_error["ErrorMessage"]
            put_record_result.failed_record_error.push(error_entity)
            put_record_result.failed_record_list.push(record_entities[result_error["Index"]])
        end
    end
    return put_record_result
end