class Fluent::HTTPOutput

The out_http buffered output plugin sends event records via HTTP.

Constants

HTTP_STATUS_CODE_RANGE
ResponseError

Unsuccessful response error

Public Class Methods

new() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_http.rb, line 32
# Creates the plugin instance.
#
# Loads the plugin's error definitions and then delegates to the superclass
# constructor.
def initialize
  # Required lazily at construction time rather than at file load;
  # presumably this file defines ResponseError -- TODO confirm.
  require 'fluent/plugin/http/error'

  super
end

Public Instance Methods

configure(conf) click to toggle source

Configures the plugin

@param conf [Hash] the plugin configuration
@return void

Calls superclass method
# File lib/fluent/plugin/out_http.rb, line 42
# Configures the plugin.
#
# Runs the superclass configuration first, then passes every setting through
# its validator and stores the validated value in the matching instance
# variable.
#
# @param conf [Hash] the plugin configuration
# @return [void]
# @raise [Fluent::ConfigError] if a validator rejects its setting
def configure(conf)
  super

  # Validation order is significant: validate_username consults
  # authorization_token, and validate_password consults username.
  @url = validate_url(url)
  @accept_status_code = validate_accept_status_code(accept_status_code)
  @authorization_token = validate_authorization_token(authorization_token)
  @keep_alive_timeout = validate_keep_alive_timeout(keep_alive_timeout)
  @username = validate_username(username)
  @password = validate_password(password)
end
format(tag, time, record) click to toggle source

Serializes the event

