class Fluent::Plugin::GcloudPubSubInput
Constants
- DEFAULT_PARSER_TYPE
Public Instance Methods
configure(conf)
click to toggle source
rubocop:disable Metrics/MethodLength
Calls superclass method
# File lib/fluent/plugin/in_gcloud_pubsub.rb, line 113 def configure(conf) compat_parameters_convert(conf, :parser) super @rpc_srv = nil @rpc_thread = nil @stop_pull = false @extract_tag = if @tag_key.nil? method(:static_tag) else method(:dynamic_tag) end @parser = parser_create @messages_pulled = Fluent::GcloudPubSub::Metrics.register_or_existing(:"#{@metric_prefix}_messages_pulled") do ::Prometheus::Client.registry.histogram( :"#{@metric_prefix}_messages_pulled", "Number of Pub/Sub messages pulled by the subscriber on each invocation", {}, [0, 1, 10, 50, 100, 250, 500, 1000], ) end @messages_pulled_bytes = Fluent::GcloudPubSub::Metrics.register_or_existing(:"#{@metric_prefix}_messages_pulled_bytes") do ::Prometheus::Client.registry.histogram( :"#{@metric_prefix}_messages_pulled_bytes", "Total size in bytes of the Pub/Sub messages pulled by the subscriber on each invocation", {}, [100, 1000, 10_000, 100_000, 1_000_000, 5_000_000, 10_000_000], ) end @pull_errors = Fluent::GcloudPubSub::Metrics.register_or_existing(:"#{@metric_prefix}_pull_errors_total") do ::Prometheus::Client.registry.counter( :"#{@metric_prefix}_pull_errors_total", "Errors encountered while pulling or processing messages", {}, ) end end
shutdown()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_gcloud_pubsub.rb, line 174 def shutdown if @rpc_srv @rpc_srv.shutdown @rpc_srv = nil end @rpc_thread = nil if @rpc_thread @stop_subscribing = true @subscribe_threads.each(&:join) super end
start()
click to toggle source
rubocop:enable Metrics/MethodLength
Calls superclass method
# File lib/fluent/plugin/in_gcloud_pubsub.rb, line 159 def start super start_rpc if @enable_rpc @subscriber = Fluent::GcloudPubSub::Subscriber.new @project, @key, @topic, @subscription log.debug "connected subscription:#{@subscription} in project #{@project}" @emit_guard = Mutex.new @stop_subscribing = false @subscribe_threads = [] @pull_threads.times do |idx| @subscribe_threads.push thread_create("in_gcloud_pubsub_subscribe_#{idx}".to_sym, &method(:subscribe)) end end
start_pull()
click to toggle source
# File lib/fluent/plugin/in_gcloud_pubsub.rb, line 190 def start_pull @stop_pull = false log.info "start pull from subscription:#{@subscription}" end
status_of_pull()
click to toggle source
# File lib/fluent/plugin/in_gcloud_pubsub.rb, line 195 def status_of_pull @stop_pull ? "stopped" : "started" end
stop_pull()
click to toggle source
# File lib/fluent/plugin/in_gcloud_pubsub.rb, line 185 def stop_pull @stop_pull = true log.info "stop pull from subscription:#{@subscription}" end
Private Instance Methods
_subscribe()
click to toggle source
# File lib/fluent/plugin/in_gcloud_pubsub.rb, line 236 def _subscribe messages = @subscriber.pull @return_immediately, @max_messages @messages_pulled.observe(common_labels, messages.size) if messages.empty? log.debug "no messages are pulled" return end messages_size = messages.sum do |message| message.data.bytesize + message.attributes.sum { |k, v| k.bytesize + v.bytesize } end @messages_pulled_bytes.observe(common_labels, messages_size) process messages @subscriber.acknowledge messages log.debug "#{messages.length} message(s) processed" rescue Fluent::GcloudPubSub::RetryableError => e @pull_errors.increment(common_labels.merge({ retryable: true })) log.warn "Retryable error occurs. Fluentd will retry.", error_message: e.to_s, error_class: e.class.to_s rescue StandardError => e @pull_errors.increment(common_labels.merge({ retryable: false })) log.error "unexpected error", error_message: e.to_s, error_class: e.class.to_s log.error_backtrace e.backtrace end
common_labels()
click to toggle source
# File lib/fluent/plugin/in_gcloud_pubsub.rb, line 299 def common_labels { subscription: @subscription } end
dynamic_tag(record)
click to toggle source
# File lib/fluent/plugin/in_gcloud_pubsub.rb, line 205 def dynamic_tag(record) record.delete(@tag_key) || @tag end
process(messages)
click to toggle source
# File lib/fluent/plugin/in_gcloud_pubsub.rb, line 262 def process(messages) event_streams = Hash.new do |hsh, key| hsh[key] = Fluent::MultiEventStream.new end messages.each do |m| lines_attributes = Fluent::GcloudPubSub::MessageUnpacker.unpack(m) lines_attributes.each do |line, attributes| @parser.parse(line) do |time, record| if time && record @attribute_keys.each do |key| record[key] = attributes[key] end event_streams[@extract_tag.call(record)].add(time, record) else case @parse_error_action when :exception raise FailedParseError, "pattern not match: #{line}" else log.warn "pattern not match", record: line end end end end end event_streams.each do |tag, es| # There are some output plugins not to supposed to be called with multi-threading. # Maybe remove in the future. @emit_guard.synchronize do router.emit_stream(tag, es) end end end
start_rpc()
click to toggle source
# File lib/fluent/plugin/in_gcloud_pubsub.rb, line 209 def start_rpc log.info "listening http rpc server on http://#{@rpc_bind}:#{@rpc_port}/" @rpc_srv = WEBrick::HTTPServer.new( { BindAddress: @rpc_bind, Port: @rpc_port, Logger: WEBrick::Log.new(STDERR, WEBrick::Log::FATAL), AccessLog: [], }, ) @rpc_srv.mount("/api/in_gcloud_pubsub/pull/", RPCServlet, self) @rpc_thread = thread_create(:in_gcloud_pubsub_rpc_thread) do @rpc_srv.start end end
static_tag(_record)
click to toggle source
# File lib/fluent/plugin/in_gcloud_pubsub.rb, line 201 def static_tag(_record) @tag end
subscribe()
click to toggle source
# File lib/fluent/plugin/in_gcloud_pubsub.rb, line 225 def subscribe until @stop_subscribing _subscribe unless @stop_pull sleep @pull_interval if @return_immediately || @stop_pull end rescue StandardError => e log.error "unexpected error", error_message: e.to_s, error_class: e.class.to_s log.error_backtrace e.backtrace end