class Fluent::ReassembleOutput
Public Instance Methods
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_reassemble.rb, line 19 def configure(conf) super unless config.has_key?('output_tag') raise Fluent::ConfigError, "you must set 'output_tag'" end unless config.has_key?('assemble') raise Fluent::ConfigError, "you must set 'assemble'" end if config.has_key?('tz') ENV["TZ"] = config['tz'] end #assemble definition format # {extract_key1}:{replaced_key1}:{operation1},{extract_key2}:{replaced_key2}:{operation2},....{extract_keyN}:{replaced_keyN}:{operationN} @reassemble_conf = [] @assemble.split(",").each{ |conf| extract, replace, operation = conf.split(":") if extract.nil? || extract.empty? next else extract = extract.strip end if replace.nil? || replace.empty? replace = extract else replace = replace.strip end unless operation.nil? || operation.empty? operation = operation.strip end @reassemble_conf.push({:extract => extract, :replace => replace, :operation => operation}) } $log.info "reassemble conf : " + @reassemble_conf.to_s end
convert(val, operation)
click to toggle source
# File lib/fluent/plugin/out_reassemble.rb, line 114 def convert(val, operation) if operation.nil? return val end unless operation.start_with?("fixval_") if val.nil? return val end end o = operation.downcase begin case o when "to_s" return val.to_s when "to_i" return val.to_i when "to_f" return val.to_f when "to_json" return val.to_json when "bool_to_i" if (val.is_a?(TrueClass) || val.is_a?(FalseClass)) if val return 1 else return 0 end else val.to_i end when "unixtime_to_datetime" return Time.at(val.to_i).strftime(@datetime_format) when "unixtime_to_date" return Time.at(val.to_i).strftime(@date_format) when "unixtime_to_time" return Time.at(val.to_i).strftime(@time_format) when "url_to_host","url_to_domain" return URI(val.to_s).host when "url_to_path" return URI(val.to_s).path when /^add_([\d]+)/ num = o.gsub(/^add_([\d]+)/, '\1').to_i return val + num when /^sub_([\d]+)/ num = o.gsub(/^sub_([\d]+)/, '\1').to_i return val - num when /^mul_([\d]+)/ num = o.gsub(/^mul_([\d]+)/, '\1').to_i return val * num when /^div_([\d]+)/ num = o.gsub(/^div_([\d]+)/, '\1').to_i return val / num when /^fixval_(.*)/ return o.gsub(/^fixval_(.*)/, '\1').to_s else return val end rescue $log.warn $! return val end end
emit(tag, es, chain)
click to toggle source
# File lib/fluent/plugin/out_reassemble.rb, line 53 def emit(tag, es, chain) chain.next es.each {|time,record| if @expand_extract_key.nil? json = reassemble(record) Fluent::Engine.emit(@output_tag, time, json) else replaced_key = @expand_replaced_key if @expand_replaced_key.nil? || @expand_replaced_key.empty? replaced_key = @expand_extract_key end operation = @expand_operation traversed = traverse(record, @expand_extract_key) if traversed traversed.each { |r| json = reassemble(record) val = convert(r, operation) if !(val.nil?) json[replaced_key] = val elsif @null_to_null json[replaced_key] = nil elsif @null_to_empty json[replaced_key] = "" end Fluent::Engine.emit(@output_tag, time, json) } end end } end
reassemble(record)
click to toggle source
# File lib/fluent/plugin/out_reassemble.rb, line 84 def reassemble(record) json = {} @reassemble_conf.each { |conf| extract_key = conf[:extract] replaced_key = conf[:replace] operation = conf[:operation] val = convert(traverse(record, extract_key), operation) if !(val.nil?) json[replaced_key] = val elsif @null_to_null json[replaced_key] = nil elsif @null_to_empty json[replaced_key] = "" end } return json end
traverse(data, key)
click to toggle source
# File lib/fluent/plugin/out_reassemble.rb, line 102 def traverse(data, key) val = data key.split('.').each{ |k| if val.is_a?(Hash) && val.has_key?(k) val = val[k] else return nil end } return val end