class Fluent::HttpRecordModifier

Public Class Methods

new()
Calls superclass method
# File lib/fluent/plugin/filter_http_record_modifier.rb, line 5
# Loads the libraries this plugin relies on, then hands off to the
# superclass initializer. The requires happen here, at instantiation
# time, rather than at file load time.
def initialize
  %w[socket yajl net/http uri].each { |library| require library }
  super
end

Public Instance Methods

configure(conf)
Calls superclass method
# File lib/fluent/plugin/filter_http_record_modifier.rb, line 30
# Reads the plugin configuration and prepares the state used while filtering.
#
# Builds @map from every <record> section and @maped_params from every
# <params> section, splits the comma-separated @remove_keys / @keep_keys
# strings into arrays, and creates the request cache and the placeholder
# expander.
#
# @param conf the configuration element handed to the plugin
# @raise [Fluent::ConfigError] when `keep_keys` is set without `renew_record`
def configure(conf)
  super
  @method ||= conf['method']

  # Collects the key/value pairs of every child section with the given name
  # into a single hash, passing each value through parse_value.
  collect_section = lambda do |section_name|
    conf.elements.each_with_object({}) do |section, collected|
      next unless section.name == section_name

      section.each_pair do |key, raw_value|
        section.has_key?(key) # to suppress unread configuration warning
        collected[key] = parse_value(raw_value)
      end
    end
  end

  @map = collect_section.call('record')          # <record></record> directive
  @maped_params = collect_section.call('params') # <params></params> directive

  @remove_keys = @remove_keys.split(',') if @remove_keys

  if @keep_keys
    unless @renew_record
      raise Fluent::ConfigError, "`renew_record` must be true to use `keep_keys`"
    end
    @keep_keys = @keep_keys.split(',')
  end

  @request_cache = Cache.new(@cache, @expire)

  # Passed as an explicit hash object (not keyword arguments), as before.
  expander_options = { :log => log, :auto_typecast => @auto_typecast }
  @placeholder_expander = PlaceholderExpander.new(expander_options)

  @hostname = Socket.gethostname
end
create_request(tag, time, record)
# File lib/fluent/plugin/filter_http_record_modifier.rb, line 147
# Builds the HTTP request object for one event.
#
# The endpoint URL is percent-escaped, its query string is replaced with the
# expanded <params> values, and a Net::HTTP request of the class matching
# @method (e.g. Net::HTTP::Get, Net::HTTP::Post) is created. For every verb
# other than GET the request body is filled in through set_body.
#
# @param tag [String] event tag
# @param time event time
# @param record [Hash] event record
# @return [Array] two-element array: the request object and the URI
def create_request(tag, time, record)
  # URI.encode was removed in Ruby 3.0; the default parser's #escape performs
  # the same escaping of characters that are unsafe in a URL.
  escaped_url = URI::DEFAULT_PARSER.escape(@endpoint_url.to_s)
  uri = URI.parse(escaped_url)
  uri.query = URI.encode_www_form(format_params(tag, time, record))

  verb = @method.to_s.capitalize
  req = Net::HTTP.const_get(verb).new(uri)
  set_body(req, tag, time, record) unless verb == 'Get'

  [req, uri]
end
deserialize_body(res)
# File lib/fluent/plugin/filter_http_record_modifier.rb, line 108
# Converts an HTTP response into the hash used for placeholder expansion.
#
# The returned hash always has a 'body' key. For a JSON response the body is
# the parsed document; when that document is an object, its top-level keys
# are also available directly on the returned hash. For any other content
# type 'body' holds the raw response body.
#
# @param res the HTTP response, or nil when the request failed and the
#   error was swallowed by send_request
# @return [Hash] the response data, with the payload under 'body'
def deserialize_body(res)
  # send_request yields nil when the request failed without re-raising;
  # report an empty body instead of crashing on nil.body.
  return { 'body' => nil } if res.nil?

  clean = {}
  body = res.body
  if res.content_type == 'application/json'
    body = Yajl.load(body)
    # Parsed a second time on purpose: `clean` must be a different object
    # from `body`, otherwise clean['body'] would point back at clean itself.
    clean = Yajl.load(res.body) if body.is_a?(Hash)
  end
  clean['body'] = body
  clean
