class Fluent::Plugin::GcloudPubSubOutput
Constants
- DEFAULT_BUFFER_TYPE
- DEFAULT_FORMATTER_TYPE
Public Instance Methods
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_gcloud_pubsub.rb, line 41 def configure(conf) compat_parameters_convert(conf, :buffer, :formatter) super @formatter = formatter_create end
format(tag, time, record)
click to toggle source
# File lib/fluent/plugin/out_gcloud_pubsub.rb, line 53 def format(tag, time, record) @formatter.format(tag, time, record).to_msgpack end
formatted_to_msgpack_binary?()
click to toggle source
# File lib/fluent/plugin/out_gcloud_pubsub.rb, line 57 def formatted_to_msgpack_binary? true end
multi_workers_ready?()
click to toggle source
# File lib/fluent/plugin/out_gcloud_pubsub.rb, line 61 def multi_workers_ready? true end
start()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_gcloud_pubsub.rb, line 47 def start super @publisher = Fluent::GcloudPubSub::Publisher.new @project, @key, @topic, @skip_lookup log.debug "connected topic:#{@topic} in project #{@project}" end
write(chunk)
click to toggle source
# File lib/fluent/plugin/out_gcloud_pubsub.rb, line 65 def write(chunk) messages = [] size = 0 chunk.msgpack_each do |msg| if msg.bytesize > @max_message_size log.warn 'Drop a message because its size exceeds `max_message_size`', size: msg.bytesize next end if messages.length + 1 > @max_messages || size + msg.bytesize > @max_total_size publish messages messages = [] size = 0 end messages << msg size += msg.bytesize end if messages.length > 0 publish messages end rescue Fluent::GcloudPubSub::RetryableError => ex log.warn "Retryable error occurs. Fluentd will retry.", error_message: ex.to_s, error_class: ex.class.to_s raise ex rescue Google::Cloud::UnauthenticatedError => ex log.warn "Encountered UnauthenticatedError, renewing @publisher instance", error_message: ex.to_s, error_class: ex.class.to_s @publisher = Fluent::GcloudPubSub::Publisher.new @project, @key, @topic, @skip_lookup raise ex rescue => ex log.error "unexpected error", error_message: ex.to_s, error_class: ex.class.to_s log.error_backtrace raise ex end
Private Instance Methods
publish(messages)
click to toggle source
# File lib/fluent/plugin/out_gcloud_pubsub.rb, line 101 def publish(messages) log.debug "send message topic:#{@topic} length:#{messages.length} size:#{messages.map(&:bytesize).inject(:+)}" @publisher.publish messages end