class Fluent::SplunkHTTPEventcollectorOutput
Public Class Methods
Called when the plugin object is created (instance initializer).
# File lib/fluent/plugin/out_splunk-http-eventcollector.rb, line 70
# Instance initializer: let the parent plugin class set itself up, then
# lazily load the HTTP-keepalive and SSL libraries this plugin needs.
def initialize
  super
  log.trace "splunk-http-eventcollector(initialize) called"
  require 'openssl'
  require 'net/http/persistent'
end
Thanks to github.com/kazegusuri/fluent-plugin-prometheus/blob/348c112d/lib/fluent/plugin/prometheus.rb
# File lib/fluent/plugin/out_splunk-http-eventcollector.rb, line 79 def self.placeholder_expander(log) # Use internal class in order to expand placeholder if defined?(Fluent::Filter) # for v0.12, built-in PlaceholderExpander begin require 'fluent/plugin/filter_record_transformer' if defined?(Fluent::Plugin::RecordTransformerFilter::PlaceholderExpander) # for v0.14 return Fluent::Plugin::RecordTransformerFilter::PlaceholderExpander.new(log: log) else # for v0.12 return Fluent::RecordTransformerFilter::PlaceholderExpander.new(log: log) end rescue LoadError => e raise ConfigError, "cannot find filter_record_transformer plugin: #{e.message}" end else # for v0.10, use PlaceholderExapander in fluent-plugin-record-reformer plugin begin require 'fluent/plugin/out_record_reformer.rb' return Fluent::RecordReformerOutput::PlaceholderExpander.new(log: log) rescue LoadError => e raise ConfigError, "cannot find fluent-plugin-record-reformer: #{e.message}" end end end
Public Instance Methods
This method is called before starting. 'conf' is a Hash that includes configuration parameters. If the configuration is invalid, raise Fluent::ConfigError.
# File lib/fluent/plugin/out_splunk-http-eventcollector.rb, line 107
# Called before starting with the parsed configuration. Builds the HEC
# endpoint URI from @protocol/@server, prepares the placeholder expander
# used by format(), and caches the local hostname for placeholders.
# Raises ConfigError when the server cannot be parsed into a URI.
def configure(conf)
  super
  log.trace "splunk-http-eventcollector(configure) called"
  begin
    @splunk_uri = URI "#{@protocol}://#{@server}/services/collector"
  rescue URI::Error, ArgumentError => e
    # Rescue only URI-parse failures — the previous bare `rescue` also
    # swallowed unrelated StandardErrors — and surface the root cause.
    raise ConfigError, "Unable to parse the server into a URI. (#{e.message})"
  end
  @placeholder_expander =
    Fluent::SplunkHTTPEventcollectorOutput.placeholder_expander(log)
  # Cached once; exposed to placeholder expansion as 'hostname'.
  @hostname = Socket.gethostname
  # TODO Add other robust input/syntax checks.
end
Encode as UTF-8. If 'coerce_to_utf8' is set to true in the config, any non-UTF-8 character would be replaced by the string specified by 'non_utf8_replacement_string'. If 'coerce_to_utf8' is set to false, any non-UTF-8 character would trigger the plugin to error out. Thanks to github.com/GoogleCloudPlatform/fluent-plugin-google-cloud/blob/dbc28575/lib/fluent/plugin/out_google_cloud.rb#L1284
# File lib/fluent/plugin/out_splunk-http-eventcollector.rb, line 293
# Recursively re-encode +input+ as UTF-8. Hashes and Arrays are walked
# element by element; values that do not respond to #encode (Integer,
# nil, ...) pass through unchanged. When @coerce_to_utf8 is set, invalid
# or unmappable bytes are replaced with @non_utf8_replacement_string;
# otherwise an EncodingError is logged and re-raised.
# Thanks to github.com/GoogleCloudPlatform/fluent-plugin-google-cloud/blob/dbc28575/lib/fluent/plugin/out_google_cloud.rb#L1284
def convert_to_utf8(input)
  case input
  when Hash
    # Re-encode both keys and values.
    input.each_with_object({}) do |(key, value), converted|
      converted[convert_to_utf8(key)] = convert_to_utf8(value)
    end
  when Array
    input.map { |element| convert_to_utf8(element) }
  else
    return input unless input.respond_to?(:encode)

    if @coerce_to_utf8
      input.encode(
        'utf-8',
        invalid: :replace,
        undef: :replace,
        replace: @non_utf8_replacement_string)
    else
      begin
        input.encode('utf-8')
      rescue EncodingError
        @log.error 'Encountered encoding issues potentially due to non ' \
                   'UTF-8 characters. To allow non-UTF-8 characters and ' \
                   'replace them with spaces, please set "coerce_to_utf8" ' \
                   'to true.'
        raise
      end
    end
  end
