# File lib/fluent/event_router.rb, line 157 def initialize(filters = []) @filters = filters @optimizable = nil end
# File lib/fluent/event_router.rb, line 167 def filter_stream(tag, es) if optimizable? optimized_filter_stream(tag, es) else @filters.reduce(es) { |acc, filter| filter.filter_stream(tag, acc) } end end
# File lib/fluent/event_router.rb, line 162 def filters=(filters) @filters = filters reset_optimization end
# File lib/fluent/event_router.rb, line 222 def filters_having_filter_stream @filters_having_filter_stream ||= @filters.select do |filter| filter.class.instance_methods(false).include?(:filter_stream) end end
# File lib/fluent/event_router.rb, line 208 def optimizable? return @optimizable unless @optimizable.nil? fs_filters = filters_having_filter_stream @optimizable = if fs_filters.empty? true else # skip log message when filter is only 1, because its performace is same as non optimized chain. if @filters.size > 1 && fs_filters.size >= 1 $log.info "disable filter chain optimization because #{fs_filters.map(&:class)} uses `#filter_stream` method." end false end end
# File lib/fluent/event_router.rb, line 177 def optimized_filter_stream(tag, es) new_es = MultiEventStream.new es.each do |time, record| filtered_record = record filtered_time = time catch :break_loop do @filters.each do |filter| if filter.has_filter_with_time begin filtered_time, filtered_record = filter.filter_with_time(tag, filtered_time, filtered_record) throw :break_loop unless filtered_record && filtered_time rescue => e filter.router.emit_error_event(tag, filtered_time, filtered_record, e) end else begin filtered_record = filter.filter(tag, filtered_time, filtered_record) throw :break_loop unless filtered_record rescue => e filter.router.emit_error_event(tag, filtered_time, filtered_record, e) end end end new_es.add(filtered_time, filtered_record) end end new_es end
# File lib/fluent/event_router.rb, line 228 def reset_optimization @optimizable = nil @filters_having_filter_stream = nil end