class Fluent::CloudPubSubOutput
Constants
- MAX_MSGS_PER_REQ
- MAX_REQ_SIZE
Public Instance Methods
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_cloud_pubsub.rb, line 24
#
# Runs the superclass configuration, then verifies that the three required
# settings held in @project, @topic and @key are present.
#
# conf - the configuration object; it is not read here directly (the bare
#        +super+ forwards it to the superclass implementation).
#
# Raises Fluent::ConfigError when any of the three settings is nil or false.
def configure(conf)
  super

  unless @project
    raise Fluent::ConfigError, "'project' must be specified."
  end

  unless @topic
    raise Fluent::ConfigError, "'topic' must be specified."
  end

  unless @key
    raise Fluent::ConfigError, "'key' must be specified."
  end
end
format(tag, time, record)
click to toggle source
# File lib/fluent/plugin/out_cloud_pubsub.rb, line 39
#
# Serializes one event as a msgpack-encoded three-element array of
# tag, time and record (in that order).
#
# Returns the value produced by Array#to_msgpack.
def format(tag, time, record)
  event = [tag, time, record]
  event.to_msgpack
end
publish(msgs)
click to toggle source
# File lib/fluent/plugin/out_cloud_pubsub.rb, line 43
#
# Sends all of +msgs+ through a single @client.publish block call, adding
# each message to the batch object that the client yields.
#
# msgs - a collection responding to +length+ and +each+.
#
# Returns whatever @client.publish returns.
def publish(msgs)
  log.debug "publish #{msgs.length} messages"

  @client.publish do |batch|
    msgs.each { |message| batch.publish message }
  end
end
start()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_cloud_pubsub.rb, line 32
#
# Runs the superclass start-up, then builds a Gcloud handle from @project and
# @key and stores the topic object looked up by @topic in @client, which
# #publish uses afterwards.
#
# NOTE(review): whether +topic+ can return nil for a missing topic depends on
# the gcloud gem version in use -- confirm before relying on @client being set.
def start
  super

  gcloud = Gcloud.new(@project, @key)
  @client = gcloud.pubsub.topic(@topic)
end
write(chunk)
click to toggle source
# File lib/fluent/plugin/out_cloud_pubsub.rb, line 53
#
# Publishes every record in +chunk+ as a JSON string, grouping the records
# into batches. A batch is flushed through #publish before adding a record
# that would push it past @max_req_size bytes or past @max_msgs_per_req
# messages. A single record larger than @max_req_size is still sent, alone in
# its own batch.
#
# chunk - an object whose +msgpack_each+ yields (tag, time, record) for each
#         buffered event; only the record is used here.
#
# Raises whatever error occurred while serializing or publishing. The error is
# logged first and then re-raised, so the caller is told the chunk was NOT
# delivered (Fluentd retries a chunk whose write raised) instead of the
# failure being swallowed and the chunk treated as written.
def write(chunk)
  msgs = []
  msgs_size = 0

  chunk.msgpack_each do |_tag, _time, record|
    # Serialize once and size the batch from the exact payload that is sent,
    # rather than encoding the record a second time just to measure it.
    msg = record.to_json
    size = msg.bytesize

    batch_full = msgs_size + size > @max_req_size ||
                 msgs.length + 1 > @max_msgs_per_req

    # Never flush an empty batch: an oversized record must still go out.
    if !msgs.empty? && batch_full
      publish(msgs)
      msgs = []
      msgs_size = 0
    end

    msgs << msg
    msgs_size += size
  end

  publish(msgs) unless msgs.empty?
rescue => e
  log.error "unexpected error", :error=>e.to_s
  log.error_backtrace
  # Propagate so the failure is visible to the caller and the chunk can be retried.
  raise
end