class Fluent::PullPoolBuffer

Public Class Methods

new() click to toggle source
Calls superclass method
# File lib/fluent/plugin/buf_pullpool.rb, line 28
# Sets up the buffer's own state after delegating to the superclass
# initializer.
#
# @pool  holds chunks pushed by write_chunk until pull_chunks drains them.
# @mutex is the lock pull_chunks takes while swapping @pool out.
def initialize
  super
  @pool = []
  @mutex = Mutex.new
end

Public Instance Methods

new_chunk(key) click to toggle source
# File lib/fluent/plugin/buf_pullpool.rb, line 61
# Builds a fresh PullBufferChunk for +key+.
#
# Copied from FileBuffer, with FileBufferChunk replaced by PullBufferChunk.
# The key is encoded, turned into a path with the "b" marker, and the
# timestamp suffix returned by make_path is converted into the chunk's
# unique id. The chunk is opened in "a+" mode and given @symlink_path.
def new_chunk(key)
  path, tsuffix = make_path(encode_key(key), "b")
  PullBufferChunk.new(key, path, tsuffix_to_unique_id(tsuffix), "a+", @symlink_path)
end
pull_chunks() { |chunk| ... } click to toggle source
# File lib/fluent/plugin/buf_pullpool.rb, line 43
# Drains the pool and hands every pooled chunk to the given block.
#
# The pool is swapped for a fresh empty array while holding @mutex, so the
# iteration below runs on the detached list without holding the lock.
# A chunk is purged (actual_purge) only after the block returned normally
# for it.
#
# Raises RuntimeError ("BUG: block not given") when called without a block.
# Returns the array of chunks that was drained from the pool.
def pull_chunks
  unless block_given?
    raise "BUG: block not given"
  end

  drained = @mutex.synchronize do
    taken = @pool
    @pool = []
    taken
  end

  drained.each do |chunk|
    begin
      yield chunk
      chunk.actual_purge
    rescue StandardError
      # Errors from the block or from the purge are swallowed on purpose;
      # the failed chunk is neither purged nor put back into @pool.
      # TODO: Chunks should be returned to @pool ?
      #       It may make infinite error loops. Hmmm....
    end
  end
end
resume() click to toggle source
# File lib/fluent/plugin/buf_pullpool.rb, line 68
# Rebuilds buffer state from chunk files already on disk.
#
# Copied from FileBuffer, with FileBufferChunk replaced by PullBufferChunk.
# Every file matching "<prefix>*<suffix>" is inspected; the part between the
# prefix and the suffix must match PATH_MATCH, whose captures are the encoded
# key, a 'b'/'q' marker and a hexadecimal timestamp suffix.
#
# Returns [queue, map]:
#   queue - chunks of 'q' files (opened "r"), ordered by timestamp
#   map   - chunk.key => chunk for 'b' files (opened "a+"); when several
#           files share a key, the one sorted last by timestamp wins
def resume
  buffered = []
  queued = []

  Dir.glob("#{@buffer_path_prefix}*#{@buffer_path_suffix}").each do |path|
    # Strip the configured prefix and suffix to get the part PATH_MATCH parses.
    middle = path[@buffer_path_prefix.length..-(@buffer_path_suffix.length+1)]
    m = PATH_MATCH.match(middle)
    next unless m

    key = decode_key(m[1])
    tsuffix = m[3]
    timestamp = tsuffix.to_i(16)
    unique_id = tsuffix_to_unique_id(tsuffix)

    case m[2]
    when 'b'
      buffered << [timestamp, PullBufferChunk.new(key, path, unique_id, "a+")]
    when 'q'
      queued << [timestamp, PullBufferChunk.new(key, path, unique_id, "r")]
    end
  end

  map = {}
  buffered.sort_by(&:first).each do |_timestamp, chunk|
    map[chunk.key] = chunk
  end

  queue = queued.sort_by(&:first).map(&:last)

  return queue, map
end
write_chunk(chunk, out) click to toggle source
# File lib/fluent/plugin/buf_pullpool.rb, line 34
# Flushes +chunk+ either directly to +out+ or into the pull pool.
#
# chunk - the buffer chunk being flushed
# out   - the output plugin; if it responds to :write it is treated as a
#         normal BufferedOutput plugin (works just the same as FileBuffer)
#
# When +out+ responds to :write, the chunk is written and marked flushed.
# Otherwise the chunk is appended to @pool to be picked up by pull_chunks.
def write_chunk(chunk, out)
  if out.respond_to?(:write)
    out.write(chunk)
    chunk.flushed = true
  else
    # pull_chunks swaps @pool while holding @mutex; take the same lock here
    # so a push cannot interleave with that swap.
    @mutex.synchronize do
      @pool.push(chunk)
    end
  end
end