class Fluent::Plugin::GcloudPubSubOutput

Constants

DEFAULT_BUFFER_TYPE
DEFAULT_FORMATTER_TYPE

Public Instance Methods

configure(conf)
Calls superclass method
# File lib/fluent/plugin/out_gcloud_pubsub.rb, line 41
# Configures the plugin from +conf+.
#
# Runs compat_parameters_convert on +conf+ for the :buffer and :formatter
# sections, hands the configuration to the superclass, and then stores the
# object returned by formatter_create in @formatter (used by #format).
#
# NOTE(review): compat_parameters_convert presumably maps legacy flat
# parameters onto <buffer>/<format> sections - confirm against Fluentd's
# compat_parameters plugin helper.
def configure(conf)
  # Must run before super so the superclass sees the converted configuration.
  compat_parameters_convert(conf, :buffer, :formatter)
  super
  @formatter = formatter_create
end
format(tag, time, record)
# File lib/fluent/plugin/out_gcloud_pubsub.rb, line 53
# Serializes one event for buffering.
#
# The event is first rendered by the configured formatter (@formatter), and
# the formatter's result is then converted with #to_msgpack.
#
# @param tag the event tag, passed through to the formatter
# @param time the event time, passed through to the formatter
# @param record the event record, passed through to the formatter
# @return whatever #to_msgpack returns for the formatted event
def format(tag, time, record)
  formatted = @formatter.format(tag, time, record)
  formatted.to_msgpack
end
formatted_to_msgpack_binary?()
# File lib/fluent/plugin/out_gcloud_pubsub.rb, line 57
# Always returns true.
#
# NOTE(review): presumably the hook through which Fluentd's output base class
# learns that #format already emits msgpack binary, which is what #write
# relies on when it calls chunk.msgpack_each - confirm against the Fluentd
# output plugin API.
def formatted_to_msgpack_binary?
  true
end
multi_workers_ready?()
# File lib/fluent/plugin/out_gcloud_pubsub.rb, line 61
# Always returns true.
#
# NOTE(review): presumably declares to Fluentd that this plugin may run under
# multiple worker processes - confirm against the Fluentd plugin API.
def multi_workers_ready?
  true
end
start()
Calls superclass method
# File lib/fluent/plugin/out_gcloud_pubsub.rb, line 47
# Starts the plugin.
#
# After the superclass has started, builds the publisher from the configured
# project, key, topic and skip_lookup values, keeps it in @publisher, and
# emits a debug log line naming the topic and project.
def start
  super

  @publisher = Fluent::GcloudPubSub::Publisher.new(@project, @key, @topic, @skip_lookup)
  log.debug("connected topic:#{@topic} in project #{@project}")
end
write(chunk)
# File lib/fluent/plugin/out_gcloud_pubsub.rb, line 65
# Publishes every message of a buffer chunk, grouped into batches.
#
# Messages are read with chunk.msgpack_each and accumulated until adding the
# next one would exceed @max_messages (count) or @max_total_size (bytes); the
# accumulated batch is then handed to #publish and a new batch is started.
# A message whose own bytesize exceeds @max_message_size is dropped with a
# warning. Whatever remains after the last message is published as well.
#
# An empty batch is never published. This matters when the first accepted
# message alone trips a limit (for example @max_total_size configured below
# @max_message_size): there is nothing to flush yet, so the message simply
# starts the batch.
#
# Every error is logged and re-raised to the caller. On
# Google::Cloud::UnauthenticatedError the publisher is rebuilt first.
#
# @param chunk an object responding to #msgpack_each that yields messages
#   responding to #bytesize
# @raise [Fluent::GcloudPubSub::RetryableError] re-raised after a warning
# @raise [Google::Cloud::UnauthenticatedError] re-raised after the publisher
#   has been re-created
# @raise [StandardError] any other error, re-raised after being logged
def write(chunk)
  messages = []
  size = 0

  chunk.msgpack_each do |msg|
    if msg.bytesize > @max_message_size
      log.warn 'Drop a message because its size exceeds `max_message_size`', size: msg.bytesize
      next
    end

    batch_full = messages.length + 1 > @max_messages || size + msg.bytesize > @max_total_size
    # Flush only when there is something to send; an empty batch would be a
    # pointless (and possibly failing) publish request.
    if batch_full && !messages.empty?
      publish messages
      messages = []
      size = 0
    end

    messages << msg
    size += msg.bytesize
  end

  publish messages unless messages.empty?
rescue Fluent::GcloudPubSub::RetryableError => ex
  log.warn "Retryable error occurs. Fluentd will retry.", error_message: ex.to_s, error_class: ex.class.to_s
  raise ex
rescue Google::Cloud::UnauthenticatedError => ex
  log.warn "Encountered UnauthenticatedError, renewing @publisher instance", error_message: ex.to_s, error_class: ex.class.to_s
  # Replace the publisher so that the next attempt uses a fresh instance.
  @publisher = Fluent::GcloudPubSub::Publisher.new @project, @key, @topic, @skip_lookup
  raise ex
rescue => ex
  log.error "unexpected error", error_message: ex.to_s, error_class: ex.class.to_s
  log.error_backtrace
  raise ex
end

Private Instance Methods

publish(messages)
# File lib/fluent/plugin/out_gcloud_pubsub.rb, line 101
# Sends one batch through @publisher, logging a debug line first.
#
# The debug line reports the topic, the number of messages and the sum of
# their bytesizes.
#
# @param messages a collection of messages, each responding to #bytesize
# @return whatever @publisher.publish returns
def publish(messages)
  total_bytes = messages.map(&:bytesize).reduce(:+)
  log.debug("send message topic:#{@topic} length:#{messages.length} size:#{total_bytes}")
  @publisher.publish(messages)
end