class Fluent::Plugin::HTTPOutput

Constants

DEFAULT_BUFFER_TYPE
DEFAULT_FORMATTER

Public Class Methods

new() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_http.rb, line 19
# Constructs the plugin instance. No plugin-specific state is set up here;
# construction is delegated entirely to the superclass initializer.
def initialize
  super
end

Public Instance Methods

bulk_request_format(tag, time, record) click to toggle source
# File lib/fluent/plugin/out_http.rb, line 280
# Formats one event by delegating to the configured formatter (@formatter).
# configure aliases this method to +format+ when @bulk_request is set.
#
# Returns whatever @formatter.format returns for the event.
def bulk_request_format(tag, time, record)
  formatter = @formatter
  formatter.format(tag, time, record)
end
compress_body(req, data) click to toggle source
# File lib/fluent/plugin/out_http.rb, line 149
# Replaces the request body with a gzip-compressed copy of +data+ when
# request compression (@compress_request) is enabled; otherwise does nothing.
#
# req  - request object; receives the 'Content-Encoding' header and new body.
# data - payload to compress.
#
# Returns nil when compression is disabled, otherwise the compressed body.
def compress_body(req, data)
  return unless @compress_request

  sink = StringIO.new
  writer = Zlib::GzipWriter.new(sink)
  writer << data

  req['Content-Encoding'] = "gzip"
  # GzipWriter#close finishes the gzip stream and returns the wrapped IO.
  req.body = writer.close.string
end
configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_http.rb, line 80
# Applies the plugin configuration and derives the runtime settings used by
# the request-building methods.
#
# conf - configuration object; must respond to #elements (used below to look
#        up a 'format' section).
#
# Raises Fluent::ConfigError when buffering is enabled (@buffered) but
# @chunk_key_tag is not set.
def configure(conf)
  # NOTE(review): presumably maps old-style buffer/formatter parameters onto
  # their section form before the superclass parses conf — confirm against
  # the compat-parameters helper.
  compat_parameters_convert(conf, :buffer, :formatter)
  super

  # Translate the boolean @ssl_no_verify flag into the OpenSSL verify mode.
  @ssl_verify_mode = if @ssl_no_verify
                       OpenSSL::SSL::VERIFY_NONE
                     else
                       OpenSSL::SSL::VERIFY_PEER
                     end

  @ca_file = @cacert_file
  # No request has been sent yet; send_request uses this for rate limiting.
  @last_request_time = nil
  raise Fluent::ConfigError, "'tag' in chunk_keys is required." if !@chunk_key_tag && @buffered

  # Intentional assignment inside the condition: remember the first 'format'
  # section (if any) and create a formatter only when one is present.
  if @formatter_config = conf.elements('format').first
    @formatter = formatter_create
  end

  if @bulk_request
    # Route +format+ to the bulk variant on this instance only.
    class << self
      alias_method :format, :bulk_request_format
    end
    # Bulk mode always uses a JSON formatter; this replaces any formatter
    # created from the 'format' section above.
    @formatter = formatter_create(type: :json)
    @serializer = :x_ndjson # secret settings for bulk_request
  else
    # Route +format+ to the per-record (msgpack) variant on this instance.
    class << self
      alias_method :format, :split_request_format
    end
  end
end
create_request(tag, time, record) click to toggle source
# File lib/fluent/plugin/out_http.rb, line 182
# Builds the HTTP request object for one payload.
#
# The request class is looked up on Net::HTTP from @http_method
# (e.g. :post -> Net::HTTP::Post); body and headers are filled in by
# set_body and set_header.
#
# Returns a two-element array: [request, parsed URI].
def create_request(tag, time, record)
  uri = URI.parse(format_url(tag, time, record))
  request_class = Net::HTTP.const_get(@http_method.to_s.capitalize)
  req = request_class.new(uri.request_uri)
  set_body(req, tag, time, record)
  set_header(req, tag, time, record)
  [req, uri]
