class Fluent::DatahubOutput
Attributes
shard_cursor[RW]
Cursor for round-robin shard dispatch. Used internally; not exposed as a configuration option.
Public Instance Methods
check_params()
# File lib/fluent/plugin/out_datahub.rb, line 90
def check_params
  schema = @datahub_topic.record_schema

  if @data_encoding != nil
    schema.setEncoding(@data_encoding)
  end

  fields = schema.get_fields

  # Ensure every configured column exists in the topic schema
  if @column_names.size > 0
    for i in 0...@column_names.size do
      column_name = @column_names[i]
      column_index = find_column_index(fields, column_name)
      if column_index == -1
        @logger.error "Column: " + column_name + " not found, please check your config"
        raise "Column: " + column_name + " not found, please check your config"
      end
    end
  end

  # If no source keys are given, source keys default to the column names
  if @source_keys.size == 0
    @source_keys = @column_names
  end

  if @source_keys.size > 0 and @column_names.size != @source_keys.size
    @logger.error "source_keys's size must be equal to column_names's size, please check your config"
    raise "source_keys's size must be equal to column_names's size, please check your config"
  else
    for i in 0...@column_names.size do
      @target_source_column_map[@column_names[i]] = @source_keys[i]
    end
  end

  if @shard_count < 1
    raise "there must be at least 1 active shard!"
  end

  # If dirty data may be skipped, a dirty-data file must be configured
  if @dirty_data_continue
    if @dirty_data_file.to_s.chomp.length == 0
      raise "Dirty data file path can not be empty"
    end
  end

  # Validate that every shard key exists in the schema
  if @shard_keys.size > 0
    for i in 0...@shard_keys.size
      shard_key = @shard_keys[i]
      shard_key_index = find_column_index(fields, shard_key)
      if shard_key_index == -1
        @logger.error "Shard key: " + shard_key + " not found in schema, please check your config"
        raise "Shard key: " + shard_key + " not found in schema, please check your config"
      end
    end
  end
end
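check_params enforces the pairing between column_names and source_keys and validates shard_keys against the topic schema. A minimal configuration sketch that satisfies these checks, assuming the config keys mirror the instance variables used above (key names, the "datahub" type name, and all values are illustrative, not confirmed from the plugin's config_param declarations):

  <match datahub.**>
    @type datahub
    endpoint     http://dh-endpoint.example.com   # hypothetical endpoint
    access_id    YOUR_ACCESS_ID
    access_key   YOUR_ACCESS_KEY
    project_name my_project
    topic_name   my_topic
    column_names ["uid", "name"]          # each must exist in the topic schema
    source_keys  ["user_id", "user_name"] # must be the same size as column_names
    shard_keys   ["uid"]                  # each must exist in the topic schema
    dirty_data_continue true              # requires dirty_data_file to be set
    dirty_data_file /var/log/fluent/datahub_dirty
  </match>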
configure(conf)
Calls superclass method
# File lib/fluent/plugin/out_datahub.rb, line 69
def configure(conf)
  super

  @client = DatahubClient.new(@endpoint, @access_id, @access_key)
  @datahub_project = @client.get_project(@project_name)
  @datahub_topic = @datahub_project.get_topic(@topic_name)

  @shards = get_active_shard
  @shard_count = @shards.size

  @logger = log
  @shard_cursor = 0

  # A single put to DataHub is capped at 3000 records
  @put_data_max_size = 3000

  @target_source_column_map = {}

  # Validate parameters up front
  check_params
end
find_column_index(fields, column_name)
Finds the actual index of a column in the topic schema; returns -1 if the column is not found.
# File lib/fluent/plugin/out_datahub.rb, line 155
def find_column_index(fields, column_name)
  for i in 0...fields.size do
    name = fields[i].get_name
    if name == column_name
      return i
    end
  end
  return -1
end
format(tag, time, record)
# File lib/fluent/plugin/out_datahub.rb, line 173
def format(tag, time, record)
  # The chunk is consumed with msgpack_each in #write, so events must be
  # serialized with msgpack here
  [tag, time, record].to_msgpack
end
get_active_shard()
Returns the shards that are in the ACTIVE state.
# File lib/fluent/plugin/out_datahub.rb, line 389
def get_active_shard
  all_shards = @datahub_topic.list_shards
  active_shards = []
  all_shards.each do |shard|
    if shard.state == "ACTIVE"
      active_shards.push(shard)
    end
  end
  return active_shards
end
get_shard_id(record)
Determines the shard_id to write to.
# File lib/fluent/plugin/out_datahub.rb, line 354
def get_shard_id(record)
  if @shard_id != nil and !@shard_id.empty?
    # Fixed shard configured explicitly
    return @shard_id
  elsif @shard_keys != nil and @shard_keys.size > 0
    # Hash routing: concatenate the shard-key values and hash them
    hash_string = ""
    for i in 0...@shard_keys.size
      shard_key = @shard_keys[i]
      source_key = @target_source_column_map[shard_key]
      if record[source_key] != nil
        hash_string += record[source_key].to_s + ","
      end
    end
    hashed_value = hash_code(hash_string)
    index = hashed_value % @shard_count
    return @shards[index].shard_id
  else
    # Round-robin routing
    idx = @shard_cursor % @shard_count
    @shard_cursor = idx + 1
    shard_id = @shards[idx].shard_id
    return shard_id
  end
end
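So routing falls through three modes: a fixed shard_id, hashing over shard_keys, then round-robin. A worked trace of the hash branch, with hypothetical values (shard_keys = ["uid"], three active shards, and a record {"uid" => "42"}):

  # Hypothetical walkthrough of hash routing, assuming @shard_count = 3
  hash_string  = "42,"                # shard-key values, each comma-terminated
  hashed_value = hash_code("42,")     # => 51566 (Java-compatible hashCode, below)
  index        = hashed_value % 3     # => 2, so the record goes to @shards[2]

Because Ruby's % returns a non-negative result for a positive modulus, a negative hash still maps to a valid shard index.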
hash_code(str)
Produces the same hash code as Java's String#hashCode.
# File lib/fluent/plugin/out_datahub.rb, line 382
def hash_code(str)
  str.each_char.reduce(0) do |result, char|
    # 31 * result + char, wrapped to a signed 32-bit integer as in Java
    [((result << 5) - result) + char.ord].pack('L').unpack('l').first
  end
end
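A quick parity check (in Java, "hello".hashCode() is 99162322):

  hash_code("hello")  # => 99162322, same as "hello".hashCode() in Java
  # Unlike Java's %, Ruby's % yields a non-negative result for a positive
  # shard count, so even a negative hash maps to a valid shard:
  -7 % 3              # => 2 in Ruby (would be -1 in Java)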
record_to_entity(entity, record)
Converts a Fluentd record into a DataHub record entity.
# File lib/fluent/plugin/out_datahub.rb, line 292
def record_to_entity(entity, record)
  schema = entity.get_schema
  @column_names.each do |column|
    begin
      source_key = @target_source_column_map[column]
      if record.has_key?(source_key)
        field = schema.get_field(column)
        if field == nil
          raise "Unknown column name of data"
        else
          # Dispatch on the schema type of the target column
          field_type = field.get_type
          if field_type == "BIGINT"
            entity.setBigInt(column, record[source_key])
          elsif field_type == "DOUBLE"
            entity.setDouble(column, record[source_key])
          elsif field_type == "BOOLEAN"
            entity.setBoolean(column, record[source_key])
          elsif field_type == "STRING"
            entity.setString(column, record[source_key])
          elsif field_type == "TIMESTAMP"
            entity.setTimeStamp(column, record[source_key])
          else
            raise "Unknown schema type of data"
          end
        end
      end
    rescue => e
      @logger.error "Parse data: " + column + "[" + record[source_key].to_s + "] failed, " + e.message
      if !@dirty_data_continue
        @logger.error "Dirty data found, exit process now."
        puts "Dirty data found, exit process now."
        raise "try to exit!"
      else
        # Skipped bad records are written straight to the dirty-data file
        write_as_dirty_data(record)
      end
      return false
    end
  end
  return true
end
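For example, with a topic schema of (uid BIGINT, name STRING) and the hypothetical config from check_params above, the mapping works out as:

  # Hypothetical mapping, assuming column_names = ["uid", "name"] and
  # source_keys = ["user_id", "user_name"]
  record = { "user_id" => 42, "user_name" => "alice" }
  # record_to_entity then effectively performs:
  #   entity.setBigInt("uid",  record["user_id"])    # BIGINT column
  #   entity.setString("name", record["user_name"])  # STRING column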
shutdown()
Calls superclass method
# File lib/fluent/plugin/out_datahub.rb, line 169
def shutdown
  super
end
start()
Calls superclass method
# File lib/fluent/plugin/out_datahub.rb, line 165
def start
  super
end
write(chunk)
# File lib/fluent/plugin/out_datahub.rb, line 181
def write(chunk)
  record_entities = []
  schema = @datahub_topic.record_schema

  chunk.msgpack_each do |tag, time, record|
    entity = RecordEntity.new(schema)
    convert_success = record_to_entity(entity, record)
    entity.set_shard_id(get_shard_id(record))

    if convert_success
      record_entities.push(entity)
    end

    # Flush when the hard cap (3000) or the configured batch size is reached
    if record_entities.size >= @put_data_max_size
      write_data_with_retry(record_entities)
      record_entities.clear
    elsif record_entities.size >= @put_data_batch_size
      write_data_with_retry(record_entities)
      record_entities.clear
    end
  end

  # Flush whatever remains at the end of the chunk
  if record_entities.size > 0
    write_data_with_retry(record_entities)
  end
end
write_as_dirty_data(record)
Writes a failed record to the dirty-data file.
# File lib/fluent/plugin/out_datahub.rb, line 336
def write_as_dirty_data(record)
  dirty_file_part1_name = @dirty_data_file + ".part1"
  dirty_file_part2_name = @dirty_data_file + ".part2"

  # Serialize writes across threads with a file lock
  @@file_lock.synchronize {
    dirty_file_part2 = File.open(dirty_file_part2_name, "a+")
    dirty_file_part2.puts(record.to_json)
    dirty_file_part2.close
    if File.size(dirty_file_part2_name) > @dirty_data_file_max_size / 2
      # Data is split across .part1 and .part2: older records roll over
      # to .part1, new records keep going to .part2
      FileUtils.mv(dirty_file_part2_name, dirty_file_part1_name)
    end
  }
end
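Because .part2 is renamed over .part1 once it exceeds half of dirty_data_file_max_size, total dirty-data disk usage stays bounded at roughly dirty_data_file_max_size. A minimal sketch for inspecting the dirty records later, assuming the JSON-lines layout written above (the path is hypothetical):

  require "json"

  %w[.part1 .part2].each do |suffix|
    path = "/var/log/fluent/datahub_dirty" + suffix  # hypothetical dirty_data_file
    next unless File.exist?(path)
    File.foreach(path) do |line|
      record = JSON.parse(line)  # each line is one failed record as JSON
      # reprocess or inspect the record here
    end
  end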
write_data_with_retry(record_entities)
Writes data to DataHub, retrying up to @retry_times times.
# File lib/fluent/plugin/out_datahub.rb, line 212
def write_data_with_retry(record_entities)
  tmp_retry_times = @retry_times
  put_result = nil
  while true
    begin
      put_result = @datahub_topic.write_data(record_entities)
    rescue => e
      @logger.warn "Put " + record_entities.size.to_s + " records to datahub failed, total " + record_entities.size.to_s + ", message = " + e.message
      if tmp_retry_times > 0
        sleep @retry_interval
        @logger.warn "Now retry(" + (@retry_times - tmp_retry_times + 1).to_s + ")..."
        tmp_retry_times -= 1
        next
      else
        if !@dirty_data_continue
          @logger.error "Dirty data found, exit process now."
          puts "Dirty data found, exit process now."
          raise "try to exit!"
        else
          # No retries configured / retries exhausted: dump the batch to the dirty-data file
          for i in 0...record_entities.size
            record_entity = record_entities[i]
            @logger.error "Put record: " + record_entity.get_columns_map.to_s + " failed, " + e.message
            write_as_dirty_data(record_entity.get_columns_map)
          end
          break
        end
      end
    end

    if put_result != nil and put_result.failed_record_count > 0
      if tmp_retry_times > 0
        # Retry only the failed records, up to retry_times
        @logger.warn "Put " + put_result.failed_record_count.to_s + " records to datahub failed, total " + record_entities.size.to_s
        sleep @retry_interval
        @logger.warn "Now retry(" + (@retry_times - tmp_retry_times + 1).to_s + ")..."
        tmp_retry_times -= 1
        record_entities = put_result.failed_record_list

        # In round-robin mode, if a record failed because its shard is no
        # longer active (error_code == "InvalidShardOperation"), refresh the
        # shard list once and re-route the record
        fresh_shard_flag = false
        if @shard_id.empty? and @shard_keys.size == 0
          for i in 0...put_result.failed_record_count
            error_entity = put_result.failed_record_error[i]
            if error_entity["error_code"] == "InvalidShardOperation"
              unless fresh_shard_flag
                @shards = get_active_shard
                @shard_count = @shards.size
                fresh_shard_flag = true
              end
              record_entities[i].set_shard_id(get_shard_id(record_entities[i]))
            end
          end
        end
      else
        if !@dirty_data_continue
          @logger.error "Dirty data found, exit process now."
          puts "Dirty data found, exit process now."
          raise "try to exit!"
        else
          # Retries exhausted: dump the remaining failures to the dirty-data file
          for i in 0...put_result.failed_record_count
            record_entity = put_result.failed_record_list[i]
            @logger.error "Put record: " + record_entity.get_columns_map.to_s + " failed, " + put_result.failed_record_error[i].to_s
            write_as_dirty_data(record_entity.get_columns_map)
          end
          break
        end
      end
    else
      @logger.info "Put data to datahub success, total " + record_entities.size.to_s
      break
    end
  end
end