class Fluent::BufferizeOutput::PosKeeper

Constants

FILE_PERMISSION

Public Class Methods

get(chunk) click to toggle source
# File lib/fluent/plugin/out_bufferize.rb, line 10
# Looks up the PosKeeper registered under the chunk's unique id,
# building and registering a new one when none is stored yet.
#
# chunk - object responding to #unique_id; the id is the registry key.
#
# Returns the PosKeeper stored for that id.
def self.get(chunk)
  key = chunk.unique_id
  # `||=` evaluates to the stored value, so no second lookup is needed.
  @@instances[key] ||= PosKeeper.new(chunk)
end
new(chunk) click to toggle source
# File lib/fluent/plugin/out_bufferize.rb, line 19
# Sets up position tracking for a chunk.
#
# When the chunk responds to #path, a companion "<path>.pos" file is
# opened (created if missing) and the counter is seeded from its first
# line; otherwise the counter lives only in memory and starts at 0.
#
# chunk - object responding to #unique_id and, optionally, #path.
def initialize(chunk)
  @id = chunk.unique_id
  @chunk = chunk
  @count = 0

  # Chunks without a path get an in-memory counter only.
  unless chunk.respond_to?(:path)
    @type = :mem
    return
  end

  @path = chunk.path + ".pos"
  @io = File.open(@path, File::CREAT | File::RDWR, FILE_PERMISSION)
  # Unbuffered writes, so each counter update reaches the file right away.
  @io.sync = true

  # A freshly created file has no line to read; the counter then stays 0.
  saved = @io.gets
  @count = saved.to_i if saved
  @type = :file
end
remove(chunk) click to toggle source
# File lib/fluent/plugin/out_bufferize.rb, line 15
# Drops the registry entry stored under the chunk's unique id.
#
# chunk - object responding to #unique_id.
#
# Returns whatever Hash#delete returns for that id (the removed entry,
# or nil when nothing was registered).
def self.remove(chunk)
  key = chunk.unique_id
  @@instances.delete(key)
end

Public Instance Methods

each() { |tag, time, record| ... } click to toggle source
# File lib/fluent/plugin/out_bufferize.rb, line 38
# Walks the entries of the chunk, yielding each one to the caller.
#
# The first @count entries are skipped before anything is yielded, and
# the counter is advanced via #increment after every yield that returns.
# Once iteration ends, #remove is called and its result is returned.
#
# Yields tag, time, record - the three values destructured from each
# object read from the unpacker.
def each(&block)
  @chunk.open do |stream|
    unpacker = MessagePack::Unpacker.new(stream)
    begin
      if @count > 0
        $log.debug "Bufferize: skip first #{@count} messages"
        @count.times { unpacker.skip }
      end

      loop do
        tag, time, record = unpacker.read
        yield(tag, time, record)
        # Only reached when the block returned without raising.
        increment
      end
    rescue EOFError
      # Ends the iteration quietly; presumably raised by the unpacker once
      # the chunk data is exhausted.
    end
  end
  remove
end
increment() click to toggle source
# File lib/fluent/plugin/out_bufferize.rb, line 61
# Advances the counter by one and, for file-backed keepers, rewrites the
# new value at the start of the position file.
def increment
  @count += 1
  return unless @type == :file

  # Overwrite from the beginning; whence defaults to IO::SEEK_SET.
  @io.seek(0)
  @io.puts(@count)
end
remove() click to toggle source
# File lib/fluent/plugin/out_bufferize.rb, line 69
# Releases the position file of a file-backed keeper: closes the handle
# if it is still open, then deletes the file at @path. Does nothing for
# keepers that are not file-backed.
def remove
  return unless @type == :file

  @io.close unless @io.closed?
  File.delete(@path)
end