class LogStash::Inputs::Datahub
Datahub
input plugin
Constants
- DatahubPackage
Public Instance Methods
check_params()
click to toggle source
# File lib/logstash/inputs/datahub.rb, line 116
# Validates the shard configuration and the checkpoint file path.
#
# * When @shard_ids is non-empty, each configured id must match an entry of
#   @shards whose state is ACTIVE. Every id is checked (and logged) before
#   the method raises.
# * When @shard_ids is empty, it is filled with the id of every ACTIVE entry
#   of @shards.
# * Finally @pos_file is opened in "a+" mode and closed again, which creates
#   the file when it is missing.
#
# Raises RuntimeError when a configured shard is missing/inactive, when no
# active shard exists, or when @pos_file cannot be opened.
def check_params()
  @logger.info "Config shardIds:" + @shard_ids.to_s

  if @shard_ids.empty?
    # Nothing configured: adopt every active shard.
    found_active = false
    @shards.each do |entry|
      @logger.info "Checking shard:" + entry.getShardId()
      next unless entry.getState() == DatahubPackage.model.ShardState::ACTIVE

      @shard_ids.push(entry.getShardId())
      found_active = true
    end

    unless found_active
      @logger.error "There is no active shard."
      raise "There is no active shard."
    end
  else
    # reject (rather than all?) so that every configured id is visited and
    # logged even after the first failure.
    unusable = @shard_ids.reject do |wanted_id|
      @logger.info "Checking shard:" + wanted_id
      @shards.any? do |entry|
        entry.getShardId() == wanted_id && entry.getState() == DatahubPackage.model.ShardState::ACTIVE
      end
    end

    unless unusable.empty?
      @logger.error "Config shard_id not exists or state not active, check your config"
      raise "Config shard_id not exists or state not active, check your config"
    end
  end

  @logger.info "Reading from shards:" + @shard_ids.to_s

  begin
    File.open(@pos_file, "a+").close
  rescue
    @logger.error "Config pos file is invalid, pos_file must point to a file."
    raise "Config pos file is invalid, pos_file must point to a file."
  end
end
check_stop()
click to toggle source
# File lib/logstash/inputs/datahub.rb, line 289
# Returns the current value of @stop, read while holding @@mutex.
def check_stop()
  @@mutex.synchronize do
    @stop
  end
end
get_active_shards(shards)
click to toggle source
# File lib/logstash/inputs/datahub.rb, line 326
# Collects the entries of +shards+ whose state is ACTIVE.
#
# +shards+ is accessed through +size+ and +get(index)+ only.
# Returns a new Ruby Array holding the active entries in their original order.
def get_active_shards(shards)
  (0...shards.size).each_with_object([]) do |index, active|
    candidate = shards.get(index)
    active.push(candidate) if candidate.getState() == DatahubPackage.model.ShardState::ACTIVE
  end
end
get_cursor(shard_id, force = false)
click to toggle source
# File lib/logstash/inputs/datahub.rb, line 201
# Returns the cursor stored for +shard_id+, asking the Datahub client for a
# fresh one first when +force+ is true or when no usable cursor is stored
# (key missing, nil, or empty string). Runs while holding @@mutex.
#
# The position requested from the client depends on @cursor_time:
# * no entry for the shard -> CursorType::OLDEST
# * entry equal to '-1'    -> CursorType::LATEST
# * any other entry        -> the entry converted with to_i and multiplied by
#   1000 (presumably seconds to milliseconds - confirm against the client API)
def get_cursor(shard_id, force = false)
  @@mutex.synchronize do
    needs_seek = force ||
                 !@cursor.has_key?(shard_id) ||
                 @cursor[shard_id].nil? ||
                 @cursor[shard_id] == ""

    if needs_seek
      @logger.info "Shard:" + shard_id + " has no checkpoint, will seek to begin."

      seek_result =
        if !@cursor_time.has_key?(shard_id)
          @client.getCursor(@project_name, @topic_name, shard_id, DatahubPackage.model.GetCursorRequest::CursorType::OLDEST)
        elsif @cursor_time[shard_id] == '-1'
          @client.getCursor(@project_name, @topic_name, shard_id, DatahubPackage.model.GetCursorRequest::CursorType::LATEST)
        else
          @client.getCursor(@project_name, @topic_name, shard_id, @cursor_time[shard_id].to_i * 1000)
        end

      @cursor[shard_id] = seek_result.getCursor()
      @logger.info "Start reading shard:" + shard_id + " with cursor:" + @cursor[shard_id]
    end

    @cursor[shard_id]
  end
