class Fluent::PullPoolBuffer
Public Class Methods
new()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/buf_pullpool.rb, line 28 def initialize super @mutex = Mutex.new @pool = [] end
Public Instance Methods
new_chunk(key)
click to toggle source
# File lib/fluent/plugin/buf_pullpool.rb, line 61 def new_chunk(key) # copy&paste from FileBuffer, but s/FileBufferChunk/PullBufferChunk/ encoded_key = encode_key(key) path, tsuffix = make_path(encoded_key, "b") unique_id = tsuffix_to_unique_id(tsuffix) PullBufferChunk.new(key, path, unique_id, "a+", @symlink_path) end
pull_chunks() { |chunk| ... }
click to toggle source
# File lib/fluent/plugin/buf_pullpool.rb, line 43 def pull_chunks raise "BUG: block not given" unless block_given? chunks = nil @mutex.synchronize do chunks = @pool @pool = [] end chunks.each do |chunk| begin yield chunk chunk.actual_purge rescue => e # TODO: Chunks should be returned to @pool ? # It may make infinite error loops. Hmmm.... end end end
resume()
click to toggle source
# File lib/fluent/plugin/buf_pullpool.rb, line 68 def resume # copy&paste from FileBuffer, but s/FileBufferChunk/PullBufferChunk/ maps = [] queues = [] Dir.glob("#{@buffer_path_prefix}*#{@buffer_path_suffix}") {|path| match = path[@buffer_path_prefix.length..-(@buffer_path_suffix.length+1)] if m = PATH_MATCH.match(match) key = decode_key(m[1]) bq = m[2] tsuffix = m[3] timestamp = m[3].to_i(16) unique_id = tsuffix_to_unique_id(tsuffix) if bq == 'b' chunk = PullBufferChunk.new(key, path, unique_id, "a+") maps << [timestamp, chunk] elsif bq == 'q' chunk = PullBufferChunk.new(key, path, unique_id, "r") queues << [timestamp, chunk] end end } map = {} maps.sort_by {|(timestamp,chunk)| timestamp }.each {|(timestamp,chunk)| map[chunk.key] = chunk } queue = queues.sort_by {|(timestamp,chunk)| timestamp }.map {|(timestamp,chunk)| chunk } return queue, map end
write_chunk(chunk, out)
click to toggle source
# File lib/fluent/plugin/buf_pullpool.rb, line 34 def write_chunk(chunk, out) if out.respond_to?(:write) # for normal BufferedOutput plugins: works just same as FileBuffer out.write(chunk) chunk.flushed = true else @pool.push(chunk) end end