end
This method is called when an event reaches Fluentd (like an unbuffered emit()). It converts the event to a raw string.
# File lib/fluent/plugin/out_splunk-http-eventcollector.rb, line 147
# Serialize one event into the JSON string Splunk HEC expects.
# Placeholders configured in source/sourcetype/host/index are expanded
# against the tag, its dot-separated parts, the cached hostname, and the
# record itself. The "event" payload is the whole record when @all_items
# is set, otherwise just record["message"]. Returns the JSON string.
def format(tag, time, record)
  # Values made available to configured placeholders.
  placeholder_values = {
    'tag'       => tag,
    'tag_parts' => tag.split('.'),
    'hostname'  => @hostname,
    'record'    => record
  }
  placeholders = @placeholder_expander.prepare_placeholders(placeholder_values)

  # Basic object for Splunk. Note explicit type-casting to avoid accidental errors.
  splunk_object = {
    "time"       => time.to_i,
    "source"     => @source.nil? ? tag.to_s : @placeholder_expander.expand(@source, placeholders),
    "sourcetype" => @placeholder_expander.expand(@sourcetype.to_s, placeholders),
    "host"       => @placeholder_expander.expand(@host.to_s, placeholders),
    "index"      => @placeholder_expander.expand(@index, placeholders)
  }
  # TODO: parse different source types as expected: KVP, JSON, TEXT
  splunk_object["event"] =
    @all_items ? convert_to_utf8(record) : convert_to_utf8(record["message"])

  splunk_object.to_json
end
# File lib/fluent/plugin/out_splunk-http-eventcollector.rb, line 283
# Format a number with thousands separators for log messages,
# e.g. 1234567 -> "1,234,567". Works by reversing the string, inserting a
# comma after every run of three digits that is followed by another
# digit, then reversing back.
def numfmt(input)
  reversed = input.to_s.reverse
  grouped = reversed.gsub(/(\d{3})(?=\d)/) { "#{Regexp.last_match(1)}," }
  grouped.reverse
end
# File lib/fluent/plugin/out_splunk-http-eventcollector.rb, line 241 def push_buffer(body) post = Net::HTTP::Post.new @splunk_uri.request_uri post.body = body log.debug "POST #{@splunk_uri}" if @test_mode log.debug "TEST_MODE Payload: #{body}" return end # retry up to :post_retry_max times 1.upto(@post_retry_max) do |c| response = @http.request @splunk_uri, post log.debug "=>(#{c}/#{numfmt(@post_retry_max)}) #{response.code} " + "(#{response.message})" # TODO check the actual server response too (it's JSON) if response.code == "200" # and... # success break # TODO check 40X response within post_retry_max and retry elsif response.code.match(/^50/) and c < @post_retry_max # retry log.warn "#{@splunk_uri}: Server error #{response.code} (" + "#{response.message}). Retrying in #{@post_retry_interval} " + "seconds.\n#{response.body}" sleep @post_retry_interval next elsif response.code.match(/^40/) # user error log.error "#{@splunk_uri}: #{response.code} (#{response.message})\n#{response.body}" break elsif c < @post_retry_max # retry log.debug "#{@splunk_uri}: Retrying..." sleep @post_retry_interval next else # other errors. fluentd will retry processing on exception # FIXME: this may duplicate logs when using multiple buffers raise "#{@splunk_uri}: #{response.message}\n#{response.body}" end # If response.code end # 1.upto(@post_retry_max) end
This method is called when shutting down. Shutdown the thread and close sockets or files here.
# File lib/fluent/plugin/out_splunk-http-eventcollector.rb, line 137 def shutdown super log.trace "splunk-http-eventcollector(shutdown) called" @http.shutdown log.trace "shutdown from splunk-http-eventcollector" end
This method is called when starting. Open sockets or files here.
# File lib/fluent/plugin/out_splunk-http-eventcollector.rb, line 123
# Called when fluentd starts the plugin: open the persistent HTTP
# connection pool and install the headers sent with every HEC request.
# TLS certificate verification is disabled when @verify is falsy.
def start
  super
  log.trace "splunk-http-eventcollector(start) called"
  @http = Net::HTTP::Persistent.new 'fluent-plugin-splunk-http-eventcollector'
  @http.verify_mode = OpenSSL::SSL::VERIFY_NONE unless @verify
  {
    'Content-Type'  => 'application/json',
    'User-Agent'    => 'fluent-plugin-splunk-http-eventcollector/0.0.1',
    'Authorization' => "Splunk #{@token}"
  }.each do |header, value|
    @http.override_headers[header] = value
  end
  log.trace "initialized for splunk-http-eventcollector"
