class Fluent::SortOutput
Public Instance Methods
format(tag, time, record)
click to toggle source
# File lib/fluent/plugin/out_sort.rb, line 36
# Serializes a single event.
#
# Builds a three-element array holding +tag+, +time+ and +record+ (in that
# order) and returns whatever +to_msgpack+ produces for that array.
def format(tag, time, record)
  event = [tag, time, record]
  event.to_msgpack
end
write(chunk)
click to toggle source
# File lib/fluent/plugin/out_sort.rb, line 40
# Emits every event stored in +chunk+ in sorted order.
#
# The chunk is wrapped in an enumerator over its +msgpack_each+ method and
# handed to +sort_messages+; each (tag, time, record) triple of the result is
# then passed to +Engine.emit+. Returns the sorted collection that was
# iterated.
def write(chunk)
  events = chunk.to_enum(:msgpack_each)
  sort_messages(events).each do |tag, time, record|
    Engine.emit(tag, time, record)
  end
end
Private Instance Methods
extract_sort_value(message)
click to toggle source
# File lib/fluent/plugin/out_sort.rb, line 91
# Looks up the value a message should be sorted by.
#
# +message+ is destructured as (tag, time, record). Starting from the record,
# each entry of +@sort_key+ is used in turn as a Hash key to descend one
# level. As soon as the current value is not a Hash the result becomes nil
# and stays nil for the remaining keys. With an empty +@sort_key+ the record
# itself is returned.
def extract_sort_value(message)
  _tag, _time, record = message
  value = record
  @sort_key.each do |key|
    value = value.is_a?(Hash) ? value[key] : nil
  end
  value
end
sort_by_attribute(messages)
click to toggle source
# File lib/fluent/plugin/out_sort.rb, line 63
# Sorts messages by the value +extract_sort_value+ returns for each of them.
#
# Ordering rules:
# * values that are == compare as equal;
# * nil sorts before every non-nil value;
# * otherwise values are ordered with <=>;
# * values that <=> cannot compare (it returns nil, e.g. Integer vs String)
#   are ordered by class name instead of aborting the sort with
#   ArgumentError;
# * messages whose values tie keep their original relative order.
#
# The sort value is extracted exactly once per message rather than being
# looked up in a message-keyed cache on every comparison.
#
# messages:: any Enumerable of messages accepted by +extract_sort_value+.
# Returns a new Array of the messages in sorted order.
def sort_by_attribute(messages)
  decorated = messages.each_with_index.map do |message, index|
    [extract_sort_value(message), index, message]
  end

  decorated.sort! do |(value1, index1, _), (value2, index2, _)|
    order = compare_sort_values(value1, value2)
    # The original position breaks ties so the result is deterministic.
    order.zero? ? index1 <=> index2 : order
  end

  decorated.map(&:last)
end

# Compares two sort values; always returns -1, 0 or 1 (never nil).
def compare_sort_values(value1, value2)
  return 0 if value1 == value2
  return -1 if value1.nil?
  return 1 if value2.nil?

  # <=> yields nil for incomparable operands; fall back to the class name so
  # that values of different types are grouped instead of raising.
  (value1 <=> value2) || (value1.class.name.to_s <=> value2.class.name.to_s)
end
sort_by_time(messages)
click to toggle source
# File lib/fluent/plugin/out_sort.rb, line 57
# Sorts messages by their time field (the second element of each message).
#
# Enumerable#sort_by is not guaranteed to be stable, so the original position
# is used as a secondary key: messages that share a timestamp keep the order
# in which they were received.
#
# messages:: any Enumerable yielding (tag, time, record) triples.
# Returns a new Array of the messages in ascending time order.
def sort_by_time(messages)
  messages.each_with_index
          .sort_by { |(_tag, time, _record), index| [time, index] }
          .map(&:first)
end
sort_messages(messages)
click to toggle source
# File lib/fluent/plugin/out_sort.rb, line 48
# Dispatches to the sorting strategy selected by +@sort_key+.
#
# * +:time+    -> +sort_by_time+
# * an Array   -> +sort_by_attribute+
#
# messages:: the messages to sort; passed through unchanged to the strategy.
# Returns whatever the selected strategy returns.
# Raises ArgumentError for any other +@sort_key+. Without this branch the
# method returned nil, and the caller then failed with an unrelated
# NoMethodError on nil.
def sort_messages(messages)
  case @sort_key
  when :time
    sort_by_time(messages)
  when Array
    sort_by_attribute(messages)
  else
    raise ArgumentError, "unsupported sort key: #{@sort_key.inspect}"
  end
end