class KinesisShard::StateStore
Public Class Methods
new(dir_path, shard_id)
click to toggle source
# File lib/fluent/plugin/kinesis_shard.rb, line 91 def initialize(dir_path, shard_id) unless Dir.exist?(dir_path) begin FileUtils.mkdir_p(dir_path) rescue => e raise "does not make a directory : #{e.message}" end end @path = "#{dir_path}/last_recode_#{shard_id}.json" if File.exists?(@path) begin load_json_file rescue => e $log.warn "load_json_file: #{e.message}" end end if @data.nil? @data = {'last_sequence_number' => ''} end unless @data.is_a?(Hash) raise "state_file on #{@path.inspect} is invalid" end end
Public Instance Methods
load_json_file()
click to toggle source
# File lib/fluent/plugin/kinesis_shard.rb, line 119 def load_json_file() open(@path) do |io| @data =Yajl.load(io) end end
load_sequence_number()
click to toggle source
# File lib/fluent/plugin/kinesis_shard.rb, line 125 def load_sequence_number @data['last_sequence_number'] end
update(sequence_number)
click to toggle source
# File lib/fluent/plugin/kinesis_shard.rb, line 129 def update(sequence_number) @data['last_sequence_number'] = sequence_number open(@path, "w") do |io| Yajl.dump(@data, io) end end