end
filter_stream(tag, es)
# File lib/fluent/plugin/filter_http_record_modifier.rb, line 70
# Filters an event stream: for every event an HTTP request is built, its
# (possibly cached) response is fetched, and the record is rewritten with
# values taken from that response.
#
# Responses are cached under the full request URI, so events that produce
# the same URI trigger one HTTP call until the cache entry is dropped.
#
# If an error is raised while processing, it is logged and the events
# converted so far are returned. The method always returns an event stream,
# never the result of a logging call.
#
# @param tag [String] tag of the incoming events
# @param es event stream yielding (time, record) pairs
# @return [MultiEventStream] the rewritten events
def filter_stream(tag, es)
  new_es = MultiEventStream.new
  tag_parts = tag.split('.')
  placeholders = {
    'tag' => tag,
    'tag_parts' => tag_parts,
    'tag_prefix' => tag_prefix(tag_parts),
    'tag_suffix' => tag_suffix(tag_parts),
    'hostname' => @hostname,
  }
  last_record = nil
  es.each do |time, record|
    last_record = record # for debug log
    req, uri = create_request(tag, time, record)
    cache_key = uri.to_s
    body = @request_cache.get(cache_key)
    if body.nil?
      res = send_request(req, uri)
      body = deserialize_body(res)
      @request_cache.set(cache_key, body)
    end
    new_record = reform(time, record, placeholders, body)
    if @renew_time_key && new_record.has_key?(@renew_time_key)
      time = new_record[@renew_time_key].to_i
      new_record.delete(@renew_time_key) if @remove_time_key
    end
    new_es.add(time, new_record)
  end
  new_es
rescue => e
  log.warn "failed to reform records", :error_class => e.class, :error => e.message
  log.warn_backtrace
  log.debug "map:#{@map} record:#{last_record} placeholders:#{placeholders}"
  # Hand back what was processed before the failure rather than the return
  # value of log.debug.
  new_es
end
format_params(tag, time, record)
# File lib/fluent/plugin/filter_http_record_modifier.rb, line 121
# Produces the query parameters for one event by expanding the placeholders
# found in the configured <params> values.
#
# The placeholder expander is first primed with the event time, the record
# and the tag-derived values (tag, tag_parts, tag_prefix, tag_suffix,
# hostname); @maped_params is then expanded against them.
#
# @param tag [String] event tag
# @param time event time
# @param record [Hash] event record
# @return the expanded copy of @maped_params
def format_params(tag, time, record)
  parts = tag.split('.')
  prefixes = tag_prefix(parts)
  suffixes = tag_suffix(parts)

  @placeholder_expander.prepare_placeholders(
    time,
    record,
    {
      'tag' => tag,
      'tag_parts' => parts,
      'tag_prefix' => prefixes,
      'tag_suffix' => suffixes,
      'hostname' => @hostname,
    }
  )

  expand_placeholders(@maped_params)
end
send_request(req, uri)
# File lib/fluent/plugin/filter_http_record_modifier.rb, line 159
# Sends the request and returns the HTTP response.
#
# Basic authentication is applied when @auth is :basic. TLS is enabled when
# the URI scheme is https. Any StandardError raised while sending is logged
# as a warning; it is re-raised only when @raise_on_error is set, otherwise
# nil is returned.
#
# @param req the Net::HTTP request to send
# @param uri the target URI (host, port and scheme are read from it)
# @return the HTTP response, or nil when the request failed and
#   @raise_on_error is not set
def send_request(req, uri)
  req.basic_auth(@username, @password) if @auth == :basic

  # Without :use_ssl an https endpoint would be spoken to in plain HTTP.
  Net::HTTP.start(uri.host, uri.port, :use_ssl => uri.scheme == 'https') do |http|
    http.request(req)
  end
rescue => e # rescue all StandardErrors
  # server didn't respond
  log.warn "Net::HTTP.#{req.method.capitalize} raises exception: #{e.class}, '#{e.message}'"
  raise e if @raise_on_error
  nil
end
set_body(req, tag, time, record)
# File lib/fluent/plugin/filter_http_record_modifier.rb, line 137
# Fills in the body of a non-GET request from the event record.
#
# With the :json serializer the record is sent as a JSON document with an
# application/json content type; otherwise it is sent as form data.
#
# @param req the Net::HTTP request to populate
# @param tag [String] event tag (unused)
# @param time event time (unused)
# @param record [Hash] event record used as the payload
# @return the same request object
def set_body(req, tag, time, record)
  if @serializer == :json
    # The header alone would send an empty payload; serialize the record too.
    req.body = Yajl.dump(record)
    req['Content-Type'] = 'application/json'
  else
    req.set_form_data(record)
  end
  req
