class KinesisShard::StateStore

Public Class Methods

new(dir_path, shard_id)
# File lib/fluent/plugin/kinesis_shard.rb, line 91
# Prepares the on-disk state file for one shard and loads any saved state.
#
# Creates +dir_path+ when it does not exist, builds the state file path from
# +dir_path+ and +shard_id+, and loads the file when it is present. A load
# failure is reported through <tt>$log.warn</tt> and otherwise ignored, in
# which case the state falls back to an empty last sequence number.
#
# @param dir_path directory that holds the state file
# @param shard_id value interpolated into the state file name
# @raise [RuntimeError] when the directory cannot be created, or when the
#   loaded state is not a Hash
def initialize(dir_path, shard_id)
  unless Dir.exist?(dir_path)
    begin
      FileUtils.mkdir_p(dir_path)
    rescue => e
      raise "does not make a directory : #{e.message}"
    end
  end
  @path = "#{dir_path}/last_recode_#{shard_id}.json"

  # File.exists? was removed in Ruby 3.2; File.exist? is the supported name.
  if File.exist?(@path)
    begin
      load_json_file
    rescue => e
      # Best effort: an unreadable state file must not prevent start-up.
      $log.warn "load_json_file: #{e.message}"
    end
  end

  # Nothing loaded (no file, failed load, or empty document): start fresh.
  @data = {'last_sequence_number' => ''} if @data.nil?

  raise "state_file on #{@path.inspect} is invalid" unless @data.is_a?(Hash)
end

Public Instance Methods

load_json_file()
# File lib/fluent/plugin/kinesis_shard.rb, line 119
# Reads the state file at @path and stores the parsed result in @data.
#
# Uses File.open rather than Kernel#open so the path is always treated as a
# plain file name (Kernel#open would run a path starting with "|" as a
# command). Any error from opening or parsing propagates to the caller.
def load_json_file
  File.open(@path) do |io|
    @data = Yajl.load(io)
  end
end
load_sequence_number()
# File lib/fluent/plugin/kinesis_shard.rb, line 125
# Looks up the 'last_sequence_number' entry of the in-memory state (@data).
def load_sequence_number
  state = @data
  state['last_sequence_number']
end
update(sequence_number)
# File lib/fluent/plugin/kinesis_shard.rb, line 129
# Records +sequence_number+ in the in-memory state and rewrites the state
# file at @path with the whole state.
#
# Uses File.open rather than Kernel#open so the path is always treated as a
# plain file name (Kernel#open would run a path starting with "|" as a
# command).
#
# @param sequence_number value stored under 'last_sequence_number'
def update(sequence_number)
  @data['last_sequence_number'] = sequence_number
  File.open(@path, "w") do |io|
    Yajl.dump(@data, io)
  end
end