class Fluent::GcloudPubSub::MessageUnpacker
Public Class Methods
unpack(message)
click to toggle source
# File lib/fluent/plugin/gcloud_pubsub/client.rb, line 153 def self.unpack(message) attributes = message.attributes algorithm = attributes["compression_algorithm"] case algorithm when nil # For an uncompressed message return the single line and attributes [[message.message.data.chomp, message.attributes]] when COMPRESSION_ALGORITHM_ZLIB # Return all of the lines in the message, with empty attributes Zlib::Inflate .inflate(message.message.data) .split(BATCHED_RECORD_SEPARATOR) .map { |line| [line, {}] } else raise ArgumentError, "unknown compression algorithm: '#{algorithm}'" end end