class Fluent::GsvsocPubSubOutput

Constants

Pubsub

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_gsvsoc_pubsub.rb, line 64
def configure(conf)
  super
  raise Fluent::ConfigError, "buffer_chunk_records_limit may not exceed 999" if @buffer_chunk_records_limit > 999
  raise Fluent::ConfigError, "'key' must be specified as /path/to/key.json (e.g. service_account .json file)" unless @key
  raise Fluent::ConfigError, "'topic' must be specified as projects/<project-name>/topics/<topic-name>" unless @topic
end
format(tag, time, record) click to toggle source
# File lib/fluent/plugin/out_gsvsoc_pubsub.rb, line 82
def format(tag, time, record)
  record = inject_values_to_record(tag, time, record)
  record.to_msgpack
end
publish(giw = 0, data, attributes) click to toggle source
# File lib/fluent/plugin/out_gsvsoc_pubsub.rb, line 87
def publish(giw = 0, data, attributes)
  request = Pubsub::PublishRequest.new(messages: [])
  data.each do |d|
    request.messages << Pubsub::Message.new(data: d, attributes: attributes)
  end
  m = @client.publish_topic(@topic, request)
  log.info "messages count acks for group_#{giw}: ", m.message_ids.size
rescue => e
  log.error "error publishing record: ", :error=>$!.to_s
  log.error_backtrace
  raise e
end
start() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_gsvsoc_pubsub.rb, line 71
def start
  super
  ENV['GOOGLE_APPLICATION_CREDENTIALS']=@key
  pubsub = Pubsub::PubsubService.new
  pubsub.authorization = Google::Auth.get_application_default([Pubsub::AUTH_PUBSUB])
  @client = pubsub
  @client.request_options.retries = 2
  @client.request_options.timeout_sec = 30
  @client.request_options.open_timeout_sec = 30
end
write(chunk) click to toggle source
# File lib/fluent/plugin/out_gsvsoc_pubsub.rb, line 100
def write(chunk)
  messages = []
  chunk.msgpack_each { |m| messages << m }
  
  if messages.length > 0
    # attributes arrive as key:val,key:val,key:val
    attributes = Hash[@attrs.split(",").map {|str| str.split(":")}]
    log.info "messages attributes: ", attributes
    log.info "messages count: ", messages.count
    
    # the messages array is split into multiples of @buffer_chunk_records_limit
    gofmsgs = messages.each_slice(@buffer_chunk_records_limit).to_a

    gid = chunk.hash.abs
    log.info "messages size of group_#{gid}: ", gofmsgs.size
    
    # group of messages is published in parallel
    Parallel.each_with_index(gofmsgs, in_threads: @parallel_in_threads) do |data, i|
      log.info "messages count sent for group_#{gid}-#{i}-#{Parallel.worker_number}: ", data.size
      publish("#{gid}-#{i}-#{Parallel.worker_number}", data, attributes)
    end
  end
rescue => e
  log.error "unexpected error", :error=>$!.to_s
  log.error_backtrace
  raise e
end