end
format(tag, time, record) click to toggle source
# File lib/fluent/plugin/out_http.rb, line 272
# Fallback definition of +format+. configure aliases +format+ to either
# bulk_request_format or split_request_format on the instance, so this body
# only runs if that aliasing has not happened.
#
# Returns nil.
def format(tag, time, record)
  # For safety.
  nil
end
format_url(tag, time, record) click to toggle source
# File lib/fluent/plugin/out_http.rb, line 119
# Returns the URL to send the event to. The event arguments are accepted but
# not used here: the current value of @endpoint_url is returned as-is.
def format_url(tag, time, record)
  @endpoint_url
end
formatted_to_msgpack_binary?() click to toggle source
# File lib/fluent/plugin/out_http.rb, line 284
# Reports whether formatted chunk data is msgpack binary: true unless bulk
# requests (@bulk_request) are enabled.
def formatted_to_msgpack_binary?
  !@bulk_request
end
handle_record(tag, time, record) click to toggle source
# File lib/fluent/plugin/out_http.rb, line 255
# Sends a single event as its own HTTP request.
#
# When a 'format' section was configured (@formatter_config), the record is
# first run through @formatter; otherwise it is sent unchanged.
#
# Returns the result of send_request.
def handle_record(tag, time, record)
  payload = @formatter_config ? @formatter.format(tag, time, record) : record
  req, uri = create_request(tag, time, payload)
  send_request(req, uri)
end
handle_records(tag, time, chunk) click to toggle source
# File lib/fluent/plugin/out_http.rb, line 263
# Sends the whole content of +chunk+ (as returned by chunk.read) in a single
# HTTP request.
#
# Returns the result of send_request.
def handle_records(tag, time, chunk)
  payload = chunk.read
  req, uri = create_request(tag, time, payload)
  send_request(req, uri)
end
http_opts(uri) click to toggle source
# File lib/fluent/plugin/out_http.rb, line 191
# Builds the option hash passed to Net::HTTP.start for +uri+.
#
# uri - parsed URI of the endpoint; its scheme decides whether SSL is used.
#
# Returns a Hash with:
#   :use_ssl     - true when the scheme is 'https'
#   :verify_mode - @ssl_verify_mode, only when SSL is used
#   :ca_file     - @ca_file, only when it names an existing file
#   :cert        - certificate loaded from @client_cert_path, when it exists
#   :key         - private key loaded from @private_key_path (decrypted with
#                  @private_key_passphrase), when it exists
#
# Unset (nil) paths are treated the same as paths that do not exist;
# File.file? would otherwise raise TypeError on nil.
def http_opts(uri)
  opts = {
    :use_ssl => uri.scheme == 'https'
  }
  opts[:verify_mode] = @ssl_verify_mode if opts[:use_ssl]
  opts[:ca_file] = File.join(@ca_file) if @ca_file && File.file?(@ca_file)
  if @client_cert_path && File.file?(@client_cert_path)
    opts[:cert] = OpenSSL::X509::Certificate.new(File.read(@client_cert_path))
  end
  if @private_key_path && File.file?(@private_key_path)
    opts[:key] = OpenSSL::PKey.read(File.read(@private_key_path), @private_key_passphrase)
  end
  opts
end
multi_workers_ready?() click to toggle source
# File lib/fluent/plugin/out_http.rb, line 292
# Always answers true.
# NOTE(review): presumably the hook Fluentd uses to decide whether the plugin
# may run under multiple workers — confirm against the output plugin API.
def multi_workers_ready?
  true
end
prefer_buffered_processing() click to toggle source
# File lib/fluent/plugin/out_http.rb, line 268
# Returns the configured @buffered flag unchanged.
# NOTE(review): presumably consulted by Fluentd to choose between the
# buffered (write) and non-buffered (process) paths — confirm.
def prefer_buffered_processing
  @buffered
end
process(tag, es) click to toggle source
# File lib/fluent/plugin/out_http.rb, line 296
# Handles an event stream directly: every (time, record) pair yielded by
# +es+ is sent through handle_record with the given tag.
#
# Returns the result of es.each.
def process(tag, es)
  es.each { |time, record| handle_record(tag, time, record) }
