class Fluent::GcloudPubSub::Publisher
Public Class Methods
new(project, key, autocreate_topic, metric_prefix)
click to toggle source
# File lib/fluent/plugin/gcloud_pubsub/client.rb, line 35 def initialize(project, key, autocreate_topic, metric_prefix) @pubsub = Google::Cloud::Pubsub.new project_id: project, credentials: key @autocreate_topic = autocreate_topic @topics = {} # rubocop:disable Layout/LineLength @compression_ratio = Fluent::GcloudPubSub::Metrics.register_or_existing(:"#{metric_prefix}_messages_compressed_size_per_original_size_ratio") do ::Prometheus::Client.registry.histogram( :"#{metric_prefix}_messages_compressed_size_per_original_size_ratio", "Compression ratio achieved on a batch of messages", {}, # We expect compression for even a single message to be typically # above 2x (0.5/50%), so bias the buckets towards the higher end # of the range. [0, 0.25, 0.5, 0.75, 0.85, 0.9, 0.95, 0.975, 1], ) end @compression_duration = Fluent::GcloudPubSub::Metrics.register_or_existing(:"#{metric_prefix}_messages_compression_duration_seconds") do ::Prometheus::Client.registry.histogram( :"#{metric_prefix}_messages_compression_duration_seconds", "Time taken to compress a batch of messages", {}, [0, 0.0001, 0.0005, 0.001, 0.01, 0.05, 0.1, 0.25, 0.5, 1], ) end # rubocop:enable Layout/LineLength end
Public Instance Methods
publish(topic_name, messages, compress_batches = false)
click to toggle source
# File lib/fluent/plugin/gcloud_pubsub/client.rb, line 77 def publish(topic_name, messages, compress_batches = false) if compress_batches topic(topic_name).publish(*compress_messages_with_zlib(messages, topic_name)) else topic(topic_name).publish do |batch| messages.each do |m| batch.publish m.message, m.attributes end end end rescue Google::Cloud::UnavailableError, Google::Cloud::DeadlineExceededError, Google::Cloud::InternalError => e raise RetryableError, "Google api returns error:#{e.class} message:#{e}" end
topic(topic_name)
click to toggle source
# File lib/fluent/plugin/gcloud_pubsub/client.rb, line 66 def topic(topic_name) return @topics[topic_name] if @topics.key? topic_name client = @pubsub.topic topic_name client = @pubsub.create_topic topic_name if client.nil? && @autocreate_topic raise Error, "topic:#{topic_name} does not exist." if client.nil? @topics[topic_name] = client client end
Private Instance Methods
compress_messages_with_zlib(messages, topic_name)
click to toggle source
# File lib/fluent/plugin/gcloud_pubsub/client.rb, line 93 def compress_messages_with_zlib(messages, topic_name) original_size = messages.sum(&:bytesize) # This should never happen, only a programming error or major # misconfiguration should lead to this situation. But checking against # it here avoids a potential division by zero later on. raise ArgumentError, "not compressing empty inputs" if original_size.zero? # Here we're implicitly dropping the 'attributes' field of the messages # that we're iterating over. # This is fine, because the :attribute_keys config param is not # supported when in compressed mode, so this field will always be # empty. packed_messages = messages.map(&:message).join(BATCHED_RECORD_SEPARATOR) duration, compressed_messages = Fluent::GcloudPubSub::Metrics.measure_duration do Zlib::Deflate.deflate(packed_messages) end @compression_duration.observe( { topic: topic_name, algorithm: COMPRESSION_ALGORITHM_ZLIB }, duration, ) compressed_size = compressed_messages.bytesize @compression_ratio.observe( { topic: topic_name, algorithm: COMPRESSION_ALGORITHM_ZLIB }, # If original = 1MiB and compressed = 256KiB; then metric value = 0.75 = 75% when plotted 1 - compressed_size.to_f / original_size, ) [compressed_messages, { "compression_algorithm": COMPRESSION_ALGORITHM_ZLIB }] end