end
read_checkpoint()
click to toggle source
# File lib/logstash/inputs/datahub.rb, line 174
# Loads saved cursors from @pos_file into @cursor while holding @@mutex.
#
# Each line must have the form "<shard_id>:<cursor>"; a line that does not
# split into exactly two parts on ":" raises "Invalid checkpoint:<line>".
# Any StandardError is logged and then re-raised to the caller.
def read_checkpoint()
  @logger.info "read checkpoint:" + @pos_file
  @@mutex.synchronize do
    File.foreach(@pos_file) do |raw_line|
      entry = raw_line.chomp
      parts = entry.split(":")
      raise "Invalid checkpoint:" + entry unless parts.size == 2

      shard, position = parts
      @cursor[shard] = position
    end
    @logger.info "recover checkpoint:" + @cursor.to_s
  end
rescue => e
  @logger.error e.backtrace.inspect.to_s
  @logger.error "No checkpoint found or invalid, will start normally."
  raise e
end
read_record(shard_id, queue)
click to toggle source
# File lib/logstash/inputs/datahub.rb, line 221
# Fetches one batch of records from +shard_id+ and pushes one
# LogStash::Event per record onto +queue+.
#
# For every record a Hash is built with one entry per schema field (read with
# the typed getter matching the field type) plus "timestamp" from
# record.getSystemTime(); the Hash becomes the event's "message".
# Afterwards the shard cursor is advanced; an empty batch sleeps for
# @interval, a non-empty batch flushes the checkpoint file.
#
# Error handling:
# * InvalidCursorException    - logged, cursor is re-fetched with force.
# * InvalidOperationException - logged and re-raised to the caller.
# * DatahubClientException    - logged, then sleeps for @retry_interval.
# An unknown field type raises RuntimeError.
def read_record(shard_id, queue)
  # Fetched outside the begin block so its errors are not handled below.
  cursor = get_cursor(shard_id)
  begin
    @logger.info "Get record at shard: " + shard_id + " cursor:" + cursor
    batch = @client.getRecords(@project_name, @topic_name, shard_id, cursor, @batch_limit, @schema)

    batch.getRecords().each do |record|
      payload = {}
      @fields.each do |field|
        column = field.getName()
        payload[column] =
          case field.getType()
          when DatahubPackage.common.data.FieldType::BIGINT
            record.getBigint(column)
          when DatahubPackage.common.data.FieldType::DOUBLE
            record.getDouble(column)
          when DatahubPackage.common.data.FieldType::BOOLEAN
            record.getBoolean(column)
          when DatahubPackage.common.data.FieldType::TIMESTAMP
            record.getTimeStamp(column)
          when DatahubPackage.common.data.FieldType::STRING
            record.getString(column)
          else
            @logger.error "Unknow type " + field.getType().toString()
            raise "Unknow type " + field.getType().toString()
          end
      end
      payload["timestamp"] = record.getSystemTime()

      event = LogStash::Event.new("message" => payload)
      decorate(event)
      queue << event
    end

    # Reassigning the local keeps the rescue messages below in step with the
    # most recent cursor.
    cursor = batch.getNextCursor()
    update_cursor(shard_id, cursor)
    @logger.info "Shard:" + shard_id + " Next cursor:" + cursor + " GetRecords:" + (batch.getRecordCount()).to_s

    if batch.getRecordCount() == 0
      @logger.info "Read to end, waiting for data:" + cursor
      Stud.stoppable_sleep(@interval) { stop? }
    else
      update_checkpoint
    end
  rescue DatahubPackage.exception.InvalidCursorException => e
    @logger.error "Shard:" + shard_id + " Invalid cursor:" + cursor + ", seek to begin."
    get_cursor(shard_id, true)
  rescue DatahubPackage.exception.InvalidOperationException => e
    @logger.error "Shard:" + shard_id + " Invalid operation, cursor:" + cursor + ", shard is sealed, consumer will exit."
    raise e
  rescue DatahubPackage.exception.DatahubClientException => e
    @logger.error "Read failed:" + e.getMessage() + ", will retry later."
    Stud.stoppable_sleep(@retry_interval) { stop? }
  end
end
register()
click to toggle source
# File lib/logstash/inputs/datahub.rb, line 82
# Initialises the plugin: clears the stop flag, builds the Datahub account,
# configuration, client, project and topic objects, loads the active shards
# and the record schema, then validates the configuration and restores the
# checkpoint.
#
# Raises RuntimeError when no active shard is available; any StandardError
# raised during initialisation is logged and re-raised.
def register
  @stop = false
  @account = DatahubPackage.auth.AliyunAccount.new(@access_id, @access_key)
  @conf = DatahubPackage.DatahubConfiguration.new(@account, @endpoint)
  @client = DatahubPackage.DatahubClient.new(@conf)
  @project = DatahubPackage.wrapper.Project::Builder.build(@project_name, @client)
  @topic = @project.getTopic(@topic_name)
  @shards = get_active_shards(@topic.listShard())
  @shard_count = @shards.size

  topic_info = @client.getTopic(@project_name, @topic_name)
  @schema = topic_info.getRecordSchema()
  @fields = @schema.getFields()

  # Validate the parameters up front
  check_params()
  # Read the checkpoint
  read_checkpoint()

  if @shard_count == 0
    @logger.error "No active shard available, please check"
    raise "No active shard available, please check"
  end

  @logger.info "Init datahub success!"
