class LogStash::Inputs::Datahub

Datahub input plugin: reads records from the shards of an Aliyun DataHub topic and pushes them into the Logstash pipeline as events.
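The plugin polls each shard, converts every record into an event, and periodically persists its read positions to a local checkpoint file. The options it relies on correspond to the instance variables used throughout the source below (access_id, access_key, endpoint, project_name, topic_name, shard_ids, cursor_time, pos_file, batch_limit, interval, retry_interval, multi_thread). As a rough sketch only, with placeholder values, the expected settings look like this (exact option names should be checked against the plugin's config declarations, which are not shown on this page):

settings = {
  "access_id"    => "<aliyun-access-id>",
  "access_key"   => "<aliyun-access-key>",
  "endpoint"     => "<datahub-endpoint>",
  "project_name" => "my_project",        # hypothetical project
  "topic_name"   => "my_topic",          # hypothetical topic
  "shard_ids"    => [],                  # empty: consume every ACTIVE shard
  "pos_file"     => "/tmp/datahub.pos"   # checkpoint file, must be writable
}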

Constants

DatahubPackage

Public Instance Methods

check_params()
# File lib/logstash/inputs/datahub.rb, line 116
def check_params()
  @logger.info "Config shardIds:" + @shard_ids.to_s
  if @shard_ids.size != 0
    valid = true
    for shard in 0...@shard_ids.size
      @logger.info "Checking shard:" + @shard_ids[shard]
      shard_exist_active = false
      for i in 0...@shards.size
        shard_entry = @shards[i]
        if shard_entry.getShardId() == @shard_ids[shard] && shard_entry.getState() == DatahubPackage.model.ShardState::ACTIVE
          shard_exist_active = true
          break
        end
      end
      if !shard_exist_active
        valid = false
      end
    end
    if (!valid)
      @logger.error "Config shard_id not exists or state not active, check your config"
      raise "Config shard_id not exists or state not active, check your config"
    end
  else
    valid = false
    for i in 0...@shards.size
      shard_entry = @shards[i]
      @logger.info "Checking shard:" + shard_entry.getShardId()
      if shard_entry.getState() == DatahubPackage.model.ShardState::ACTIVE
        @shard_ids.push(shard_entry.getShardId())
        valid = true
      end
    end
    if (!valid)
      @logger.error "There is no active shard."
      raise "There is no active shard."
    end
  end
  @logger.info "Reading from shards:" + @shard_ids.to_s
  begin
    checkpoint_file = File.open(@pos_file, "a+")
    checkpoint_file.close
  rescue
    @logger.error "Config pos file is invalid, pos_file must point to a file."
    raise "Config pos file is invalid, pos_file must point to a file."
  end
end
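In short: every configured shard id must name an ACTIVE shard; with an empty shard_ids list, all ACTIVE shards are consumed; and pos_file must point to a creatable file. The shard validation can be read more compactly as the following sketch, using the same instance variables as above (an illustration, not a drop-in replacement):

active_ids = @shards.select { |s| s.getState() == DatahubPackage.model.ShardState::ACTIVE }
                    .map { |s| s.getShardId() }
if @shard_ids.empty?
  @shard_ids = active_ids
  raise "There is no active shard." if @shard_ids.empty?
else
  missing = @shard_ids - active_ids
  raise "Config shard_id does not exist or is not active" unless missing.empty?
end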
check_stop()
# File lib/logstash/inputs/datahub.rb, line 289
def check_stop()
  @@mutex.synchronize {
    return @stop
  }
end
get_active_shards(shards)
# File lib/logstash/inputs/datahub.rb, line 326
def get_active_shards(shards)
  active_shards = []
  for i in 0...shards.size
    entry = shards.get(i)
    if entry.getState() == DatahubPackage.model.ShardState::ACTIVE
      active_shards.push(entry)
    end
  end
  return active_shards
end
get_cursor(shard_id, force = false)
# File lib/logstash/inputs/datahub.rb, line 201
def get_cursor(shard_id, force = false)
  @@mutex.synchronize {
    if (force || !@cursor.has_key?(shard_id) || @cursor[shard_id]==nil || @cursor[shard_id]=="")
      @logger.info "Shard:" + shard_id + " has no checkpoint, will seek to begin."
      if (@cursor_time.has_key?(shard_id))
        if (@cursor_time[shard_id] == '-1')
          cursorRs = @client.getCursor(@project_name, @topic_name, shard_id, DatahubPackage.model.GetCursorRequest::CursorType::LATEST)
        else
          cursorRs = @client.getCursor(@project_name, @topic_name, shard_id, @cursor_time[shard_id].to_i * 1000)
        end
      else
        cursorRs = @client.getCursor(@project_name, @topic_name, shard_id, DatahubPackage.model.GetCursorRequest::CursorType::OLDEST)
      end
      @cursor[shard_id] = cursorRs.getCursor()
      @logger.info "Start reading shard:" + shard_id + " with cursor:" + @cursor[shard_id]
    end
    return @cursor[shard_id]
  }
end
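The cursor_time entry for a shard decides where reading starts when no cursor has been saved yet: '-1' means LATEST (only new records), any other value is treated as a Unix timestamp in seconds and converted to milliseconds for getCursor, and a shard with no entry starts at OLDEST. A standalone illustration of that mapping (method name and return values are hypothetical):

def initial_cursor_position(cursor_time_entry)
  case cursor_time_entry
  when nil  then :oldest                      # no entry: read from the beginning
  when '-1' then :latest                      # sentinel: read only new records
  else cursor_time_entry.to_i * 1000          # Unix seconds -> DataHub milliseconds
  end
end

initial_cursor_position(nil)           # => :oldest
initial_cursor_position('-1')          # => :latest
initial_cursor_position('1600000000')  # => 1600000000000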
read_checkpoint()
# File lib/logstash/inputs/datahub.rb, line 174
def read_checkpoint()
  begin
    @logger.info "read checkpoint:" + @pos_file
    @@mutex.synchronize {
      File.foreach(@pos_file) do |line|
        checkpoint = line.chomp
        cursor_param = checkpoint.split(":")
        if cursor_param.size != 2
          raise "Invalid checkpoint:" + checkpoint
        end
        @cursor[cursor_param[0]] = cursor_param[1]
      end
      @logger.info "recover checkpoint:" + @cursor.to_s
    }
  rescue => e
    @logger.error e.backtrace.inspect.to_s
    @logger.error "No checkpoint found or invalid, will start normally."
    raise e
  end
end
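The checkpoint file holds one shard_id:cursor pair per line, which is exactly the format written by update_checkpoint below. A minimal, self-contained parse of such a file (the path and cursor values are made up):

# /tmp/datahub.pos might contain:
#   0:30005af19b3800000000000000000000
#   1:30005af19b3800000000000000000001
cursors = {}
File.foreach("/tmp/datahub.pos") do |line|
  shard_id, cursor = line.chomp.split(":")
  cursors[shard_id] = cursor
end
# cursors => {"0"=>"30005af1...", "1"=>"30005af1..."}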
read_record(shard_id, queue)
# File lib/logstash/inputs/datahub.rb, line 221
def read_record(shard_id, queue)
  cursor = get_cursor(shard_id)
  begin
    @logger.info "Get record at shard: " + shard_id + " cursor:" + cursor
    recordRs = @client.getRecords(@project_name, @topic_name, shard_id, cursor, @batch_limit, @schema)
    recordEntries = recordRs.getRecords()
    recordEntries.each do |record|
      data = Hash.new
      @fields.each do |field|
        case field.getType()
          when DatahubPackage.common.data.FieldType::BIGINT
            data[field.getName()] = record.getBigint(field.getName())
          when DatahubPackage.common.data.FieldType::DOUBLE
            data[field.getName()] = record.getDouble(field.getName())
          when DatahubPackage.common.data.FieldType::BOOLEAN
            data[field.getName()] = record.getBoolean(field.getName())
          when DatahubPackage.common.data.FieldType::TIMESTAMP
            data[field.getName()] = record.getTimeStamp(field.getName())
          when DatahubPackage.common.data.FieldType::STRING
            data[field.getName()] = record.getString(field.getName())
          else
            @logger.error "Unknow type " + field.getType().toString()
            raise "Unknow type " + field.getType().toString()
        end
      end
      data["timestamp"] = record.getSystemTime()
      event = LogStash::Event.new("message"=>data)
      decorate(event)
      queue << event
    end
    cursor = recordRs.getNextCursor()
    update_cursor(shard_id, cursor)
    @logger.info "Shard:" + shard_id + " Next cursor:" + cursor + " GetRecords:" + (recordRs.getRecordCount()).to_s
    if (recordRs.getRecordCount() == 0)
      @logger.info "Read to end, waiting for data:" + cursor
      Stud.stoppable_sleep(@interval) { stop? }
    else
      update_checkpoint
    end
  rescue DatahubPackage.exception.InvalidCursorException => e
    @logger.error "Shard:" + shard_id + " Invalid cursor:" + cursor + ", seek to begin."
    get_cursor(shard_id, true)
  rescue DatahubPackage.exception.InvalidOperationException => e
    @logger.error "Shard:" + shard_id + " Invalid operation, cursor:" + cursor + ", shard is sealed, consumer will exit."
    raise e
  rescue DatahubPackage.exception.DatahubClientException => e
    @logger.error "Read failed:" + e.getMessage() + ", will retry later."
    Stud.stoppable_sleep(@retry_interval) { stop? }
  end
end
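Each DataHub record ends up as a Logstash event whose message field maps column names to their typed values, plus a timestamp entry carrying the record's system time in milliseconds. Roughly (column names and values are invented for illustration):

event = LogStash::Event.new(
  "message" => {
    "user_id"   => 42,             # BIGINT column
    "score"     => 3.14,           # DOUBLE column
    "comment"   => "hello",        # STRING column
    "timestamp" => 1600000000123   # added by the plugin: record system time
  }
)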
register()
# File lib/logstash/inputs/datahub.rb, line 82
def register
  begin
    @stop = false
    @account = DatahubPackage.auth.AliyunAccount::new(@access_id, @access_key)
    @conf = DatahubPackage.DatahubConfiguration::new(@account, @endpoint)

    @client = DatahubPackage.DatahubClient::new(@conf)
    @project = DatahubPackage.wrapper.Project::Builder.build(@project_name, @client)
    @topic = @project.getTopic(@topic_name)

    @shards = get_active_shards(@topic.listShard())
    @shard_count = @shards.size()

    result = @client.getTopic(@project_name, @topic_name)
    @schema = result.getRecordSchema()
    @fields = @schema.getFields()

    # Validate configuration parameters up front
    check_params()
    # Restore the saved checkpoint from pos_file
    read_checkpoint()

    if @shard_count == 0
      @logger.error "No active shard available, please check"
      raise "No active shard available, please check"
    end

    @logger.info "Init datahub success!"
  rescue => e
    @logger.error "Init failed!"  + e.message + " " + e.backtrace.inspect.to_s
    raise e
  end
end
run(queue)
# File lib/logstash/inputs/datahub.rb, line 295
def run(queue)
  if @multi_thread
    for i in 0...@shard_ids.size
      @@consumer_threads << Thread.new(@shard_ids[i],queue) {|shard_id,queue|
        shard_consumer(shard_id, queue)
      }
    end
    while !check_stop do
      Stud.stoppable_sleep(15) { stop? }
      @logger.debug "Main thread heartbeat."
    end # loop
  else
    while !check_stop do
      for i in 0...@shard_ids.size
        begin
          read_record(@shard_ids[i], queue)
        rescue DatahubPackage.exception.InvalidOperationException => e
          @logger.error "Shard:" + @shard_ids[i] + " Invalid operation caused by sealed shard, remove this shard."
          @shard_ids.delete_at(i)
          break
        rescue => e
          @logger.error "Read records failed" + e.backtrace.inspect.to_s
          @logger.error "Will retry after " + @retry_interval.to_s + " seconds."
          Stud.stoppable_sleep(@retry_interval) { stop? }
        end
      end
    end # loop
  end
  @logger.info "Main thread exit."
end
shard_consumer(shard_id, queue)
# File lib/logstash/inputs/datahub.rb, line 272
def shard_consumer(shard_id, queue)
  @logger.info "Consumer thread started:" + shard_id
  while !check_stop do
    begin
      read_record(shard_id, queue)
    rescue DatahubPackage.exception.InvalidOperationException => e
      @logger.error "Shard:" + shard_id + " Invalid operation caused by sealed shard, consumer will exit."
      break
    rescue => e
      @logger.error "Read records failed" + e.backtrace.inspect.to_s
      @logger.error "Will retry after " + @retry_interval.to_s + " seconds."
      Stud.stoppable_sleep(@retry_interval) { stop? }
    end
  end # loop
  @logger.info "Consumer thread exit, shard:" + shard_id
end
stop()
# File lib/logstash/inputs/datahub.rb, line 337
def stop
  @@mutex.synchronize {
    @stop = true
  }
  @logger.info "Plugin stopping, waiting for consumer thread stop."
  @@consumer_threads.each { |thread|
    thread.join
  }
  @logger.info "Plugin stopped."
end
update_checkpoint()
# File lib/logstash/inputs/datahub.rb, line 163
def update_checkpoint()
  @logger.info "flush checkpoint:" + @cursor.to_s
  @@mutex.synchronize {
    checkpoint_file = File.open(@pos_file, "w")
    @cursor.each { |key,value|
      checkpoint_file.write(key+":"+value+"\n")
    }
    checkpoint_file.close
  }
end
update_cursor(shard_id, cursor)
# File lib/logstash/inputs/datahub.rb, line 195
def update_cursor(shard_id, cursor)
  @@mutex.synchronize {
    @cursor[shard_id] = cursor
  }
end