end
proxies() click to toggle source
# File lib/fluent/plugin/out_http.rb, line 202
# Looks up a proxy URL in the environment.
#
# Variables are checked in this order and the first one that is set wins:
# HTTPS_PROXY, HTTP_PROXY, http_proxy, https_proxy.
#
# Returns the variable's value, or nil when none of them is set.
def proxies
  %w[HTTPS_PROXY HTTP_PROXY http_proxy https_proxy].each do |name|
    value = ENV[name]
    return value if value
  end
  nil
end
send_request(req, uri) click to toggle source
# File lib/fluent/plugin/out_http.rb, line 206
# Sends +req+ to +uri+, applying rate limiting, authentication and an
# optional proxy taken from the environment (see proxies).
#
# req - Net::HTTP request object to send.
# uri - parsed URI of the endpoint (host, port and scheme are used).
#
# Behaviour:
# * When @rate_limit_msec is non-zero and the previous request was sent less
#   than that many milliseconds ago, the request is dropped with an info log.
# * Exceptions raised while sending are logged as warnings and re-raised only
#   when @raise_on_error is set.
# * A non-success response whose status code is listed in
#   @recoverable_status_codes raises RecoverableResponse; any other
#   non-success response (including a nil response) is logged as a warning.
#
# Raises RecoverableResponse for recoverable status codes, and the original
# StandardError when @raise_on_error is set.
def send_request(req, uri)
  rate_limited = @rate_limit_msec != 0 && !@last_request_time.nil?
  if rate_limited && (Time.now.to_f - @last_request_time) * 1000.0 < @rate_limit_msec
    log.info('Dropped request due to rate limiting')
    return
  end

  res = nil

  begin
    case @authentication
    when :basic
      req.basic_auth(@username, @password)
    when :bearer
      req['authorization'] = "bearer #{@token}"
    when :jwt
      req['authorization'] = "jwt #{@token}"
    end
    # Recorded before the request goes out, so failed attempts also count
    # towards the rate limit.
    @last_request_time = Time.now.to_f

    if (proxy = proxies)
      proxy_uri = URI.parse(proxy)

      res = Net::HTTP.start(uri.host, uri.port,
                            proxy_uri.host, proxy_uri.port, proxy_uri.user, proxy_uri.password,
                            **http_opts(uri)) { |http| http.request(req) }
    else
      res = Net::HTTP.start(uri.host, uri.port, **http_opts(uri)) { |http| http.request(req) }
    end
  rescue => e # rescue all StandardErrors
    # server didn't respond
    log.warn "Net::HTTP.#{req.method.capitalize} raises exception: #{e.class}, '#{e.message}'"
    raise e if @raise_on_error
  else
    return if res.is_a?(Net::HTTPSuccess)

    res_summary = if res
                    "#{res.code} #{res.message} #{res.body}"
                  else
                    "res=nil"
                  end
    # Only look at the status code when there is a response: a nil response
    # has no code and must fall through to the warning below.
    if res && @recoverable_status_codes.include?(res.code.to_i)
      raise RecoverableResponse, res_summary
    else
      log.warn "failed to #{req.method} #{uri} (#{res_summary})"
    end
  end
end
set_body(req, tag, time, record) click to toggle source
# File lib/fluent/plugin/out_http.rb, line 123
# Fills in the request body according to @serializer.
#
#   :json     -> set_json_body
#   :text     -> set_text_body
#   :raw      -> set_raw_body
#   :x_ndjson -> set_bulk_body
#   otherwise -> the record is sent as form data (req.set_form_data)
#
# Returns +req+.
def set_body(req, tag, time, record)
  case @serializer
  when :json     then set_json_body(req, record)
  when :text     then set_text_body(req, record)
  when :raw      then set_raw_body(req, record)
  when :x_ndjson then set_bulk_body(req, record)
  else
    req.set_form_data(record)
  end
  req
