class Fluent::Plugin::LabelRouterOutput
Public Instance Methods
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_label_router.rb, line 201 def configure(conf) super @registry = (::Prometheus::Client.registry if @metrics) @route_map = Hash.new { |h, k| h[k] = Set.new } @mutex = Mutex.new @routers = [] @default_router = nil @routes.each do |rule| route_router = event_emitter_router(rule['@label']) @routers << Route.new(rule, route_router, @registry) end if @default_route != '' or @default_tag != '' default_rule = { 'matches' => nil, 'tag' => @default_tag, '@label' => @default_route} @default_router = Route.new(default_rule, event_emitter_router(@default_route), @registry) end @access_to_labels = record_accessor_create("$.kubernetes.labels") @access_to_namespace = record_accessor_create("$.kubernetes.namespace_name") @access_to_host = record_accessor_create("$.kubernetes.host") @access_to_container_name = record_accessor_create("$.kubernetes.container_name") @batch = @emit_mode == :batch end
process(tag, es)
click to toggle source
# File lib/fluent/plugin/out_label_router.rb, line 147 def process(tag, es) if @sticky_tags @mutex.synchronize { if @route_map.has_key?(tag) # We already matched with this tag send events to the routers @route_map[tag].each do |r| r.emit_es(tag, es.dup) end return end } end event_stream = Hash.new {|h, k| h[k] = Fluent::MultiEventStream.new } es.each do |time, record| input_metadata = { labels: @access_to_labels.call(record).to_h, namespace: @access_to_namespace.call(record).to_s, container: @access_to_container_name.call(record).to_s, host: @access_to_host.call(record).to_s} orphan_record = true @routers.each do |r| if r.match?(input_metadata) orphan_record = false if @sticky_tags @mutex.synchronize { @route_map[tag].add(r) } end if @batch event_stream[r].add(time, record) else r.emit(tag, time, record.dup) end end end if !@default_router.nil? && orphan_record if @sticky_tags @mutex.synchronize { @route_map[tag].add(@default_router) } end if @batch event_stream[@default_router].add(time, record) else @default_router.emit(tag, time, record.dup) end end end if @batch event_stream.each do |r, es| r.emit_es(tag, es.dup) end end end