class Fluent::BufferizeOutput::PosKeeper
Constants
- FILE_PERMISSION
Public Class Methods
get(chunk)
click to toggle source
# File lib/fluent/plugin/out_bufferize.rb, line 10 def self.get(chunk) @@instances[chunk.unique_id] ||= PosKeeper.new(chunk) @@instances[chunk.unique_id] end
new(chunk)
click to toggle source
# File lib/fluent/plugin/out_bufferize.rb, line 19 def initialize(chunk) @id = chunk.unique_id @count = 0 @chunk = chunk if chunk.respond_to? :path @path = chunk.path + ".pos" mode = File::CREAT | File::RDWR perm = FILE_PERMISSION @io = File.open(@path, mode, perm) @io.sync = true line = @io.gets @count = line ? line.to_i : 0 @type = :file else @type = :mem end end
remove(chunk)
click to toggle source
# File lib/fluent/plugin/out_bufferize.rb, line 15 def self.remove(chunk) @@instances.delete(chunk.unique_id) end
Public Instance Methods
each() { |tag, time, record| ... }
click to toggle source
# File lib/fluent/plugin/out_bufferize.rb, line 38 def each(&block) @chunk.open do |io| u = MessagePack::Unpacker.new(io) begin if @count > 0 $log.debug "Bufferize: skip first #{@count} messages" @count.times do u.skip end end loop do tag, time, record = u.read yield(tag, time, record) increment end rescue EOFError end end remove end
increment()
click to toggle source
# File lib/fluent/plugin/out_bufferize.rb, line 61 def increment @count += 1 if @type == :file @io.seek(0, IO::SEEK_SET) @io.puts(@count) end end
remove()
click to toggle source
# File lib/fluent/plugin/out_bufferize.rb, line 69 def remove if @type == :file @io.close unless @io.closed? File.unlink(@path) end end