class Fluent::Plugin::RouteOutput

Attributes

routes[R]

Public Instance Methods

configure(conf)
Calls superclass method
# File lib/fluent/plugin/out_route.rb, line 66
# Validates the plugin configuration and builds the routing state.
#
# Rejects any <store> element, delegates to the superclass, then creates one
# Route per entry of @route_configs, the optional plugin-wide tag modifier,
# an empty match cache, and the mutex that guards that cache.
#
# @param conf the configuration element for this plugin
# @raise [Fluent::ConfigError] when conf contains a <store> element
def configure(conf)
  store_sections = conf.elements(name: 'store')
  raise Fluent::ConfigError, "<store> section is not available in route plugin" if store_sections.size > 0

  super

  @match_cache = {}
  @routes = @route_configs.map do |route_conf|
    router = event_emitter_router(route_conf['@label'])
    rewrite = tag_modifier(route_conf.remove_tag_prefix, route_conf.add_tag_prefix)
    Route.new(route_conf.pattern, router, rewrite, route_conf.copy)
  end

  # Stays nil when neither plugin-level prefix option is set.
  @default_tag_modifier =
    if @remove_tag_prefix || @add_tag_prefix
      tag_modifier(@remove_tag_prefix, @add_tag_prefix)
    end
  @mutex = Mutex.new
end
process(tag, es)
# File lib/fluent/plugin/out_route.rb, line 106
# Emits an event stream to every configured route that matches its tag.
#
# The pair [modified tag, matching routes] is memoized per incoming tag in
# @match_cache, so route matching only runs on a cache miss. Routes are tried
# in order; matching stops at the first matching route whose #copy? is false.
#
# @param tag [String] tag of the incoming event stream (also the cache key)
# @param es event stream passed to each matching route's #emit
def process(tag, es)
  modified_tag, targets = @match_cache[tag]
  unless targets
    modified_tag = @default_tag_modifier ? @default_tag_modifier.call(tag) : tag
    targets = []
    @routes.each do |r|
      next unless r.match?(modified_tag)

      targets << r
      break unless r.copy?
    end

    @mutex.synchronize do
      if @match_cache.size >= @match_cache_size
        # Hash keeps insertion order, so the leading keys are the oldest
        # entries. Delete them directly instead of scanning the whole hash
        # with an Array#include? lookup per key (which was quadratic).
        @match_cache.keys[0...(@match_cache_size / 2)].each { |key| @match_cache.delete(key) }
      end
      @match_cache[tag] = [modified_tag, targets]
    end
  end

  case targets.size
  when 0
    # do nothing
  when 1
    # A single receiver gets the original stream; no copy is needed.
    targets.first.emit(modified_tag, es)
  else
    # With several receivers, each one gets its own copy of the stream.
    targets.each do |target|
      # NOTE(review): almost every Ruby object responds to #dup, so the
      # fallback branch presumably only serves streams that undefine it.
      dup_es = if es.respond_to?(:dup)
                 es.dup
               else
                 m_es = MultiEventStream.new
                 es.each{|t,r| m_es.add(t, r) }
                 m_es
               end
      target.emit(modified_tag, dup_es)
    end
  end
end
tag_modifier(remove_tag_prefix, add_tag_prefix)
# File lib/fluent/plugin/out_route.rb, line 43
# Builds a memoizing lambda that rewrites the leading part of a tag.
#
# When the tag starts with "<remove_tag_prefix>." that prefix is dropped;
# "<add_tag_prefix>." is then prepended. A tag that does not start with the
# removal prefix only receives the added prefix. A nil argument disables the
# corresponding step. Results are cached per tag inside the lambda, and the
# oldest half of the cache is discarded once it holds @tag_cache_size entries.
#
# @param remove_tag_prefix [String, nil] prefix (without trailing dot) to strip
# @param add_tag_prefix [String, nil] prefix (without trailing dot) to prepend
# @return [Proc] lambda taking a tag String and returning the rewritten tag
def tag_modifier(remove_tag_prefix, add_tag_prefix)
  tag_cache_size = @tag_cache_size
  cache = {}
  mutex = Mutex.new
  removed_prefix = remove_tag_prefix ? remove_tag_prefix + "." : ""
  added_prefix = add_tag_prefix ? add_tag_prefix + "." : ""
  ->(tag){
    if cached = cache[tag]
      cached
    else
      # Slice the prefix off instead of using String#sub: sub would treat
      # backslash sequences in added_prefix (e.g. "\0") as back-references.
      remainder = tag.start_with?(removed_prefix) ? tag[removed_prefix.length..-1] : tag
      modified = added_prefix + remainder
      mutex.synchronize do
        if cache.size >= tag_cache_size
          # Hash keeps insertion order, so the leading keys are the oldest
          # entries; delete them directly rather than scanning the whole hash.
          cache.keys[0...(tag_cache_size / 2)].each { |key| cache.delete(key) }
        end
        cache[tag] = modified
      end
      modified
    end
  }
end