end
set_bulk_body(req, data) click to toggle source
# File lib/fluent/plugin/out_http.rb, line 176
# Sets the request body to the string form of +data+, marks it as
# newline-delimited JSON, then hands it to compress_body.
#
# Returns the result of compress_body.
def set_bulk_body(req, data)
  payload = data.to_s
  req.body = payload
  req['Content-Type'] = 'application/x-ndjson'
  compress_body(req, req.body)
end
set_header(req, tag, time, record) click to toggle source
# File lib/fluent/plugin/out_http.rb, line 138
# Copies every configured custom header (@custom_headers) onto the request.
# Does nothing when no custom headers are configured.
#
# Returns +req+.
def set_header(req, tag, time, record)
  return req unless @custom_headers

  @custom_headers.each { |name, value| req[name] = value }
  req
end
set_json_body(req, data) click to toggle source
# File lib/fluent/plugin/out_http.rb, line 158
# Serializes +data+ with Yajl.dump into the request body, marks it as JSON,
# then hands it to compress_body.
#
# Returns the result of compress_body.
def set_json_body(req, data)
  encoded = Yajl.dump(data)
  req.body = encoded
  req['Content-Type'] = 'application/json'
  compress_body(req, req.body)
end
set_raw_body(req, data) click to toggle source
# File lib/fluent/plugin/out_http.rb, line 170
# Sets the request body to the string form of +data+, marks it as an
# opaque byte stream, then hands it to compress_body.
#
# Returns the result of compress_body.
def set_raw_body(req, data)
  payload = data.to_s
  req.body = payload
  req['Content-Type'] = 'application/octet-stream'
  compress_body(req, req.body)
end
set_text_body(req, data) click to toggle source
# File lib/fluent/plugin/out_http.rb, line 164
# Uses the value under the "message" key of +data+ as the request body,
# marks it as plain text, then hands it to compress_body.
#
# Returns the result of compress_body.
def set_text_body(req, data)
  message = data["message"]
  req.body = message
  req['Content-Type'] = 'text/plain'
  compress_body(req, req.body)
end
shutdown() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_http.rb, line 115
# Shutdown hook. The plugin adds no shutdown work of its own and simply
# delegates to the superclass implementation.
def shutdown
  super
end
split_request_format(tag, time, record) click to toggle source
# File lib/fluent/plugin/out_http.rb, line 276
# Formats one event as the msgpack encoding of the pair [time, record]; the
# tag is not part of the output. configure aliases this method to +format+
# when bulk requests are disabled.
def split_request_format(tag, time, record)
  pair = [time, record]
  pair.to_msgpack
end
start() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_http.rb, line 111
# Start hook. The plugin adds no start-up work of its own and simply
# delegates to the superclass implementation.
def start
  super
end
write(chunk) click to toggle source
# File lib/fluent/plugin/out_http.rb, line 302
# Flushes one buffer chunk.
#
# The endpoint URL is expanded for this chunk via extract_placeholders and
# stored in @endpoint_url, which format_url reads. In bulk mode
# (@bulk_request) the whole chunk is sent in one request stamped with
# Fluent::Engine.now; otherwise each (time, record) pair from
# chunk.msgpack_each is sent separately.
#
# chunk - buffer chunk; must provide metadata.tag, and either read (bulk
#         mode, via handle_records) or msgpack_each.
def write(chunk)
  tag = chunk.metadata.tag
  # Expand from a preserved copy of the configured URL. Expanding
  # @endpoint_url in place would erase its placeholders after the first
  # chunk, so every later chunk would be sent to the first chunk's URL.
  # NOTE(review): @endpoint_url is still shared per-instance state; confirm
  # behaviour if chunks can be flushed concurrently.
  @endpoint_url_template ||= @endpoint_url
  @endpoint_url = extract_placeholders(@endpoint_url_template, chunk)
  if @bulk_request
    time = Fluent::Engine.now
    handle_records(tag, time, chunk)
  else
    chunk.msgpack_each do |time, record|
      handle_record(tag, time, record)
    end
  end
end