class DatahubTopic
Attributes
comment[RW]
create_time[RW]
last_modify_time[RW]
lifecycle[RW]
record_schema[RW]
record_type[RW]
shard_count[RW]
Public Class Methods
new(datahub_http_client, project_name, topic_name)
click to toggle source
# File lib/fluent/plugin/datahub/datahub-topic.rb, line 15
# Remembers the HTTP client and the project/topic names on the instance so
# the other methods can pass them along on every client call.
#
# datahub_http_client - stored as @client.
# project_name        - stored as @project_name.
# topic_name          - stored as @topic_name.
def initialize(datahub_http_client, project_name, topic_name)
  @client, @project_name, @topic_name = datahub_http_client, project_name, topic_name
end
Public Instance Methods
get_cursor(shard_id, offset=DateTime.now.strftime('%Q'), type="OLDEST")
click to toggle source
# File lib/fluent/plugin/datahub/datahub-topic.rb, line 45
# Requests a shard cursor from the client and returns the value stored under
# the "Cursor" key of the response.
#
# shard_id - passed through to the client's get_shard_cursor.
# offset   - passed through; defaults to the current time formatted with
#            '%Q' (milliseconds since the Unix epoch, as a string).
# type     - passed through; defaults to "OLDEST".
def get_cursor(shard_id, offset=DateTime.now.strftime('%Q'), type="OLDEST")
  @client.get_shard_cursor(@project_name, @topic_name, shard_id, offset, type)["Cursor"]
end
list_shards()
click to toggle source
# File lib/fluent/plugin/datahub/datahub-topic.rb, line 21
# Asks the client for the shards of this project/topic and converts every
# entry under the response's "Shards" key into a DatahubShard.
#
# Returns an array of DatahubShard objects, in the order the client
# reported them.
def list_shards()
  response = @client.list_shards(@project_name, @topic_name)
  response["Shards"].map do |entry|
    # Copy each field of the response entry onto a fresh shard object.
    DatahubShard.new.tap do |shard|
      shard.begin_key        = entry["BeginKey"]
      shard.end_key          = entry["EndKey"]
      shard.left_shard_id    = entry["LeftShardId"]
      shard.parent_shard_ids = entry["ParentShardIds"]
      shard.right_shard_id   = entry["RightShardId"]
      shard.shard_id         = entry["ShardId"]
      shard.state            = entry["State"]
    end
  end
end
read_data(shard_id, cursor, count)
click to toggle source
# File lib/fluent/plugin/datahub/datahub-topic.rb, line 69
# Delegates to the client's read_data_from_shard_with_cursor for this
# project/topic and returns whatever the client returns.
#
# shard_id - passed through to the client.
# cursor   - passed through to the client.
# count    - passed through to the client.
def read_data(shard_id, cursor, count)
  call_args = [@project_name, @topic_name, shard_id, cursor, count]
  @client.read_data_from_shard_with_cursor(*call_args)
end
write_data(record_entities)
click to toggle source
# File lib/fluent/plugin/datahub/datahub-topic.rb, line 50
# Sends record_entities to the topic through the client and summarises any
# failures reported in the response.
#
# record_entities - collection handed to the client's write_data_to_topic;
#                   it is also indexed with each failed record's "Index".
#
# Returns a PutRecordResult. When the response's "FailedRecordCount" is
# greater than zero, the result carries that count plus, for every entry in
# "FailedRecords", the failed index, a hash with "error_code" and
# "error_message", and the original record at that index. Otherwise the
# freshly created result is returned untouched.
def write_data(record_entities)
  outcome = PutRecordResult.new
  response = @client.write_data_to_topic(@project_name, @topic_name, record_entities)

  # Nothing failed: hand back the pristine result object.
  return outcome unless response["FailedRecordCount"] > 0

  outcome.failed_record_count = response["FailedRecordCount"]
  response["FailedRecords"].each do |failure|
    position = failure["Index"]
    outcome.failed_record_index.push(position)
    outcome.failed_record_error.push({
      "error_code" => failure["ErrorCode"],
      "error_message" => failure["ErrorMessage"]
    })
    # Keep the original record so the caller can inspect or resend it.
    outcome.failed_record_list.push(record_entities[position])
  end
  outcome
end