class Fluent::GsvsocPubSubOutput
Constants
- Pubsub
Public Instance Methods
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_gsvsoc_pubsub.rb, line 64
# Validates the plugin settings once the superclass has processed +conf+.
#
# Ensures that +@buffer_chunk_records_limit+ lies within 1..999 and that
# both +@key+ and +@topic+ have been set.
#
# @param conf the configuration object, forwarded unchanged to the superclass
# @raise [Fluent::ConfigError] when any of the checks above fails
def configure(conf)
  super

  # write slices the buffered records with each_slice(@buffer_chunk_records_limit),
  # and each_slice raises ArgumentError for a size below 1. Reject such a value
  # here so the misconfiguration surfaces at startup instead of at flush time.
  if @buffer_chunk_records_limit < 1
    raise Fluent::ConfigError, "buffer_chunk_records_limit must be at least 1"
  end
  if @buffer_chunk_records_limit > 999
    raise Fluent::ConfigError, "buffer_chunk_records_limit may not exceed 999"
  end

  unless @key
    raise Fluent::ConfigError,
          "'key' must be specified as /path/to/key.json (e.g. service_account .json file)"
  end
  unless @topic
    raise Fluent::ConfigError,
          "'topic' must be specified as projects/<project-name>/topics/<topic-name>"
  end
end
format(tag, time, record)
click to toggle source
# File lib/fluent/plugin/out_gsvsoc_pubsub.rb, line 82
# Serializes a single event for buffering.
#
# The record is first passed through +inject_values_to_record+ together with
# +tag+ and +time+; whatever that returns is then encoded via +to_msgpack+.
#
# @param tag    the event tag, forwarded to +inject_values_to_record+
# @param time   the event time, forwarded to +inject_values_to_record+
# @param record the event record
# @return the result of calling +to_msgpack+ on the injected record
def format(tag, time, record)
  inject_values_to_record(tag, time, record).to_msgpack
end
publish(giw = 0, data, attributes)
click to toggle source
# File lib/fluent/plugin/out_gsvsoc_pubsub.rb, line 87
# Publishes one batch of payloads to +@topic+ through +@client+.
#
# Every element of +data+ is wrapped in a +Pubsub::Message+ carrying the same
# +attributes+; all messages are sent in a single +Pubsub::PublishRequest+.
# On success the number of returned message ids is logged at info level.
#
# @param giw        label identifying the batch in log lines (defaults to 0)
# @param data       enumerable of message payloads
# @param attributes attributes attached to every message
# @raise re-raises, after logging it, any StandardError raised while
#   building or publishing the request
def publish(giw = 0, data, attributes)
  messages = data.map do |payload|
    Pubsub::Message.new(data: payload, attributes: attributes)
  end
  request = Pubsub::PublishRequest.new(messages: messages)

  response = @client.publish_topic(@topic, request)
  log.info "messages count acks for group_#{giw}: ", response.message_ids.size
rescue => e
  # Use the bound exception rather than the $! global, and re-raise the very
  # same exception object so the caller sees the original failure.
  log.error "error publishing record: ", error: e.to_s
  log.error_backtrace
  raise
end
start()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_gsvsoc_pubsub.rb, line 71
# Starts the plugin and prepares the Pub/Sub client stored in +@client+.
#
# After the superclass +start+ has run, +@key+ is exported through the
# GOOGLE_APPLICATION_CREDENTIALS environment variable, a
# +Pubsub::PubsubService+ is created and authorized with the application
# default credentials for the +Pubsub::AUTH_PUBSUB+ scope, and its request
# options are set to 2 retries with +timeout_sec+ and +open_timeout_sec+ of 30.
def start
  super

  ENV['GOOGLE_APPLICATION_CREDENTIALS'] = @key

  # The service is only stored in @client once authorization has succeeded.
  @client = Pubsub::PubsubService.new.tap do |service|
    service.authorization = Google::Auth.get_application_default([Pubsub::AUTH_PUBSUB])
  end

  # Applied in this order: retries, timeout_sec, open_timeout_sec.
  { retries: 2, timeout_sec: 30, open_timeout_sec: 30 }.each do |option, value|
    @client.request_options.public_send("#{option}=", value)
  end
end
write(chunk)
click to toggle source
# File lib/fluent/plugin/out_gsvsoc_pubsub.rb, line 100
# Publishes every record of a buffer chunk.
#
# The records are read with +chunk.msgpack_each+, cut into batches of at most
# +@buffer_chunk_records_limit+ records, and each batch is handed to +publish+
# through +Parallel.each_with_index+ using +@parallel_in_threads+ threads.
# The attributes parsed from +@attrs+ are attached to every batch.
# Nothing is logged or published when the chunk yields no records.
#
# @param chunk the buffer chunk; must respond to +msgpack_each+ and +hash+
# @raise re-raises, after logging it, any StandardError raised while
#   processing the chunk
def write(chunk)
  messages = []
  chunk.msgpack_each { |message| messages << message }
  return if messages.empty?

  attributes = parse_message_attributes(@attrs)
  log.info "messages attributes: ", attributes
  log.info "messages count: ", messages.count

  # The messages array is split into batches of @buffer_chunk_records_limit.
  batches = messages.each_slice(@buffer_chunk_records_limit).to_a
  gid = chunk.hash.abs
  log.info "messages size of group_#{gid}: ", batches.size

  # Batches are published in parallel; the label ties log lines to a batch.
  Parallel.each_with_index(batches, in_threads: @parallel_in_threads) do |data, i|
    group = "#{gid}-#{i}-#{Parallel.worker_number}"
    log.info "messages count sent for group_#{group}: ", data.size
    publish(group, data, attributes)
  end
rescue => e
  log.error "unexpected error", error: e.to_s
  log.error_backtrace
  raise
end

# Parses a "key:val,key:val" string into a Hash of message attributes.
#
# Each entry is split on its first ":" only, so values may themselves contain
# colons (for example URLs). A nil or empty +raw+ yields an empty Hash, empty
# entries (such as those produced by a leading or doubled comma) are skipped,
# an entry without ":" maps its key to nil, and an entry ending in ":" maps
# its key to an empty string.
#
# @param raw [String, nil] the raw attribute list
# @return [Hash] attribute names mapped to their values
def parse_message_attributes(raw)
  raw.to_s.split(",").each_with_object({}) do |entry, attributes|
    next if entry.empty?

    key, value = entry.split(":", 2)
    attributes[key] = value
  end
end
private :parse_message_attributes