class Fluent::SplunkHECOutput
Public Instance Methods
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_splunk_hec.rb, line 54 def configure(conf) super if @channel && @auto_generate_channel log.warn "Both channel and auto_generate_channel are set.. ignoring channel param and auto generating channel instead" end @channel = SecureRandom.uuid if @auto_generate_channel raise ConfigError, "'channel' parameter is required when 'use_ack' is true" if @use_ack && !@channel raise ConfigError, "'ack_interval' parameter must be a non negative integer" if @use_ack && @ack_interval < 0 raise ConfigError, "'event_key' parameter is required when 'raw' is true" if @raw && !@event_key raise ConfigError, "'channel' parameter is required when 'raw' is true" if @raw && !@channel @default_sourcetype = @sourcetype if @sourcetype && !@default_sourcetype # build hash for query string if @raw @query = {} @query['host'] = @default_host if @default_host @query['source'] = @default_source if @default_source @query['index'] = @default_index if @default_index @query['sourcetype'] = @default_sourcetype if @default_sourcetype end end
multi_workers_ready?()
click to toggle source
# File lib/fluent/plugin/out_splunk_hec.rb, line 80 def multi_workers_ready? true end
shutdown()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_splunk_hec.rb, line 89 def shutdown super end
start()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_splunk_hec.rb, line 84 def start setup_client super end
write_objects(_tag, chunk)
click to toggle source
# File lib/fluent/plugin/out_splunk_hec.rb, line 93 def write_objects(_tag, chunk) return if chunk.empty? payload = '' chunk.msgpack_each do |time, record| payload << (@raw ? format_event_raw(record) : format_event(time, record)) end post_payload(payload) unless payload.empty? end
Private Instance Methods
check_ack(ack_id, retries)
click to toggle source
# File lib/fluent/plugin/out_splunk_hec.rb, line 178 def check_ack(ack_id, retries) raise "failed to index the data ack_id=#{ack_id}" if retries < 0 ack_res = post('/services/collector/ack', {'acks' => [ack_id]}.to_json) ack_res_json = JSON.parse(ack_res.body) if ack_res_json['acks'] && ack_res_json['acks'][ack_id.to_s] return else sleep(@ack_interval) check_ack(ack_id, retries - 1) end end
format_event(time, record)
click to toggle source
# File lib/fluent/plugin/out_splunk_hec.rb, line 119 def format_event(time, record) msg = {'event' => record} if @use_fluentd_time msg['time'] = time.respond_to?('to_f') ? time.to_f : time end # metadata if record[@sourcetype_key] msg['sourcetype'] = @remove_sourcetype_key ? record.delete(@sourcetype_key) : record[@sourcetype_key] elsif @default_sourcetype msg['sourcetype'] = @default_sourcetype end if record[@host_key] msg['host'] = @remove_host_key ? record.delete(@host_key) : record[@host_key] elsif @default_host msg['host'] = @default_host end if record[@source_key] msg['source'] = @remove_source_key ? record.delete(@source_key) : record[@source_key] elsif @default_source msg['source'] = @default_source end if record[@index_key] msg['index'] = @remove_index_key ? record.delete(@index_key) : record[@index_key] elsif @default_index msg['index'] = @default_index end res = Yajl.dump(msg) res << @line_breaker res end
format_event_raw(record)
click to toggle source
# File lib/fluent/plugin/out_splunk_hec.rb, line 155 def format_event_raw(record) (record[@event_key] || '') + @line_breaker end
post(path, body, query = {})
click to toggle source
# File lib/fluent/plugin/out_splunk_hec.rb, line 159 def post(path, body, query = {}) @client.post(path, body: body, query: query) end
post_payload(payload)
click to toggle source
# File lib/fluent/plugin/out_splunk_hec.rb, line 163 def post_payload(payload) res = nil if @raw res = post('/services/collector/raw', payload, @query) else res = post('/services/collector', payload) end log.debug "Splunk response: #{res.body}" if @use_ack res_json = JSON.parse(res.body) ack_id = res_json['ackId'] check_ack(ack_id, @ack_retry_limit) end end
setup_client()
click to toggle source
# File lib/fluent/plugin/out_splunk_hec.rb, line 104 def setup_client header = {'Content-type' => 'application/json', 'Authorization' => "Splunk #{@token}"} header['X-Splunk-Request-Channel'] = @channel if @channel base_url = @use_ssl ? URI::HTTPS.build(host: @host, port: @port) : URI::HTTP.build(host: @host, port: @port) @client = HTTPClient.new(default_header: header, base_url: base_url) if @use_ssl verify_mode = (@ssl_verify ? OpenSSL::SSL::VERIFY_PEER : OpenSSL::SSL::VERIFY_NONE) @client.ssl_config.verify_mode = verify_mode @client.ssl_config.add_trust_ca(@ca_file) if @ca_file @client.ssl_config.set_client_cert_file(@client_cert, @client_key, @client_key_pass) if @client_cert && @client_key end end