class Fluent::Plugin::HTTPOutput

Constants

DEFAULT_BUFFER_TYPE
DEFAULT_FORMATTER

Public Class Methods

new() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_http.rb, line 15
# Creates the plugin instance. No state is set up here; construction is
# delegated entirely to the superclass.
def initialize
  super
end

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_http.rb, line 61
# Validates the plugin configuration and builds the shared Faraday connection.
#
# Side effects (instance variables written):
#   @ssl_verify_mode   - OpenSSL verify constant derived from @ssl_no_verify
#   @ca_file           - copy of @cacert_file
#   @last_request_time - reset to nil
#   @http_method       - normalised to one of :get, :put, :post, :delete
#                        (anything else falls back to :post)
#   @uri               - parsed @endpoint_url
#   @adapter           - Faraday connection with retry middleware
#   @formatter_config / @formatter - set when a <format> section is present
#
# conf - the configuration element handed over by Fluentd.
#
# Raises Fluent::ConfigError when buffering is enabled without 'tag' in
# chunk_keys, or when endpoint_url has no scheme or host.
def configure(conf)
  compat_parameters_convert(conf, :buffer, :formatter)
  super

  @ssl_verify_mode = @ssl_no_verify ? OpenSSL::SSL::VERIFY_NONE : OpenSSL::SSL::VERIFY_PEER
  @ca_file = @cacert_file
  @last_request_time = nil
  raise Fluent::ConfigError, "'tag' in chunk_keys is required." if @buffered && !@chunk_key_tag

  # to_s guards against an unset http_method; unknown values fall back to POST.
  requested_method = @http_method.to_s.to_sym
  @http_method = %i[get put post delete].include?(requested_method) ? requested_method : :post

  @uri = URI.parse(@endpoint_url)
  unless @uri.scheme && @uri.host
    raise Fluent::ConfigError,
          "endpoint_url must be an absolute URL with scheme and host: #{@endpoint_url.inspect}"
  end
  base_url = "#{@uri.scheme}://#{@uri.host}:#{@uri.port}"

  # Honour ssl_no_verify / cacert_file instead of unconditionally disabling
  # certificate verification.
  ssl_options = { verify: !@ssl_no_verify }
  ssl_options[:ca_file] = @ca_file if @ca_file && File.file?(@ca_file)

  @adapter = Faraday.new(url: base_url, ssl: ssl_options) do |f|
    # The retry middleware checks `methods.include?(request_method)`, so it
    # must be given a collection rather than a bare Symbol.
    f.request :retry,
              max: 5,
              interval: 1,
              interval_randomness: 0.5,
              backoff_factor: 2,
              methods: [@http_method],
              exceptions: %w(Errno::ETIMEDOUT
                             Faraday::TimeoutError
                             Faraday::Error::TimeoutError
                             Net::ReadTimeout).freeze

    f.adapter :net_http_persistent
  end

  @formatter_config = conf.elements('format').first
  @formatter = formatter_create if @formatter_config
end
create_request(tag, time, record) click to toggle source
# File lib/fluent/plugin/out_http.rb, line 155
# Builds a Net::HTTP request object for a single event.
#
# The target URL comes from #format_url. The request class is looked up on
# Net::HTTP from the configured HTTP method (e.g. :post -> Net::HTTP::Post).
# Body and headers are filled in by #set_body and #set_header.
#
# Returns a two-element array: the request and the parsed URI.
def create_request(tag, time, record)
  uri = URI.parse(format_url(tag, time, record))
  request_class = Net::HTTP.const_get(@http_method.to_s.capitalize)
  req = request_class.new(uri.request_uri)
  set_body(req, tag, time, record)
  set_header(req, tag, time, record)
  [req, uri]
end
format(tag, time, record) click to toggle source
# File lib/fluent/plugin/out_http.rb, line 204
# Serialises one event for the buffer as a msgpack-encoded [time, record]
# pair. The tag is not part of the serialised payload.
def format(tag, time, record)
  event = [time, record]
  event.to_msgpack
end
format_url(tag, time, record) click to toggle source
# File lib/fluent/plugin/out_http.rb, line 112
# Returns the URL a request for this event should target.
#
# The arguments are currently unused: the value of @endpoint_url is returned
# unchanged for every event.
def format_url(tag, time, record)
  @endpoint_url
end
formatted_to_msgpack_binary?() click to toggle source
# File lib/fluent/plugin/out_http.rb, line 208
# Always returns true. Presumably this tells Fluentd that #format emits
# msgpack binary (it calls to_msgpack) -- confirm against the Output API.
def formatted_to_msgpack_binary?
  true
end
handle_record(tag, time, record) click to toggle source
# File lib/fluent/plugin/out_http.rb, line 192
# Sends a single event to the endpoint.
#
# When a <format> section was configured (@formatter_config is set) the
# record is first passed through @formatter; otherwise it is sent as-is.
#
# Returns whatever #send_request returns.
def handle_record(tag, time, record)
  payload = @formatter_config ? @formatter.format(tag, time, record) : record
  send_request(payload)
end
http_opts(uri) click to toggle source
# File lib/fluent/plugin/out_http.rb, line 164
# Builds the option hash used to open a Net::HTTP-style connection to +uri+.
#
# uri - a parsed URI; only its scheme is inspected.
#
# Returns a Hash with:
#   :use_ssl     - true when the scheme is 'https'
#   :verify_mode - @ssl_verify_mode, only when SSL is in use
#   :ca_file     - @ca_file, only when it is set and names an existing file
def http_opts(uri)
  opts = { use_ssl: uri.scheme == 'https' }
  opts[:verify_mode] = @ssl_verify_mode if opts[:use_ssl]
  # @ca_file is nil when no CA bundle is configured; File.file?(nil) would
  # raise TypeError, so check for presence first.
  opts[:ca_file] = @ca_file if @ca_file && File.file?(@ca_file)
  opts
