class Fluent::Plugin::GcloudPubSubInput

Constants

DEFAULT_PARSER_TYPE

Public Instance Methods

configure(conf) click to toggle source

rubocop:disable Metrics/MethodLength

Calls superclass method
# File lib/fluent/plugin/in_gcloud_pubsub.rb, line 113
# Applies the plugin configuration and prepares per-instance state.
#
# Converts compat-style parser parameters, delegates to the superclass, resets
# the RPC and pull-control state, picks the tag extraction strategy, creates
# the parser, and registers (or reuses) the metrics named after @metric_prefix.
#
# @param conf the configuration object; it is passed to
#   compat_parameters_convert and, implicitly, to the superclass method
def configure(conf)
  compat_parameters_convert(conf, :parser)
  super

  @rpc_srv = nil
  @rpc_thread = nil
  @stop_pull = false

  # With no tag key configured every record gets @tag; otherwise the tag is
  # looked up in (and removed from) the record itself.
  @extract_tag = @tag_key.nil? ? method(:static_tag) : method(:dynamic_tag)
  @parser = parser_create

  pulled_name = :"#{@metric_prefix}_messages_pulled"
  @messages_pulled = Fluent::GcloudPubSub::Metrics.register_or_existing(pulled_name) do
    ::Prometheus::Client.registry.histogram(
      pulled_name,
      "Number of Pub/Sub messages pulled by the subscriber on each invocation",
      {},
      [0, 1, 10, 50, 100, 250, 500, 1000],
    )
  end

  pulled_bytes_name = :"#{@metric_prefix}_messages_pulled_bytes"
  @messages_pulled_bytes = Fluent::GcloudPubSub::Metrics.register_or_existing(pulled_bytes_name) do
    ::Prometheus::Client.registry.histogram(
      pulled_bytes_name,
      "Total size in bytes of the Pub/Sub messages pulled by the subscriber on each invocation",
      {},
      [100, 1000, 10_000, 100_000, 1_000_000, 5_000_000, 10_000_000],
    )
  end

  pull_errors_name = :"#{@metric_prefix}_pull_errors_total"
  @pull_errors = Fluent::GcloudPubSub::Metrics.register_or_existing(pull_errors_name) do
    ::Prometheus::Client.registry.counter(
      pull_errors_name,
      "Errors encountered while pulling or processing messages",
      {},
    )
  end
end
shutdown() click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_gcloud_pubsub.rb, line 174
# Stops the plugin.
#
# Shuts down the RPC server when one is running, clears the RPC thread
# reference, raises the @stop_subscribing flag so the subscribe loops exit,
# waits for every subscribe thread to finish, and finally delegates to the
# superclass.
def shutdown
  if @rpc_srv
    @rpc_srv.shutdown
    @rpc_srv = nil
  end
  @rpc_thread = nil
  @stop_subscribing = true
  # @subscribe_threads is only assigned in #start. If #start never ran, or
  # failed before creating the list, it is still nil here; safe navigation
  # keeps shutdown from raising NoMethodError so that super always runs.
  @subscribe_threads&.each(&:join)
  super
end
start() click to toggle source

rubocop:enable Metrics/MethodLength

Calls superclass method
# File lib/fluent/plugin/in_gcloud_pubsub.rb, line 159
# Starts the plugin.
#
# After delegating to the superclass it optionally starts the HTTP RPC server
# (when @enable_rpc is set), builds the Pub/Sub subscriber, and creates
# @pull_threads threads that each run #subscribe.
def start
  super
  start_rpc if @enable_rpc

  @subscriber = Fluent::GcloudPubSub::Subscriber.new(@project, @key, @topic, @subscription)
  log.debug("connected subscription:#{@subscription} in project #{@project}")

  # Shared state must exist before the first subscribe thread is created.
  @emit_guard = Mutex.new
  @stop_subscribing = false
  @subscribe_threads = []
  @pull_threads.times do |idx|
    thread_name = :"in_gcloud_pubsub_subscribe_#{idx}"
    @subscribe_threads << thread_create(thread_name, &method(:subscribe))
  end
end
start_pull() click to toggle source
# File lib/fluent/plugin/in_gcloud_pubsub.rb, line 190
# Resumes pulling by clearing the @stop_pull flag that #subscribe checks
# before each pull, then logs the change at info level.
def start_pull
  @stop_pull = false
  log.info("start pull from subscription:#{@subscription}")
end
status_of_pull() click to toggle source
# File lib/fluent/plugin/in_gcloud_pubsub.rb, line 195
# Reports the pull state as a string.
#
# @return [String] "stopped" when @stop_pull is truthy, otherwise "started"
def status_of_pull
  return "stopped" if @stop_pull

  "started"
end
stop_pull() click to toggle source
# File lib/fluent/plugin/in_gcloud_pubsub.rb, line 185
# Pauses pulling by setting the @stop_pull flag that #subscribe checks before
# each pull, then logs the change at info level.
def stop_pull
  @stop_pull = true
  log.info("stop pull from subscription:#{@subscription}")
end

Private Instance Methods

_subscribe() click to toggle source
# File lib/fluent/plugin/in_gcloud_pubsub.rb, line 236
# Performs one pull cycle: pull, record metrics, process, acknowledge.
#
# The pulled-message count is observed on every call, including empty pulls.
# The byte-size histogram, processing and acknowledgement only happen when at
# least one message was pulled. Messages are acknowledged only after #process
# returns without raising.
#
# Errors are not propagated: both retryable and other StandardErrors are
# counted in @pull_errors (labelled with `retryable`) and logged.
def _subscribe
  pulled = @subscriber.pull(@return_immediately, @max_messages)
  @messages_pulled.observe(common_labels, pulled.size)
  if pulled.empty?
    log.debug "no messages are pulled"
    return
  end

  # Payload size plus the size of every attribute key and value.
  total_bytes = pulled.sum do |msg|
    data_bytes = msg.data.bytesize
    attribute_bytes = msg.attributes.sum { |name, value| name.bytesize + value.bytesize }
    data_bytes + attribute_bytes
  end
  @messages_pulled_bytes.observe(common_labels, total_bytes)

  process(pulled)
  @subscriber.acknowledge(pulled)

  log.debug "#{pulled.length} message(s) processed"
