class Fluent::HTTPOutput
The out_http buffered output plugin sends event records via HTTP.
Constants
- HTTP_STATUS_CODE_RANGE
- ResponseError
Unsuccessful response error
Public Class Methods
new()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_http.rb, line 32
# Builds the plugin instance: loads the plugin's error definitions and then
# hands over to the superclass initializer.
def initialize
  # Required here, at construction time, rather than at file load.
  require 'fluent/plugin/http/error'

  super
end
Public Instance Methods
configure(conf)
click to toggle source
Configures the plugin
@param conf [Hash] the plugin configuration
@return void
Calls superclass method
# File lib/fluent/plugin/out_http.rb, line 42 def configure(conf) super @url = validate_url(url) @accept_status_code = validate_accept_status_code(accept_status_code) @authorization_token = validate_authorization_token(authorization_token) @keep_alive_timeout = validate_keep_alive_timeout(keep_alive_timeout) @username = validate_username(username) @password = validate_password(password) end
format(tag, time, record)
click to toggle source
Serializes the event
@param tag [#to_msgpack] the event tag
@param time [#to_msgpack] the event timestamp
@param record [#to_msgpack] the event record
@return [String] serialized event
# File lib/fluent/plugin/out_http.rb, line 68
# Serializes one event as a MessagePack-encoded three-element array.
#
# @param tag [#to_msgpack] the event tag
# @param time [#to_msgpack] the event timestamp
# @param record [#to_msgpack] the event record
# @return [String] serialized event
def format(tag, time, record)
  # The record is stored last; #write relies on that when it unpacks chunks.
  event = [tag, time, record]
  event.to_msgpack
end
shutdown()
click to toggle source
Hook method that is called at the shutdown
@return void
Calls superclass method
# File lib/fluent/plugin/out_http.rb, line 56
# Shutdown hook: lets the superclass run its own shutdown first and then
# closes the HTTP session held by this plugin.
#
# @return void
def shutdown
  super

  disconnect
end
write(chunk)
click to toggle source
Sends the event records
@param chunk [#msgpack_each] buffer chunk that includes multiple formatted events
@return void
# File lib/fluent/plugin/out_http.rb, line 77
# Sends every record contained in the chunk in a single POST request.
#
# @param chunk [#msgpack_each] buffer chunk that includes multiple
#   formatted events
# @return void
# @raise [ResponseError] when the response status is not listed in
#   accept_status_code
def write(chunk)
  return if chunk.empty?

  # Each unpacked event is the array built by #format; only its last
  # element (the record) is sent.
  payload = []
  chunk.msgpack_each { |event| payload << event.last }

  request = post_records_request(payload)
  response = connect.request(request)

  raise ResponseError.error(request, response) unless accept_status_code.include?(response.code)
end
Private Instance Methods
connect()
click to toggle source
# File lib/fluent/plugin/out_http.rb, line 96
# Returns the memoized HTTP session, starting it on first use.
#
# @return [Net::HTTP] the started session for the configured URL
def connect
  @http ||= begin
    secure = url.scheme == 'https'

    Net::HTTP.start(
      url.host,
      url.port,
      use_ssl: secure,
      keep_alive_timeout: keep_alive_timeout
    )
  end
end
disconnect()
click to toggle source
# File lib/fluent/plugin/out_http.rb, line 105
# Closes the memoized HTTP session, if there is one, and forgets it.
#
# Safe to call repeatedly: the session is only finished while it reports
# itself as started (Net::HTTP#finish raises IOError otherwise), and the
# handle is cleared so that a later #connect opens a fresh session instead
# of reusing a finished one.
#
# @return void
def disconnect
  return unless defined?(@http) && @http

  session = @http
  # Clear the handle before finishing so it is dropped even if #finish raises.
  @http = nil
  session.finish if session.started?
end
http_status_code?(code)
click to toggle source
# File lib/fluent/plugin/out_http.rb, line 148
# Tells whether +code+, converted with #to_i, lies inside
# HTTP_STATUS_CODE_RANGE.
#
# @param code [#to_i] candidate status code
# @return [Boolean]
def http_status_code?(code)
  numeric_code = code.to_i
  HTTP_STATUS_CODE_RANGE.cover?(numeric_code)
end
post_records_request(records)
click to toggle source
# File lib/fluent/plugin/out_http.rb, line 112
# Builds the POST request that carries +records+ as a JSON body.
#
# An Authorization token header is added when authorization_token is set,
# and HTTP basic auth is applied when username is set.
#
# @param records [Array] event records to send
# @return [Net::HTTP::Post] the prepared request
def post_records_request(records)
  post = Net::HTTP::Post.new(url)

  post.body = Oj.dump(records)
  post.content_type = 'application/json'
  post['User-Agent'] = 'FluentPluginHTTP'
  post['Authorization'] = "Token token=#{authorization_token}" if authorization_token
  post.basic_auth(username, password) if username

  post
end
validate_accept_status_code(status_codes)
click to toggle source
# File lib/fluent/plugin/out_http.rb, line 137
# Checks the list of acceptable response status codes.
#
# @param status_codes [Array] configured status codes
# @return [Array] +status_codes+ unchanged when the list is non-empty and
#   every entry satisfies #http_status_code?
# @raise [Fluent::ConfigError] when the list is empty or contains an entry
#   that is not an HTTP status code
def validate_accept_status_code(status_codes)
  if status_codes.empty? || !status_codes.all? { |code| http_status_code?(code) }
    raise Fluent::ConfigError, "Invalid status codes: #{status_codes.inspect}"
  end

  status_codes
end
validate_keep_alive_timeout(value)
click to toggle source
# File lib/fluent/plugin/out_http.rb, line 159
# Checks that the keep-alive timeout is not negative.
#
# @param value [Numeric] configured timeout
# @return [Numeric] +value+ unchanged when it is zero or greater
# @raise [Fluent::ConfigError] otherwise
def validate_keep_alive_timeout(value)
  unless value >= 0
    raise Fluent::ConfigError, "Invalid keep-alive timeout: #{value.inspect}"
  end

  value
end
validate_password(value)
click to toggle source
# File lib/fluent/plugin/out_http.rb, line 178
# Checks that a password is only configured together with a username.
#
# @param value [Object, nil] configured password
# @return [Object, nil] +value+ unchanged when it is nil or a username is set
# @raise [Fluent::ConfigError] when a password is given without a username
def validate_password(value)
  password_without_user = !value.nil? && !username
  raise Fluent::ConfigError, 'Password requires a username' if password_without_user

  value
end
validate_url(test_url)
click to toggle source
# File lib/fluent/plugin/out_http.rb, line 127
# Parses and checks the endpoint URL.
#
# Besides the scheme, the host is checked too: a URL such as "http:/events"
# parses successfully but has no host, and would otherwise only fail later,
# when #connect hands a nil host to Net::HTTP.start.
#
# @param test_url [String, URI] configured URL
# @return [URI] the parsed URL
# @raise [Fluent::ConfigError] when the URL cannot be parsed, its scheme is
#   neither http nor https, or it has no host
def validate_url(test_url)
  url = URI(test_url)

  unless url.scheme == 'http' || url.scheme == 'https'
    raise Fluent::ConfigError,
          "Unacceptable URL scheme, expected HTTP or HTTPs: #{test_url}"
  end

  # URI#host is nil (or empty) when the URL carries no authority part.
  raise Fluent::ConfigError, "URL is missing a host: #{test_url}" if url.host.to_s.empty?

  url
rescue URI::InvalidURIError => e
  raise Fluent::ConfigError, e.message
end
validate_username(value)
click to toggle source
# File lib/fluent/plugin/out_http.rb, line 165
# Checks the basic-auth username.
#
# @param value [String, nil] configured username
# @return [String, nil] +value+ unchanged when it is nil, or when it is
#   non-empty and no authorization token is configured
# @raise [Fluent::ConfigError] when an authorization token is configured as
#   well, or when the username is empty
def validate_username(value)
  return value if value.nil?

  # The token check comes first, so it wins over the empty-username error.
  raise Fluent::ConfigError, 'Mutually exclusive: authorization_token and username' if authorization_token
  raise Fluent::ConfigError, "Invalid username: #{value.inspect}" if value.empty?

  value
end