class Fluent::HTTPOutput

Public Class Methods

new() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_http_ext.rb, line 67
def initialize
  super
  require 'net/http'
  require 'uri'
  require 'yajl'
  require 'set'
end

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_http_ext.rb, line 108
def configure(conf)
  super

  serializers = [:json, :form]
  @serializer = if serializers.include? @serializer.intern
                  @serializer.intern
                else
                  :form
                end

  http_methods = [:get, :put, :post, :delete]
  @http_method = if http_methods.include? @http_method.intern
                  @http_method.intern
                else
                  :post
                end

  @ignore_http_status_code = if @ignore_http_status_code.nil?
                        [].to_set
                      else
                        StatusCodeParser.convert(@ignore_http_status_code)
                      end

  @auth = case @authentication
          when 'basic' then :basic
          else
            :none
          end
  @headers = {}
  conf.elements.each do |element|
    if element.name == 'headers'
      @headers = element.to_hash
    end
  end

  @formatter = nil
  unless @format.empty?
    @formatter = Fluent::Plugin.new_formatter(@format)
    @formatter.configure(conf)
  end
end
create_request(tag, time, record) click to toggle source
# File lib/fluent/plugin/out_http_ext.rb, line 194
def create_request(tag, time, record)
  url = format_url(tag, time, record)
  uri = URI.parse(url)
  req = Net::HTTP.const_get(@http_method.to_s.capitalize).new(uri.path)
  set_body(req, tag, time, record)
  set_header(req, tag, time, record)
  return req, uri
end
emit(tag, es, chain) click to toggle source
# File lib/fluent/plugin/out_http_ext.rb, line 260
def emit(tag, es, chain)
  es.each do |time, record|
    if @formatter
      record = @formatter.format(tag, time, record)
    end
    handle_record(tag, time, record)
  end
  chain.next
end
format_url(tag, time, record) click to toggle source
# File lib/fluent/plugin/out_http_ext.rb, line 158
def format_url(tag, time, record)
  '''
  replace format string to value
  example
    /test/<data> =(use {data: 1})> /test/1
    /test/<hash.data> =(use {hash:{data:2}})> /test/2
  '''
  result_url = @endpoint_url
  return result_url unless record.is_a? Hash
  record.each_deep do |key_dir, value|
    result_url = result_url.gsub(/<#{key_dir.join(".")}>/, value.to_s)
  end
  return result_url
end
handle_record(tag, time, record) click to toggle source
# File lib/fluent/plugin/out_http_ext.rb, line 255
def handle_record(tag, time, record)
  req, uri = create_request(tag, time, record)
  send_request(req, uri)
end
send_request(req, uri) click to toggle source
# File lib/fluent/plugin/out_http_ext.rb, line 203
def send_request(req, uri)
  is_rate_limited = (@rate_limit_msec != 0 and not @last_request_time.nil?)
  if is_rate_limited and ((Time.now.to_f - @last_request_time) * 1000.0 < @rate_limit_msec)
    $log.info('Dropped request due to rate limiting')
    return
  end

  res = nil

  begin
    if @auth and @auth == :basic
      req.basic_auth(@username, @password)
    end
    @last_request_time = Time.now.to_f
    client = Net::HTTP.new(uri.host, uri.port)
    if @use_ssl
      client.use_ssl = true
      client.ca_file = OpenSSL::X509::DEFAULT_CERT_FILE
      unless @verify_ssl
        client.verify_mode = OpenSSL::SSL::VERIFY_NONE
      end
    end
    res = client.start {|http|
      http.open_timeout = @open_timeout
      http.read_timeout = @read_timeout
      http.request(req)
    }
  rescue => e # rescue all StandardErrors
    # server didn't respond
    $log.warn "Net::HTTP.#{req.method.capitalize} raises exception: #{e.class}, '#{e.message}'"
    raise e if @raise_on_error
  else
     unless res and res.is_a?(Net::HTTPSuccess)
        res_summary = if res
                         "#{res.code} #{res.message} #{res.body}"
                      else
                         "res=nil"
                      end
        warning = "failed to #{req.method} #{uri} (#{res_summary})"
        $log.warn warning
        if @raise_on_http_failure
          unless @ignore_http_status_code.include?(res.code.to_i)
            raise warning
          else
            $log.debug "ignore http status code #{req.method}"
          end
        end

     end #end unless
  end # end begin
end
set_body(req, tag, time, record) click to toggle source
# File lib/fluent/plugin/out_http_ext.rb, line 173
def set_body(req, tag, time, record)
  if @serializer == :json
    set_json_body(req, record)
  else
    req.set_form_data(record)
  end
  req
end
set_header(req, tag, time, record) click to toggle source
# File lib/fluent/plugin/out_http_ext.rb, line 182
def set_header(req, tag, time, record)
  @headers.each do |key, value|
    req[key] = value
  end
  req
end
set_json_body(req, data) click to toggle source
# File lib/fluent/plugin/out_http_ext.rb, line 189
def set_json_body(req, data)
  req.body = Yajl.dump(data)
  req['Content-Type'] = 'application/json'
end
shutdown() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_http_ext.rb, line 154
def shutdown
  super
end
start() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_http_ext.rb, line 150
def start
  super
end