class Fluent::DatahubOutput

Attributes

shard_cursor[RW]

Used internally, not exposed as configuration. Cursor used for round-robin shard dispatch.

Public Instance Methods

check_params()
# File lib/fluent/plugin/out_datahub.rb, line 90
def check_params
  schema = @datahub_topic.record_schema
  if @data_encoding != nil
      schema.setEncoding(@data_encoding)
  end

  fields = schema.get_fields
  
  # Ensure every user-configured column exists in the topic schema
  if @column_names.size > 0
      for i in 0...@column_names.size do
          column_name = @column_names[i]
          column_index = find_column_index(fields, column_name)
          if column_index == -1
              @logger.error "Column: " + column_name + " not found, please check your config"
              raise "Column: " + column_name + " not found, please check your config"
          end
      end
  end

  # Default source_keys to column_names when not configured
  if @source_keys.size == 0
      @source_keys = @column_names
  end

  if @source_keys.size > 0 and @column_names.size != @source_keys.size
      @logger.error "The size of source_keys must be equal to the size of column_names, please check your config"
      raise "The size of source_keys must be equal to the size of column_names, please check your config"
  else
      # Build the mapping from topic column name to source record key
      for i in 0...@column_names.size do
          @target_source_column_map[@column_names[i]] = @source_keys[i]
      end
  end

  if @shard_count < 1
      raise "there must be at least 1 active shard!"
  end
  
  # When dirty data is allowed to continue, a dirty-data file must be specified
  if @dirty_data_continue
      if @dirty_data_file.to_s.chomp.length == 0
          raise "Dirty data file path cannot be empty"
      end
      end
  end

  # Verify that every shard_keys field exists in the topic schema
  if @shard_keys.size > 0
      for i in 0...@shard_keys.size
          shard_key = @shard_keys[i]
          shard_key_index = find_column_index(fields, shard_key)
          if shard_key_index == -1
              @logger.error "Shard key: " + shard_key + " not found in schema, please check your config"
              raise "Shard key: " + shard_key + " not found in schema, please check your config"
          end
      end
  end
    
end
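
The constraints above map directly onto the plugin configuration. A minimal sketch of a match section, assuming the parameter names mirror the instance variables used in this class (all values below are placeholders):

<match datahub.**>
  type datahub
  endpoint http://example-datahub-endpoint
  access_id YOUR_ACCESS_ID
  access_key YOUR_ACCESS_KEY
  project_name your_project
  topic_name your_topic
  column_names ["f1", "f2"]
  source_keys ["key1", "key2"]     # must be the same length as column_names
  shard_keys ["f1"]                # optional: hash-route on the value of f1
  dirty_data_continue true         # requires dirty_data_file to be set
  dirty_data_file /tmp/datahub_dirty.data
</match>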
configure(conf)
Calls superclass method
# File lib/fluent/plugin/out_datahub.rb, line 69
def configure(conf)
    super
    @client = DatahubClient.new(@endpoint, @access_id, @access_key)
    @datahub_project = @client.get_project(@project_name)
    @datahub_topic = @datahub_project.get_topic(@topic_name)

    @shards = get_active_shard
    @shard_count = @shards.size

    @logger = log
    @shard_cursor = 0

    # A single put to DataHub is limited to 3000 records
    @put_data_max_size = 3000

    @target_source_column_map = {}
    
    # Validate parameters up front
    check_params
end
find_column_index(fields, column_name)

Looks up the actual index of a column in the topic schema; returns -1 if the column is not found.

# File lib/fluent/plugin/out_datahub.rb, line 155
def find_column_index(fields, column_name)
    for i in 0...fields.size do
        name = fields[i].get_name
        if name == column_name
            return i
        end
    end
    return -1
end
format(tag, time, record)
# File lib/fluent/plugin/out_datahub.rb, line 173
def format(tag, time, record)
    # Double quotes are required here: '\n' is a literal backslash-n in Ruby
    [tag, time, record].to_json + "\n"
end
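
A formatted entry is the JSON-encoded [tag, time, record] triple followed by a newline. A quick illustration with hypothetical values:

format("datahub.access", 1469600000, {"f1" => "a", "f2" => 1})
#=> "[\"datahub.access\",1469600000,{\"f1\":\"a\",\"f2\":1}]\n"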
get_active_shard()

Fetches the shards that are in the ACTIVE state.

# File lib/fluent/plugin/out_datahub.rb, line 389
def get_active_shard
    all_shards = @datahub_topic.list_shards
    active_shards = []
    all_shards.each do |shard|
        if shard.state == "ACTIVE"
            active_shards.push(shard)
        end
    end

    return active_shards
end
get_shard_id(record)

Determines the shard_id to write a record to.

# File lib/fluent/plugin/out_datahub.rb, line 354
def get_shard_id(record)
    if @shard_id != nil and !@shard_id.empty?
        return @shard_id
    elsif @shard_keys != nil and @shard_keys.size > 0
        # Hash-based routing over the configured shard_keys
        hash_string = ""
        for i in 0...@shard_keys.size
            shard_key = @shard_keys[i]
            source_key = @target_source_column_map[shard_key]
            if record[source_key] != nil
                hash_string += record[source_key].to_s + ","
            end
        end
        hashed_value = hash_code(hash_string)
        index = hashed_value % @shard_count
        return @shards[index].shard_id
    else
        # Round-robin routing across active shards
        idx = @shard_cursor % @shard_count
        @shard_cursor = idx + 1
        return @shards[idx].shard_id
    end
end
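
The three routing modes, sketched with two hypothetical active shards whose ids are "0" and "1":

# shard_id "1" configured                  => always returns "1"
# shard_keys ["f1"], record "f1" => "7"    => @shards[hash_code("7,") % 2].shard_id
# neither configured                       => "0", "1", "0", "1", ... in turn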
hash_code(str)

Produces the same hash code as Java's String#hashCode.

# File lib/fluent/plugin/out_datahub.rb, line 382
def hash_code(str)
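    # Each step computes 31 * result + char.ord ((result << 5) - result == 31 * result);
    # pack('L')/unpack('l') wraps the running value to a signed 32-bit integer, as in Java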
    str.each_char.reduce(0) do |result, char|
        [((result << 5) - result) + char.ord].pack('L').unpack('l').first
    end
end
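
As a quick sanity check against Java:

hash_code("abc")  #=> 96354, the same value "abc".hashCode() returns in Java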
record_to_entity(entity, record)

Converts a record into a RecordEntity according to the topic schema.

# File lib/fluent/plugin/out_datahub.rb, line 292
def record_to_entity(entity, record)
    schema = entity.get_schema
    @column_names.each do |column|
        begin
            source_key = @target_source_column_map[column]
            if record.has_key?(source_key)
                field = schema.get_field(column)
                if field == nil
                    raise "Unknown column name of data"
                else
                    field_type = field.get_type
                    case field_type
                    when "BIGINT"
                        entity.setBigInt(column, record[source_key])
                    when "DOUBLE"
                        entity.setDouble(column, record[source_key])
                    when "BOOLEAN"
                        entity.setBoolean(column, record[source_key])
                    when "STRING"
                        entity.setString(column, record[source_key])
                    when "TIMESTAMP"
                        entity.setTimeStamp(column, record[source_key])
                    else
                        raise "Unknown schema type of data"
                    end
                end
            end
        rescue => e
            @logger.error "Parse data: " + column + "[" + record[source_key].to_s + "] failed, " + e.message
            if !@dirty_data_continue
                @logger.error "Dirty data found, exit process now."
                puts "Dirty data found, exit process now."
                raise "try to exit!"
            else
                # Tolerated dirty data is written straight to the dirty-data file
                write_as_dirty_data(record)
            end
            return false
        end
    end
    return true
end
shutdown()
Calls superclass method
# File lib/fluent/plugin/out_datahub.rb, line 169
def shutdown
    super
end
start()
Calls superclass method
# File lib/fluent/plugin/out_datahub.rb, line 165
def start
    super
end
write(chunk)
# File lib/fluent/plugin/out_datahub.rb, line 181
def write(chunk)
    record_entities = []
    schema = @datahub_topic.record_schema

    chunk.msgpack_each do |tag, time, record|
        entity = RecordEntity.new(schema)
        convert_success = record_to_entity(entity, record)
        if convert_success
            entity.set_shard_id(get_shard_id(record))
            record_entities.push(entity)
        end
        # Flush when the batch reaches the configured batch size or the hard cap of 3000
        if record_entities.size >= @put_data_max_size or record_entities.size >= @put_data_batch_size
            write_data_with_retry(record_entities)
            record_entities.clear
        end
    end

    # Flush any remaining records
    if record_entities.size > 0
        write_data_with_retry(record_entities)
    end
end
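
The flush cadence, traced for a chunk of 1200 records with a hypothetical put_data_batch_size of 500:

# records 1..500     -> write_data_with_retry (batch 1), buffer cleared
# records 501..1000  -> write_data_with_retry (batch 2), buffer cleared
# records 1001..1200 -> flushed by the final check after the loop (batch 3)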
write_as_dirty_data(record)

Writes a record to the dirty-data file.

# File lib/fluent/plugin/out_datahub.rb, line 336
def write_as_dirty_data(record)
    dirty_file_part1_name = @dirty_data_file + ".part1"
    dirty_file_part2_name = @dirty_data_file + ".part2"

    # Write under a lock to keep concurrent appends from interleaving
    @@file_lock.synchronize {
        dirty_file_part2 = File.open(dirty_file_part2_name, "a+")
        dirty_file_part2.puts(record.to_json)
        dirty_file_part2.close
        if File.size(dirty_file_part2_name) > @dirty_data_file_max_size / 2
            # Data is split across .part1 and .part2:
            # older data rotates to .part1, new data keeps going to .part2
            FileUtils.mv(dirty_file_part2_name, dirty_file_part1_name)
        end
    }
    }
end
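
This two-part rotation bounds dirty-data disk usage at roughly dirty_data_file_max_size: once .part2 grows past half the limit it overwrites .part1, so at most one full half and one growing half exist at a time. A hypothetical replay helper for the records, assuming one JSON object per line:

require 'json'

def replay_dirty_records(dirty_data_file)
    [dirty_data_file + ".part1", dirty_data_file + ".part2"].each do |path|
        next unless File.exist?(path)
        File.foreach(path) { |line| yield JSON.parse(line) }
    end
end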
write_data_with_retry(record_entities)

Retries writing data to DataHub according to @retry_times.

# File lib/fluent/plugin/out_datahub.rb, line 212
def write_data_with_retry(record_entities)
    tmp_retry_times = @retry_times
    put_result = nil
    while true
        begin
            put_result = @datahub_topic.write_data(record_entities)
        rescue => e
            @logger.warn "Put " + record_entities.size.to_s + " records to datahub failed, total " + record_entities.size.to_s + ", message = " + e.message
            if tmp_retry_times > 0
                sleep @retry_interval
                @logger.warn "Now retry(" + (@retry_times - tmp_retry_times + 1).to_s + ")..."
                tmp_retry_times -= 1
                next
            else
                if !@dirty_data_continue
                    @logger.error "Dirty data found, exit process now."
                    puts "Dirty data found, exit process now."
                    raise "try to exit!"
                else
                    # Retries disabled or exhausted: write the whole batch to the
                    # dirty-data file (put_result may be nil here, so log the exception)
                    for i in 0...record_entities.size
                        record_entity = record_entities[i]
                        @logger.error "Put record: " + record_entity.get_columns_map.to_s + " failed, " + e.message
                        write_as_dirty_data(record_entity.get_columns_map)
                    end
                    break
                end
            end
        end

        if put_result != nil and put_result.failed_record_count > 0
            if tmp_retry_times > 0
                # Retry only the records that failed
                @logger.warn "Put " + put_result.failed_record_count.to_s + " records to datahub failed, total " + record_entities.size.to_s
                sleep @retry_interval
                @logger.warn "Now retry(" + (@retry_times - tmp_retry_times + 1).to_s + ")..."
                tmp_retry_times -= 1
                record_entities = put_result.failed_record_list

                # Under round-robin routing, a failed shard may have left the ACTIVE
                # state (error_code == "InvalidShardOperation"); refresh the shard
                # list once and re-route the affected records
                fresh_shard_flag = false
                if @shard_id.to_s.empty? and @shard_keys.size == 0
                    for i in 0...put_result.failed_record_count
                        error_entity = put_result.failed_record_error[i]
                        if error_entity["error_code"] == "InvalidShardOperation"
                            unless fresh_shard_flag
                                @shards = get_active_shard
                                @shard_count = @shards.size
                                fresh_shard_flag = true
                            end
                            record_entities[i].set_shard_id(get_shard_id(record_entities[i]))
                        end
                    end
                end
            else
                if !@dirty_data_continue
                    @logger.error "Dirty data found, exit process now."
                    puts "Dirty data found, exit process now."
                    raise "try to exit!"
                else
                    # Retries disabled or exhausted: write the failed records to the dirty-data file
                    for i in 0...put_result.failed_record_count
                        record_entity = put_result.failed_record_list[i]
                        @logger.error "Put record: " + record_entity.get_columns_map.to_s + " failed, " + put_result.failed_record_error[i].to_s
                        write_as_dirty_data(record_entity.get_columns_map)
                    end
                    break
                end
            end
        else
            @logger.info "Put data to datahub success, total " + record_entities.size.to_s
            break
        end
    end
end
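
End to end, a failing put follows this path (a sketch, with hypothetical retry_times 3 and retry_interval 5):

# attempt 1 fails -> sleep 5 -> "Now retry(1)..." resending only the failed records
# attempt 2 fails -> sleep 5 -> "Now retry(2)..."
# attempt 3 fails -> sleep 5 -> "Now retry(3)..."
# attempt 4 fails -> retries exhausted: raise "try to exit!" unless
#                    dirty_data_continue, otherwise each failed record goes to
#                    write_as_dirty_data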