rescue Fluent::GcloudPubSub::RetryableError => error
  @pull_errors.increment(common_labels.merge(retryable: true))
  log.warn "Retryable error occurs. Fluentd will retry.", error_message: error.to_s, error_class: error.class.to_s
rescue StandardError => error
  @pull_errors.increment(common_labels.merge(retryable: false))
  log.error "unexpected error", error_message: error.to_s, error_class: error.class.to_s
  log.error_backtrace error.backtrace
end
common_labels() click to toggle source
# File lib/fluent/plugin/in_gcloud_pubsub.rb, line 299
# Labels attached to every metric observation made by this plugin.
#
# @return [Hash] a new hash with the single key :subscription set to
#   @subscription
def common_labels
  {
    subscription: @subscription,
  }
end
dynamic_tag(record) click to toggle source
# File lib/fluent/plugin/in_gcloud_pubsub.rb, line 205
# Tag strategy used when a tag key is configured.
#
# Removes @tag_key from the record (mutating it) and uses the removed value as
# the tag; falls back to @tag when the key is absent or its value is falsy.
#
# @param record [Hash] the parsed record; the tag key is deleted from it
# @return the extracted tag, or @tag
def dynamic_tag(record)
  extracted = record.delete(@tag_key)
  extracted || @tag
end
process(messages) click to toggle source
# File lib/fluent/plugin/in_gcloud_pubsub.rb, line 262
# Parses pulled messages and emits the resulting events grouped by tag.
#
# Each message is unpacked into (line, attributes) pairs and every line is fed
# to @parser. Successfully parsed records receive the configured
# @attribute_keys copied from the message attributes and are added to the
# event stream for their tag. Lines the parser cannot handle either raise
# FailedParseError (when @parse_error_action is :exception) or are logged as a
# warning. Nothing is emitted until all messages have been parsed.
#
# @param messages the pulled messages to process
# @raise [FailedParseError] on an unparsable line when @parse_error_action is
#   :exception
def process(messages)
  streams_by_tag = Hash.new { |streams, tag| streams[tag] = Fluent::MultiEventStream.new }

  messages.each do |message|
    Fluent::GcloudPubSub::MessageUnpacker.unpack(message).each do |line, attributes|
      @parser.parse(line) do |time, record|
        if time && record
          @attribute_keys.each { |key| record[key] = attributes[key] }

          tag = @extract_tag.call(record)
          streams_by_tag[tag].add(time, record)
        elsif @parse_error_action == :exception
          raise FailedParseError, "pattern not match: #{line}"
        else
          log.warn "pattern not match", record: line
        end
      end
    end
  end

  streams_by_tag.each_pair do |tag, stream|
    # Some output plugins are not supposed to be called from multiple threads,
    # so emits are serialized through @emit_guard.
    # Maybe remove in the future.
    @emit_guard.synchronize { router.emit_stream(tag, stream) }
  end
end
start_rpc() click to toggle source
# File lib/fluent/plugin/in_gcloud_pubsub.rb, line 209
# Starts the HTTP RPC server used to control pulling.
#
# Builds a WEBrick server bound to @rpc_bind:@rpc_port (WEBrick logging at
# FATAL level to STDERR, access log disabled), mounts RPCServlet under
# /api/in_gcloud_pubsub/pull/ with this plugin instance as its argument, and
# runs the server in a thread created via thread_create.
def start_rpc
  log.info "listening http rpc server on http://#{@rpc_bind}:#{@rpc_port}/"

  server_options = {
    BindAddress: @rpc_bind,
    Port: @rpc_port,
    Logger: WEBrick::Log.new(STDERR, WEBrick::Log::FATAL),
    AccessLog: [],
  }
  @rpc_srv = WEBrick::HTTPServer.new(server_options)
  @rpc_srv.mount("/api/in_gcloud_pubsub/pull/", RPCServlet, self)

  @rpc_thread = thread_create(:in_gcloud_pubsub_rpc_thread) { @rpc_srv.start }
end
static_tag(_record) click to toggle source
# File lib/fluent/plugin/in_gcloud_pubsub.rb, line 201
# Tag strategy used when no tag key is configured: every record gets @tag.
#
# @param _record ignored; accepted so this method has the same arity as
#   #dynamic_tag and either can be stored in @extract_tag
# @return the configured @tag
def static_tag(_record)
  @tag
end
subscribe() click to toggle source
# File lib/fluent/plugin/in_gcloud_pubsub.rb, line 225
# Body of each subscribe thread: keeps pulling until @stop_subscribing is set.
#
# A pull cycle (#_subscribe) runs on every iteration unless pulling is paused
# via @stop_pull. The loop sleeps for @pull_interval between iterations when
# pulls return immediately or while pulling is paused. Any StandardError that
# escapes the loop is logged and ends the loop.
def subscribe
  until @stop_subscribing
    _subscribe unless @stop_pull

    # @stop_pull is re-read here because it may have changed during the pull.
    next unless @return_immediately || @stop_pull

    sleep @pull_interval
  end
rescue StandardError => error
  log.error "unexpected error", error_message: error.to_s, error_class: error.class.to_s
  log.error_backtrace error.backtrace
end