end
By this point, fluentd has decided its buffer is full and it's time to flush it. chunk.read is a concatenated string of JSON-encoded event strings. Simply POST them to Splunk and go about our life.
This method is called every flush interval. Write the buffer chunk to files or databases here. 'chunk' is a buffer chunk that includes multiple formatted events. You can use 'data = chunk.read' to get all events and 'chunk.open {|io| ... }' to get IO objects. NOTE! This method is called by internal thread, not Fluentd's main thread. So IO wait doesn't affect other plugins.
# File lib/fluent/plugin/out_splunk-http-eventcollector.rb, line 190
# Flush a buffer chunk to Splunk. The chunk is a concatenation of the
# JSON strings produced by #format. If the whole chunk fits under
# @batch_size_limit bytes it is POSTed as-is; otherwise the events are
# re-batched so each POST stays under the limit.
# NOTE: called from fluentd's flush thread, so blocking I/O is fine here.
def write(chunk)
  log.trace "splunk-http-eventcollector(write) called"

  # Read the chunk exactly once (it was previously re-read per size check).
  payload = chunk.read

  # Break the concatenated string of JSON-formatted events into an Array.
  # NOTE(review): splitting on "}{" assumes that byte sequence never occurs
  # inside an event's JSON (e.g. in a string value) — confirm #format output
  # can never contain a literal "}{".
  split_chunk = payload.split("}{").each do |x|
    # Reconstruct the opening{/closing} that #split() strips off.
    x.prepend("{") unless x.start_with?("{")
    x << "}" unless x.end_with?("}")
  end

  log.debug "Pushing #{numfmt(split_chunk.size)} events (" +
    "#{numfmt(payload.bytesize)} bytes) to Splunk."

  # Small enough to send in one POST.
  return push_buffer payload unless payload.bytesize > @batch_size_limit

  # If fluentd is pushing too much data to Splunk at once, split up the
  # payload. We care about the POST size (bytes), not the event count.
  log.warn "Fluentd is attempting to push #{numfmt(payload.bytesize)} " +
    "bytes in a single push to Splunk. The configured limit is " +
    "#{numfmt(@batch_size_limit)} bytes."

  newbuffer = []
  split_chunk.each_with_index do |c, i|
    if newbuffer.join.bytesize + c.bytesize < @batch_size_limit
      newbuffer << c
    else
      # Reached the limit - push the current batch and start a new one
      # containing the event that did not fit.
      log.debug "(#{numfmt(i + 1)}/#{numfmt(split_chunk.size)}) " +
        "newbuffer.bytesize=#{numfmt(newbuffer.join.bytesize)} + " +
        "c.bytesize=#{numfmt(c.bytesize)} > #{numfmt(@batch_size_limit)}, " +
        "flushing current buffer to Splunk."
      push_buffer newbuffer.join
      newbuffer = [c]
    end
  end

  # Push anything left over. BUGFIX: the original guard
  # `if newbuffer.size` was always truthy (0 is truthy in Ruby), so an
  # empty trailing buffer still produced an empty POST.
  push_buffer newbuffer.join unless newbuffer.empty?
  nil
end