TODO: CompressedPackedMessage of forward protocol?
# File lib/fluent/plugin/buffer/chunk.rb, line 51 def initialize(metadata, compress: :text) super() @unique_id = generate_unique_id @metadata = metadata # state: unstaged/staged/queued/closed @state = :unstaged @size = 0 @created_at = Time.now @modified_at = Time.now extend Decompressable if compress == :gzip end
data is array of formatted record string
# File lib/fluent/plugin/buffer/chunk.rb, line 69 def append(data, **kwargs) raise ArgumentError, '`compress: gzip` can be used for Compressable module' if kwargs[:compress] == :gzip adding = ''.b data.each do |d| adding << d.b end concat(adding, data.size) end
# File lib/fluent/plugin/buffer/chunk.rb, line 91 def bytesize raise NotImplementedError, "Implement this method in child class" end
# File lib/fluent/plugin/buffer/chunk.rb, line 139 def close @state = :closed self end
# File lib/fluent/plugin/buffer/chunk.rb, line 120 def closed? @state == :closed end
# File lib/fluent/plugin/buffer/chunk.rb, line 83 def commit raise NotImplementedError, "Implement this method in child class" end
for event streams which is packed or zipped (and we want not to unpack/uncompress)
# File lib/fluent/plugin/buffer/chunk.rb, line 79 def concat(bulk, records) raise NotImplementedError, "Implement this method in child class" end
# File lib/fluent/plugin/buffer/chunk.rb, line 100 def empty? size == 0 end
# File lib/fluent/plugin/buffer/chunk.rb, line 134 def enqueued! @state = :queued self end
# File lib/fluent/plugin/buffer/chunk.rb, line 154 def open(**kwargs, &block) raise ArgumentError, '`compressed: gzip` can be used for Compressable module' if kwargs[:compressed] == :gzip raise NotImplementedError, "Implement this method in child class" end
# File lib/fluent/plugin/buffer/chunk.rb, line 144 def purge @state = :closed self end
# File lib/fluent/plugin/buffer/chunk.rb, line 116 def queued? @state == :queued end
# File lib/fluent/plugin/buffer/chunk.rb, line 149 def read(**kwargs) raise ArgumentError, '`compressed: gzip` can be used for Compressable module' if kwargs[:compressed] == :gzip raise NotImplementedError, "Implement this method in child class" end
# File lib/fluent/plugin/buffer/chunk.rb, line 87 def rollback raise NotImplementedError, "Implement this method in child class" end
# File lib/fluent/plugin/buffer/chunk.rb, line 95 def size raise NotImplementedError, "Implement this method in child class" end
# File lib/fluent/plugin/buffer/chunk.rb, line 124 def staged! @state = :staged self end
# File lib/fluent/plugin/buffer/chunk.rb, line 112 def staged? @state == :staged end
# File lib/fluent/plugin/buffer/chunk.rb, line 129 def unstaged! @state = :unstaged self end
# File lib/fluent/plugin/buffer/chunk.rb, line 108 def unstaged? @state == :unstaged end
# File lib/fluent/plugin/buffer/chunk.rb, line 104 def writable? @state == :staged || @state == :unstaged end
# File lib/fluent/plugin/buffer/chunk.rb, line 159 def write_to(io, **kwargs) raise ArgumentError, '`compressed: gzip` can be used for Compressable module' if kwargs[:compressed] == :gzip open do |i| IO.copy_stream(i, io) end end