class Fluent::Plugin::RouteOutput
Attributes
routes[R]
Public Instance Methods
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_route.rb, line 66
# Validates the plugin configuration and builds the routing table.
#
# Rejects any <store> section, delegates to the superclass, then creates one
# Route per entry in @route_configs. Also resets the match cache, prepares the
# plugin-level tag modifier (nil when neither prefix option is set) and the
# mutex that guards the match cache.
#
# @param conf [Object] configuration element; must respond to #elements
# @raise [Fluent::ConfigError] when a <store> section is present
def configure(conf)
  store_sections = conf.elements(name: 'store')
  raise Fluent::ConfigError, "<store> section is not available in route plugin" if store_sections.size > 0

  super

  @match_cache = {}
  @routes = []
  @route_configs.each do |route_conf|
    # Each route gets its own router (resolved from its @label) and its own tag modifier.
    label_router = event_emitter_router(route_conf['@label'])
    route_modifier = tag_modifier(route_conf.remove_tag_prefix, route_conf.add_tag_prefix)
    @routes.push(Route.new(route_conf.pattern, label_router, route_modifier, route_conf.copy))
  end

  # Only build a plugin-level modifier when at least one prefix option is configured.
  @default_tag_modifier =
    if @remove_tag_prefix || @add_tag_prefix
      tag_modifier(@remove_tag_prefix, @add_tag_prefix)
    end
  @mutex = Mutex.new
end
process(tag, es)
click to toggle source
# File lib/fluent/plugin/out_route.rb, line 106
# Dispatches an event stream to every route that matches the tag.
#
# The tag is first rewritten by @default_tag_modifier (when one is set). Routes
# are then tried in order; matching stops after the first matching route whose
# copy? is false. The resolved [modified_tag, targets] pair is memoized per
# incoming tag in @match_cache so later calls skip the matching pass.
#
# @param tag [String] incoming event tag
# @param es [Object] event stream passed to each matching route's #emit; it is
#   duplicated with #dup when more than one route receives it
def process(tag, es)
  modified_tag, targets = @match_cache[tag]
  unless targets
    modified_tag = @default_tag_modifier ? @default_tag_modifier.call(tag) : tag
    targets = []
    @routes.each do |route|
      next unless route.match?(modified_tag)

      targets << route
      break unless route.copy?
    end

    @mutex.synchronize do
      if @match_cache.size >= @match_cache_size
        # Drop the oldest half of the entries (Hash keeps insertion order).
        # Always evict at least one so a cache size below 2 stays bounded too.
        evict_count = [@match_cache_size / 2, 1].max
        @match_cache.keys.first(evict_count).each { |key| @match_cache.delete(key) }
      end
      @match_cache[tag] = [modified_tag, targets]
    end
  end

  case targets.size
  when 0
    # no route matched: nothing to emit
  when 1
    targets.first.emit(modified_tag, es)
  else
    # Each route gets its own copy so one route cannot affect another's stream.
    # Every Object responds to #dup, so no respond_to? fallback is needed.
    targets.each { |target| target.emit(modified_tag, es.dup) }
  end
end
tag_modifier(remove_tag_prefix, add_tag_prefix)
click to toggle source
# File lib/fluent/plugin/out_route.rb, line 43
# Builds a lambda that rewrites the leading prefix of a tag.
#
# The lambda strips "<remove_tag_prefix>." from the start of the tag when it
# is present, then prepends "<add_tag_prefix>." when add_tag_prefix is given.
# Results are memoized per tag in a cache private to the lambda; once the
# cache holds @tag_cache_size entries the oldest half is evicted.
#
# @param remove_tag_prefix [String, nil] prefix to strip (without trailing dot)
# @param add_tag_prefix [String, nil] prefix to prepend (without trailing dot)
# @return [Proc] lambda taking a tag String and returning the rewritten tag
def tag_modifier(remove_tag_prefix, add_tag_prefix)
  tag_cache_size = @tag_cache_size
  cache = {}
  mutex = Mutex.new
  removed_prefix = remove_tag_prefix ? "#{remove_tag_prefix}." : ""
  added_prefix = add_tag_prefix ? "#{add_tag_prefix}." : ""

  ->(tag) {
    if (cached = cache[tag])
      cached
    else
      # Slice instead of String#sub: sub would interpret backslash sequences
      # (\0, \1, \\) inside added_prefix as back-references.
      remainder = tag.start_with?(removed_prefix) ? tag[removed_prefix.length..-1] : tag
      modified = added_prefix + remainder

      mutex.synchronize do
        if cache.size >= tag_cache_size
          # Drop the oldest half (Hash keeps insertion order); always evict at
          # least one entry so a cache size below 2 stays bounded too.
          evict_count = [tag_cache_size / 2, 1].max
          cache.keys.first(evict_count).each { |key| cache.delete(key) }
        end
        cache[tag] = modified
      end
      modified
    end
  }
end