@param tag [#to_msgpack] the event tag
@param time [#to_msgpack] the event timestamp
@param record [#to_msgpack] the event record
@return [String] serialized event

# File lib/fluent/plugin/out_http.rb, line 68
# Serializes a single event as a MessagePack-encoded triple.
#
# @param tag [#to_msgpack] the event tag
# @param time [#to_msgpack] the event timestamp
# @param record [#to_msgpack] the event record
# @return [String] serialized event
def format(tag, time, record)
  event = [tag, time, record]
  event.to_msgpack
end
shutdown() click to toggle source

Hook method that is called at the shutdown

@return void

Calls superclass method
# File lib/fluent/plugin/out_http.rb, line 56
# Hook method that is called at the shutdown.
#
# Runs the superclass shutdown first and afterwards closes the HTTP
# connection through #disconnect.
#
# @return [void]
def shutdown
  super

  disconnect
end
write(chunk) click to toggle source

Sends the event records

@param chunk [#msgpack_each] buffer chunk that includes multiple formatted events

@return void

# File lib/fluent/plugin/out_http.rb, line 77
# Sends the event records of a buffer chunk in a single POST request.
#
# Only the record (the last element of every unpacked event) is forwarded;
# an empty chunk sends nothing.
#
# @param chunk [#msgpack_each] buffer chunk that includes multiple formatted
#   events
# @return [void]
# @raise [ResponseError] if the response code is not listed in
#   accept_status_code
def write(chunk)
  return if chunk.empty?

  payload = []
  chunk.msgpack_each { |event| payload << event.last }

  request = post_records_request(payload)
  response = connect.request(request)

  unless accept_status_code.include?(response.code)
    raise ResponseError.error(request, response)
  end
end

Private Instance Methods

connect() click to toggle source
# File lib/fluent/plugin/out_http.rb, line 96
# Returns the HTTP session, opening it on first use and memoizing it in
# @http for subsequent calls.
#
# TLS is enabled when the configured URL uses the https scheme.
#
# @return [Net::HTTP] the started HTTP session
def connect
  @http ||= begin
    secure = url.scheme == 'https'

    Net::HTTP.start(
      url.host,
      url.port,
      use_ssl: secure,
      keep_alive_timeout: keep_alive_timeout
    )
  end
end
disconnect() click to toggle source
# File lib/fluent/plugin/out_http.rb, line 105
# Closes the memoized HTTP session, if any, and forgets it.
#
# Net::HTTP#finish raises IOError when the session has not been started (or
# was already finished), so the session is only finished while it is still
# started. The handle is cleared afterwards so that repeated calls are no-ops
# and a stale, finished session is never kept around.
#
# @return [nil]
def disconnect
  # defined? avoids an "instance variable not initialized" warning when no
  # connection was ever opened.
  return unless defined?(@http) && @http

  @http.finish if @http.started?
  @http = nil
end
http_status_code?(code) click to toggle source
# File lib/fluent/plugin/out_http.rb, line 148
# Tells whether the given value is a well-formed HTTP status code.
#
# The value must consist solely of decimal digits and fall within
# HTTP_STATUS_CODE_RANGE. A bare #to_i conversion is not enough because it
# silently truncates trailing garbage ("200abc" would become 200), letting
# through a configured code that can never equal a real response code.
#
# @param code [#to_s] the candidate status code
# @return [Boolean] true if the value is a valid status code
def http_status_code?(code)
  text = code.to_s
  return false unless /\A\d+\z/ =~ text

  HTTP_STATUS_CODE_RANGE.cover?(text.to_i)
end
post_records_request(records) click to toggle source
# File lib/fluent/plugin/out_http.rb, line 112
# Builds the POST request that carries the given records as a JSON body.
#
# A token Authorization header is set when an authorization token is
# configured; basic authentication is applied when a username is configured.
#
# @param records [Array] the event records to send
# @return [Net::HTTP::Post] the prepared request
def post_records_request(records)
  request = Net::HTTP::Post.new(url)

  request['User-Agent'] = 'FluentPluginHTTP'
  request.content_type = 'application/json'
  request.body = Oj.dump(records)

  token = authorization_token
  request['Authorization'] = "Token token=#{token}" if token

  user = username
  request.basic_auth(user, password) if user

  request
end
validate_accept_status_code(status_codes) click to toggle source
# File lib/fluent/plugin/out_http.rb, line 137
# Ensures the list of accepted status codes is non-empty and that every
# entry satisfies #http_status_code?.
#
# @param status_codes [Array] the configured status codes
# @return [Array] the given status codes, unchanged
# @raise [Fluent::ConfigError] if the list is empty or holds an invalid code
def validate_accept_status_code(status_codes)
  invalid = status_codes.empty? ||
            status_codes.any? { |code| !http_status_code?(code) }

  if invalid
    raise Fluent::ConfigError, "Invalid status codes: #{status_codes.inspect}"
  end

  status_codes
end
validate_authorization_token(value) click to toggle source
# File lib/fluent/plugin/out_http.rb, line 152
# Ensures the authorization token, when given, is not empty.
#
# @param value [String, nil] the configured authorization token
# @return [String, nil] the given value, unchanged
# @raise [Fluent::ConfigError] if the token is empty
def validate_authorization_token(value)
  if !value.nil? && value.empty?
    raise Fluent::ConfigError, "Invalid authorization token: #{value.inspect}"
  end

  value
end
validate_keep_alive_timeout(value) click to toggle source
# File lib/fluent/plugin/out_http.rb, line 159
# Ensures the keep-alive timeout is not negative.
#
# @param value [Numeric] the configured keep-alive timeout
# @return [Numeric] the given value, unchanged
# @raise [Fluent::ConfigError] unless the value is greater than or equal to 0
def validate_keep_alive_timeout(value)
  unless value >= 0
    raise Fluent::ConfigError, "Invalid keep-alive timeout: #{value.inspect}"
  end

  value
end
validate_password(value) click to toggle source
# File lib/fluent/plugin/out_http.rb, line 178
# Ensures a password is only configured together with a username.
#
# @param value [String, nil] the configured password
# @return [String, nil] the given value, unchanged
# @raise [Fluent::ConfigError] if a password is given without a username
def validate_password(value)
  unless value.nil? || username
    raise Fluent::ConfigError, 'Password requires a username'
  end

  value
end
validate_url(test_url) click to toggle source
# File lib/fluent/plugin/out_http.rb, line 127
# Parses and validates the endpoint URL.
#
# The URL must use the http or https scheme and must name a host. A URL
# such as "http:/path" parses successfully with an accepted scheme but has
# no host, so it is rejected here instead of failing (or targeting an
# unintended host) once a connection is attempted.
#
# @param test_url [String, URI] the configured URL
# @return [URI::Generic] the parsed URL
# @raise [Fluent::ConfigError] if the URL cannot be parsed, uses another
#   scheme, or lacks a host
def validate_url(test_url)
  url = URI(test_url)

  unless %w[http https].include?(url.scheme)
    raise Fluent::ConfigError,
          "Unacceptable URL scheme, expected HTTP or HTTPs: #{test_url}"
  end

  if url.host.nil? || url.host.empty?
    raise Fluent::ConfigError, "Missing host in URL: #{test_url}"
  end

  url
rescue URI::InvalidURIError => e
  raise Fluent::ConfigError, e.message
end
validate_username(value) click to toggle source
# File lib/fluent/plugin/out_http.rb, line 165
# Ensures the username, when given, is not empty and is not combined with an
# authorization token.
#
# @param value [String, nil] the configured username
# @return [String, nil] the given value, unchanged
# @raise [Fluent::ConfigError] if an authorization token is also configured
#   or the username is empty
def validate_username(value)
  unless value.nil?
    # The mutual-exclusion check takes precedence over the emptiness check.
    if authorization_token
      raise Fluent::ConfigError,
            'Mutually exclusive: authorization_token and username'
    end

    if value.empty?
      raise Fluent::ConfigError, "Invalid username: #{value.inspect}"
    end
  end

  value
end