end
multi_workers_ready?() click to toggle source
# File lib/fluent/plugin/out_http.rb, line 212
# Always returns true. Presumably this declares the plugin usable under
# Fluentd's multi-worker mode -- confirm against the plugin API.
def multi_workers_ready?
  true
end
prefer_buffered_processing() click to toggle source
# File lib/fluent/plugin/out_http.rb, line 200
# Returns the value of @buffered, i.e. whether buffered output was requested.
# The same flag gates the chunk_keys check in #configure.
def prefer_buffered_processing
  @buffered
end
process(tag, es) click to toggle source
# File lib/fluent/plugin/out_http.rb, line 216
# Non-buffered entry point: forwards every (time, record) pair of the event
# stream +es+ to #handle_record, in order, under the given +tag+.
def process(tag, es)
  es.each { |time, record| handle_record(tag, time, record) }
end
proxies() click to toggle source
# File lib/fluent/plugin/out_http.rb, line 173
# Returns the proxy URL taken from the environment, or nil when none is set.
#
# Variables are consulted in this order and the first one that is set wins:
# HTTPS_PROXY, HTTP_PROXY, http_proxy, https_proxy.
def proxies
  ENV.values_at('HTTPS_PROXY', 'HTTP_PROXY', 'http_proxy', 'https_proxy').compact.first
end
send_request(record) click to toggle source
# File lib/fluent/plugin/out_http.rb, line 177
# Sends +record+ to the endpoint as a JSON body through the shared Faraday
# connection, using the configured HTTP method (POST when none is set).
#
# Rate limiting: when @rate_limit_msec is non-zero and the previous request
# was issued less than that many milliseconds ago, the record is dropped
# (logged at info level) and nil is returned.
#
# Returns the response object from the connection, or nil when dropped.
def send_request(record)
  now = Time.now.to_f
  rate_limited = @rate_limit_msec != 0 && !@last_request_time.nil?
  if rate_limited && (now - @last_request_time) * 1000.0 < @rate_limit_msec
    log.info('Dropped request due to rate limiting')
    return
  end
  # Remember when this request went out; without this the rate limiter has
  # nothing to compare against and never triggers.
  @last_request_time = now

  @adapter.public_send(@http_method || :post, @uri.path) do |request|
    request.headers['Content-Type'] = 'application/json'
    request.body = Yajl.dump(record)
    request.options.timeout = 60
    request.options.open_timeout = 60
  end
end
set_body(req, tag, time, record) click to toggle source
# File lib/fluent/plugin/out_http.rb, line 116
# Fills in the request body according to @serializer.
#
#   :json -> #set_json_body
#   :text -> #set_text_body
#   :raw  -> #set_raw_body
#   anything else -> the record is sent as form data
#
# Returns the request that was passed in.
def set_body(req, tag, time, record)
  case @serializer
  when :json then set_json_body(req, record)
  when :text then set_text_body(req, record)
  when :raw  then set_raw_body(req, record)
  else req.set_form_data(record)
  end
  req
end
set_header(req, tag, time, record) click to toggle source
# File lib/fluent/plugin/out_http.rb, line 129
# Copies every entry of @custom_headers (when set) onto the request.
# The tag, time and record arguments are not used.
#
# Returns the request that was passed in.
def set_header(req, tag, time, record)
  @custom_headers.each { |name, value| req[name] = value } if @custom_headers
  req
end
set_json_body(req, data) click to toggle source
# File lib/fluent/plugin/out_http.rb, line 140
# Serialises +data+ with Yajl, stores it as the request body and marks the
# request as application/json.
def set_json_body(req, data)
  json = Yajl.dump(data)
  req.body = json
  req['Content-Type'] = 'application/json'
end
set_raw_body(req, data) click to toggle source
# File lib/fluent/plugin/out_http.rb, line 150
# Stores the string form of +data+ as the request body and marks the request
# as application/octet-stream.
def set_raw_body(req, data)
  raw = data.to_s
  req.body = raw
  req['Content-Type'] = 'application/octet-stream'
end
set_text_body(req, data) click to toggle source
# File lib/fluent/plugin/out_http.rb, line 145
# Uses the record's "message" entry as the request body and marks the request
# as text/plain.
def set_text_body(req, data)
  message = data["message"]
  req.body = message
  req['Content-Type'] = 'text/plain'
end
shutdown() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_http.rb, line 108
# Shutdown hook. Adds no behaviour of its own; it only calls the superclass
# implementation.
def shutdown
  super
end
start() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_http.rb, line 104
# Start hook. Adds no behaviour of its own; it only calls the superclass
# implementation.
def start
  super
end
write(chunk) click to toggle source
# File lib/fluent/plugin/out_http.rb, line 222
# Buffered entry point: sends every event stored in +chunk+.
#
# Placeholders in the endpoint URL are expanded from the chunk's metadata and
# the result is stored in @endpoint_url before the events are handled. The
# unexpanded URL is kept in @endpoint_url_template so that every chunk is
# expanded from the original pattern; overwriting the pattern itself would
# freeze the URL to whatever the first chunk produced.
#
# NOTE(review): @endpoint_url is shared instance state, so concurrent calls
# to #write can still overwrite each other's value -- confirm whether flush
# threads may run in parallel.
def write(chunk)
  tag = chunk.metadata.tag
  @endpoint_url_template ||= @endpoint_url
  @endpoint_url = extract_placeholders(@endpoint_url_template, chunk.metadata)
  chunk.msgpack_each do |time, record|
    handle_record(tag, time, record)
  end
end