class Fluent::SortOutput

Public Instance Methods

format(tag, time, record) click to toggle source
# File lib/fluent/plugin/out_sort.rb, line 36
def format(tag, time, record)
  [tag, time, record].to_msgpack
end
write(chunk) click to toggle source
# File lib/fluent/plugin/out_sort.rb, line 40
def write(chunk)
  messages = sort_messages(chunk.to_enum(:msgpack_each))
  messages.each do |tag, time, record|
    Engine.emit(tag, time, record)
  end
end

Private Instance Methods

extract_sort_value(message) click to toggle source
# File lib/fluent/plugin/out_sort.rb, line 91
def extract_sort_value(message)
  _, _, record = message
  @sort_key.reduce(record) do |current_item, key|
    case current_item
    when Hash
      current_item[key]
    else
      nil
    end
  end
end
sort_by_attribute(messages) click to toggle source
# File lib/fluent/plugin/out_sort.rb, line 63
def sort_by_attribute(messages)
  value_cache = {}
  messages.sort do |message1, message2|
    if value_cache.key?(message1)
      value1 = value_cache[message1]
    else
      value1 = value_cache[message1] = extract_sort_value(message1)
    end
    if value_cache.key?(message2)
      value2 = value_cache[message2]
    else
      value2 = value_cache[message2] = extract_sort_value(message2)
    end

    if value1 == value2
      0
    elsif value1.nil? or value2.nil?
      if value1.nil?
        -1
      else
        1
      end
    else
      value1 <=> value2
    end
  end
end
sort_by_time(messages) click to toggle source
# File lib/fluent/plugin/out_sort.rb, line 57
def sort_by_time(messages)
  messages.sort_by do |tag, time, record|
    time
  end
end
sort_messages(messages) click to toggle source
# File lib/fluent/plugin/out_sort.rb, line 48
def sort_messages(messages)
  case @sort_key
  when :time
    sort_by_time(messages)
  when Array
    sort_by_attribute(messages)
  end
end