end

Private Instance Methods

expand_placeholders(value)
# File lib/fluent/plugin/filter_http_record_modifier.rb, line 198
# Recursively expands placeholders in a configuration value.
#
# Strings are expanded directly; hashes get both their keys and their values
# expanded; arrays are expanded element by element. Any other value is
# returned untouched.
#
# @param value [String, Hash, Array, Object] the value to expand
# @return a new structure of the same shape with placeholders expanded
def expand_placeholders(value)
  case value
  when String
    @placeholder_expander.expand(value)
  when Hash
    value.each_with_object({}) do |(key, inner), expanded|
      # The key is expanded before the value, with the extra `true` flag.
      expanded_key = @placeholder_expander.expand(key, true)
      expanded[expanded_key] = expand_placeholders(inner)
    end
  when Array
    value.map { |item| expand_placeholders(item) }
  else
    value
  end
end
parse_value(value_str)
# File lib/fluent/plugin/filter_http_record_modifier.rb, line 176
# Interprets a configuration value.
#
# Values that start with '{' or '[' are parsed as JSON; everything else is
# kept as the original string. If parsing fails, a warning is logged and the
# raw string is returned.
#
# @param value_str [String] raw value from the configuration
# @return the parsed JSON structure, or the string itself
def parse_value(value_str)
  looks_like_json = value_str.start_with?('{', '[')
  return value_str unless looks_like_json

  JSON.parse(value_str)
rescue => e
  log.warn "failed to parse #{value_str} as json. Assuming #{value_str} is a string", :error_class => e.class, :error => e.message
  value_str # emit as string
end
reform(time, record, opts, res)
# File lib/fluent/plugin/filter_http_record_modifier.rb, line 187
# Builds the output record for one event.
#
# The placeholder expander is primed with the response data, then the
# expanded @map entries are merged into either a copy of the input record or,
# when @renew_record is set, a fresh hash seeded with the @keep_keys values.
# Keys listed in @remove_keys are dropped last.
#
# @param time event time
# @param record [Hash] incoming record (not modified)
# @param opts [Hash] tag-derived placeholder values
# @param res the deserialized response passed to the expander
# @return [Hash] the new record
def reform(time, record, opts, res)
  @placeholder_expander.prepare_placeholders(time, res, opts)

  if @renew_record
    result = {}
    @keep_keys.each { |key| result[key] = record[key] } if @keep_keys
  else
    result = record.dup
  end

  result.merge!(expand_placeholders(@map))

  if @remove_keys
    @remove_keys.each { |key| result.delete(key) }
  end

  result
end
tag_prefix(tag_parts)
# File lib/fluent/plugin/filter_http_record_modifier.rb, line 217
# Lists the cumulative prefixes of a tag.
#
# For %w[a b c] the result is ["a", "a.b", "a.b.c"]; an empty input gives
# an empty array.
#
# @param tag_parts [Array<String>] the tag split on '.'
# @return [Array<String>] prefixes ordered from shortest to longest
def tag_prefix(tag_parts)
  return [] if tag_parts.empty?

  tag_parts.drop(1).each_with_object([tag_parts.first]) do |part, prefixes|
    prefixes << "#{prefixes.last}.#{part}"
  end
end
tag_suffix(tag_parts)
# File lib/fluent/plugin/filter_http_record_modifier.rb, line 226
# Lists the cumulative suffixes of a tag.
#
# For %w[a b c] the result is ["a.b.c", "b.c", "c"]; an empty input gives
# an empty array.
#
# @param tag_parts [Array<String>] the tag split on '.'
# @return [Array<String>] suffixes ordered from longest to shortest
def tag_suffix(tag_parts)
  return [] if tag_parts.empty?

  suffixes = [tag_parts.last]
  # Walk the remaining parts right to left, growing the suffix at the front.
  tag_parts[0...-1].reverse_each do |part|
    suffixes.unshift("#{part}.#{suffixes.first}")
  end
  suffixes
end