rescue => e
  @logger.error "Init failed!" + e.message + " " + e.backtrace.inspect.to_s
  raise e
end
run(queue)
click to toggle source
# File lib/logstash/inputs/datahub.rb, line 295
# Main loop of the plugin; pushes events onto +queue+ until check_stop
# returns true.
#
# * With @multi_thread set, one consumer thread per entry of @shard_ids is
#   started (and recorded in @@consumer_threads) and this thread only emits a
#   heartbeat log every 15 seconds.
# * Otherwise the shards are read one after another on this thread. A shard
#   that raises InvalidOperationException is removed from @shard_ids; any
#   other StandardError is logged and followed by a sleep of @retry_interval.
#
# When every shard has been removed, the single-thread loop now idles with a
# stoppable sleep instead of spinning at full speed on an empty list.
def run(queue)
  if @multi_thread
    @shard_ids.each do |shard_id|
      # Thread arguments get their own names so the outer +queue+ is not shadowed.
      @@consumer_threads << Thread.new(shard_id, queue) do |thread_shard, thread_queue|
        shard_consumer(thread_shard, thread_queue)
      end
    end

    until check_stop
      Stud.stoppable_sleep(15) { stop? }
      @logger.debug "Main thread heartbeat."
    end
  else
    until check_stop
      if @shard_ids.empty?
        # Nothing left to read (all shards sealed and removed): wait instead
        # of busy-looping until the plugin is stopped.
        Stud.stoppable_sleep(@retry_interval) { stop? }
        next
      end

      @shard_ids.each_index do |index|
        begin
          read_record(@shard_ids[index], queue)
        rescue DatahubPackage.exception.InvalidOperationException => e
          @logger.error "Shard:" + @shard_ids[index] + " Invalid operation caused by sealed shard, remove this shard."
          @shard_ids.delete_at(index)
          # The list changed under the iteration; restart from the outer loop.
          break
        rescue => e
          @logger.error "Read records failed" + e.backtrace.inspect.to_s
          @logger.error "Will retry after " + @retry_interval.to_s + " seconds."
          Stud.stoppable_sleep(@retry_interval) { stop? }
        end
      end
    end
  end
  @logger.info "Main thread exit."
end
shard_consumer(shard_id, queue)
click to toggle source
# File lib/logstash/inputs/datahub.rb, line 272
# Consumer loop for a single shard: calls read_record for +shard_id+ until
# check_stop returns true.
#
# An InvalidOperationException ends the loop; any other StandardError is
# logged and followed by a sleep of @retry_interval before the next attempt.
def shard_consumer(shard_id, queue)
  @logger.info "Consumer thread started:" + shard_id

  until check_stop
    begin
      read_record(shard_id, queue)
    rescue DatahubPackage.exception.InvalidOperationException => e
      @logger.error "Shard:" + shard_id + " Invalid operation caused by sealed shard, consumer will exit."
      break
    rescue => e
      @logger.error "Read records failed" + e.backtrace.inspect.to_s
      @logger.error "Will retry after " + @retry_interval.to_s + " seconds."
      Stud.stoppable_sleep(@retry_interval) { stop? }
    end
  end

  @logger.info "Consumer thread exit, shard:" + shard_id
end
stop()
click to toggle source
# File lib/logstash/inputs/datahub.rb, line 337
# Sets @stop to true while holding @@mutex, then joins every thread recorded
# in @@consumer_threads before returning.
def stop
  @@mutex.synchronize do
    @stop = true
  end

  @logger.info "Plugin stopping, waiting for consumer thread stop."
  @@consumer_threads.each(&:join)
  @logger.info "Plugin stopped."
end
update_checkpoint()
click to toggle source
# File lib/logstash/inputs/datahub.rb, line 163
# Rewrites @pos_file with one "<shard_id>:<cursor>" line per entry of
# @cursor, while holding @@mutex.
#
# The file is opened with the block form of File.open so the handle is closed
# even when a write raises (for example on a nil cursor value).
def update_checkpoint()
  @logger.info "flush checkpoint:" + @cursor.to_s
  @@mutex.synchronize do
    File.open(@pos_file, "w") do |checkpoint_file|
      @cursor.each do |key, value|
        checkpoint_file.write(key + ":" + value + "\n")
      end
    end
  end
end
update_cursor(shard_id, cursor)
click to toggle source
# File lib/logstash/inputs/datahub.rb, line 195
# Stores +cursor+ as the current position of +shard_id+ in @cursor while
# holding @@mutex.
def update_cursor(shard_id, cursor)
  @@mutex.synchronize do
    @cursor[shard_id] = cursor
  end
end