class Fluent::CloudPubSubOutput

Constants

MAX_MSGS_PER_REQ
MAX_REQ_SIZE

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_cloud_pubsub.rb, line 24
def configure(conf)
  super

  raise Fluent::ConfigError, "'project' must be specified." unless @project
  raise Fluent::ConfigError, "'topic' must be specified." unless @topic
  raise Fluent::ConfigError, "'key' must be specified." unless @key
end
format(tag, time, record) click to toggle source
# File lib/fluent/plugin/out_cloud_pubsub.rb, line 39
def format(tag, time, record)
  [tag, time, record].to_msgpack
end
publish(msgs) click to toggle source
# File lib/fluent/plugin/out_cloud_pubsub.rb, line 43
def publish(msgs)
  log.debug "publish #{msgs.length} messages"

  @client.publish do |batch|
    msgs.each do |m|
      batch.publish m
    end
  end
end
start() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_cloud_pubsub.rb, line 32
def start
  super

  pubsub = (Gcloud.new @project, @key).pubsub
  @client = pubsub.topic @topic
end
write(chunk) click to toggle source
# File lib/fluent/plugin/out_cloud_pubsub.rb, line 53
def write(chunk)
  msgs = []
  msgs_size = 0

  chunk.msgpack_each do |tag, time, record|
    size = Yajl.dump(record).bytesize
    if msgs.length > 0 && (msgs_size + size > @max_req_size || msgs.length + 1 > @max_msgs_per_req)
      publish(msgs)
      msgs = []
      msgs_size = 0
    end
    msgs << record.to_json
    msgs_size += size
  end

  if msgs.length > 0
    publish(msgs)
  end
rescue
  log.error "unexpected error", :error=>$!.to_s
  log.error_backtrace
end