class Fluent::SplunkHECOutput

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_splunk_hec.rb, line 54
def configure(conf)
  super

  if @channel && @auto_generate_channel 
    log.warn "Both channel and auto_generate_channel are set.. ignoring channel param and auto generating channel instead"
  end

  @channel = SecureRandom.uuid if @auto_generate_channel

  raise ConfigError, "'channel' parameter is required when 'use_ack' is true" if @use_ack && !@channel
  raise ConfigError, "'ack_interval' parameter must be a non negative integer" if @use_ack && @ack_interval < 0
  raise ConfigError, "'event_key' parameter is required when 'raw' is true" if @raw && !@event_key
  raise ConfigError, "'channel' parameter is required when 'raw' is true" if @raw && !@channel
  
  @default_sourcetype = @sourcetype if @sourcetype && !@default_sourcetype

  # build hash for query string
  if @raw
    @query = {}
    @query['host'] = @default_host if @default_host
    @query['source'] = @default_source if @default_source
    @query['index'] = @default_index if @default_index
    @query['sourcetype'] = @default_sourcetype if @default_sourcetype
  end
end
multi_workers_ready?() click to toggle source
# File lib/fluent/plugin/out_splunk_hec.rb, line 80
def multi_workers_ready?
  true
end
shutdown() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_splunk_hec.rb, line 89
def shutdown
  super
end
start() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_splunk_hec.rb, line 84
def start
  setup_client
  super
end
write_objects(_tag, chunk) click to toggle source
# File lib/fluent/plugin/out_splunk_hec.rb, line 93
def write_objects(_tag, chunk)
  return if chunk.empty?

  payload = ''
  chunk.msgpack_each do |time, record|
    payload << (@raw ? format_event_raw(record) : format_event(time, record))
  end
  post_payload(payload) unless payload.empty?
end

Private Instance Methods

check_ack(ack_id, retries) click to toggle source
# File lib/fluent/plugin/out_splunk_hec.rb, line 178
def check_ack(ack_id, retries)
  raise "failed to index the data ack_id=#{ack_id}" if retries < 0

  ack_res = post('/services/collector/ack', {'acks' => [ack_id]}.to_json)
  ack_res_json = JSON.parse(ack_res.body)
  if ack_res_json['acks'] && ack_res_json['acks'][ack_id.to_s]
    return
  else
    sleep(@ack_interval)
    check_ack(ack_id, retries - 1)
  end
end
format_event(time, record) click to toggle source
# File lib/fluent/plugin/out_splunk_hec.rb, line 119
def format_event(time, record)
  msg = {'event' => record}
  if @use_fluentd_time
    msg['time'] = time.respond_to?('to_f') ? time.to_f : time
  end

  # metadata
  if record[@sourcetype_key]
    msg['sourcetype'] = @remove_sourcetype_key ? record.delete(@sourcetype_key) : record[@sourcetype_key]
  elsif @default_sourcetype
    msg['sourcetype'] = @default_sourcetype
  end

  if record[@host_key]
    msg['host'] = @remove_host_key ? record.delete(@host_key) : record[@host_key]
  elsif @default_host
    msg['host'] = @default_host
  end

  if record[@source_key]
    msg['source'] =  @remove_source_key ? record.delete(@source_key) : record[@source_key]
  elsif @default_source
    msg['source'] = @default_source
  end

  if record[@index_key]
    msg['index'] = @remove_index_key ? record.delete(@index_key) : record[@index_key]
  elsif @default_index
    msg['index'] = @default_index
  end

  res = Yajl.dump(msg)
  res << @line_breaker
  res
end
format_event_raw(record) click to toggle source
# File lib/fluent/plugin/out_splunk_hec.rb, line 155
def format_event_raw(record)
  (record[@event_key] || '') + @line_breaker
end
post(path, body, query = {}) click to toggle source
# File lib/fluent/plugin/out_splunk_hec.rb, line 159
def post(path, body, query = {})
  @client.post(path, body: body, query: query)
end
post_payload(payload) click to toggle source
# File lib/fluent/plugin/out_splunk_hec.rb, line 163
def post_payload(payload)
  res = nil
  if @raw
    res = post('/services/collector/raw', payload, @query)
  else
    res = post('/services/collector', payload)
  end
  log.debug "Splunk response: #{res.body}"
  if @use_ack
    res_json = JSON.parse(res.body)
    ack_id = res_json['ackId']
    check_ack(ack_id, @ack_retry_limit)
  end
end
setup_client() click to toggle source
# File lib/fluent/plugin/out_splunk_hec.rb, line 104
def setup_client
  header = {'Content-type' => 'application/json',
            'Authorization' => "Splunk #{@token}"}
  header['X-Splunk-Request-Channel'] = @channel if @channel
  base_url = @use_ssl ? URI::HTTPS.build(host: @host, port: @port) : URI::HTTP.build(host: @host, port: @port)
  @client = HTTPClient.new(default_header: header,
                           base_url: base_url)
  if @use_ssl
    verify_mode = (@ssl_verify ? OpenSSL::SSL::VERIFY_PEER : OpenSSL::SSL::VERIFY_NONE)
    @client.ssl_config.verify_mode = verify_mode
    @client.ssl_config.add_trust_ca(@ca_file) if @ca_file
    @client.ssl_config.set_client_cert_file(@client_cert, @client_key, @client_key_pass) if @client_cert && @client_key
  end
end