class Embulk::Input::Splunk

Constants

SPLUNK_DEFAULT_TIME_FIELD
SPLUNK_TIME_FIELD
SPLUNK_TIME_FORMAT

Public Class Methods

resume(task, columns, count) { |task, columns, count| ... } click to toggle source
# File lib/embulk/input/splunk.rb, line 36
# Hands control to Embulk and turns the per-task reports into the next config diff.
#
# Yields +task+, +columns+ and +count+ to the Embulk control block, which runs the
# tasks and returns one report per task. When task["incremental"] is set, the newest
# :latest_time_in_results across all reports (milliseconds since the epoch) becomes
# the next run's earliest_time, formatted with SPLUNK_TIME_FORMAT.
#
# Returns a Hash config diff. It is empty when not incremental or when no task saw a
# timestamped row, so a run without rows does not reset earliest_time to the epoch.
def self.resume(task, columns, count, &control)
  task_reports = yield(task, columns, count)

  next_config_diff = {}
  return next_config_diff unless task["incremental"]

  # Reports from tasks that saw no timestamped rows carry nil; drop them rather than
  # coercing them to 0. Taking the max works for any number of tasks/threads.
  latest_time_in_results = Array(task_reports)
    .map { |report| report[:latest_time_in_results] }
    .compact
    .map(&:to_i)
    .max

  if latest_time_in_results
    next_config_diff[:earliest_time] =
      DateTime.strptime(latest_time_in_results.to_s, "%Q").strftime(SPLUNK_TIME_FORMAT)
  end

  next_config_diff
end
task_from_config(config) click to toggle source
# File lib/embulk/input/splunk.rb, line 54
# Reads the plugin configuration into the task hash shared by every task.
#
# Connection settings come first, then the search definition. Keys are strings and
# appear in the same order in which +config+ is queried.
def self.task_from_config(config)
  connection = {
    "scheme"   => config.param("scheme", :string, default: "https"),
    "host"     => config.param("host", :string),
    "port"     => config.param("port", :integer, default: 8089),
    "username" => config.param("username", :string),
    "password" => config.param("password", :string),
  }

  search = {
    # Page size (count) of each oneshot request issued by #run.
    "max_results"   => config.param("max_results", :integer, default: 50_000),
    "query"         => config.param("query", :string),
    "earliest_time" => config.param(:earliest_time, :string, default: nil),
    "latest_time"   => config.param(:latest_time, :string, default: nil),
    "incremental"   => config.param("incremental", :bool, default: false),
    # Output columns; entries are read for "name", "type" and "format".
    "table"         => config.param("table", :array, default: []),
  }

  connection.merge(search)
end
transaction(config, &control) click to toggle source
# File lib/embulk/input/splunk.rb, line 16
# Embulk entry point. It builds the task from +config+, ensures the time field is
# selected, derives the output schema and hands off to ::resume.
#
# Returns whatever ::resume returns (the next config diff).
def self.transaction(config, &control)
  task = task_from_config(config)

  if task["incremental"] && task["latest_time"]
    Embulk.logger.warn "Incremental is 'true' and latest_time is set. This may have unexpected results."
  end

  # build_query sorts on, and #run parses, the time field of every result
  # (SPLUNK_DEFAULT_TIME_FIELD, presumably "_time"), so it has to be in the table.
  if task["table"].none? { |field| field["name"] == "_time" }
    Embulk.logger.warn "_time is not included in table. Automatically adding it."
    task["table"] << SPLUNK_TIME_FIELD
  end

  # Give every column its real position: #run emits row values in this same
  # table order, so the schema index must match it (a nil index does not).
  columns = task["table"].each_with_index.map do |column, index|
    Column.new(index, column["name"], column["type"]&.to_sym || :string, column["format"])
  end

  resume(task, columns, 1, &control)
end

Public Instance Methods

init() click to toggle source
# File lib/embulk/input/splunk.rb, line 99
# Copies the per-task settings into instance variables and prepares the search.
def init
  # Page size and search window, taken verbatim from the task.
  @max_results   = task[:max_results]
  @earliest_time = task[:earliest_time]
  @latest_time   = task[:latest_time]

  # Output column names in table order. build_query reads @fields, so set it first.
  @fields = task["table"].map { |column| column["name"] }
  @query  = build_query(task[:query])
end
run() click to toggle source
# File lib/embulk/input/splunk.rb, line 106
# Runs the configured search against Splunk and feeds every result row to page_builder.
#
# Results are fetched in pages of @max_results using count/offset, so a single
# response's size limit does not silently truncate the load.
#
# Returns a task report hash. Its :latest_time_in_results is the newest time-field
# value seen, as milliseconds since the epoch, or nil when no row carried one.
def run
  Embulk.logger.debug "Establishing connection to Splunk"
  service = ::Splunk::connect(splunk_config)

  latest_time = nil
  loop_count = 0

  # There is a limit to how many results Splunk API will return.
  # To avoid silently dropping results, we need to iterate until there are no more results.
  loop do
    number_of_results = 0

    query_options = {
      count:  @max_results,
      offset: loop_count * @max_results,
      earliest_time: @earliest_time,
      latest_time: @latest_time,
    }

    Embulk.logger.debug "Running query `#{@query}` with options #{query_options} in loop #{loop_count}"
    stream = service.create_oneshot(@query, query_options)

    ::Splunk::ResultsReader.new(stream).each do |result|
      number_of_results += 1

      # Rows without a time value (e.g. produced by transforming commands) are still
      # loaded; they just cannot move the incremental cursor. Parsing nil would raise.
      raw_time = result[SPLUNK_DEFAULT_TIME_FIELD]
      unless raw_time.nil? || raw_time.to_s.empty?
        # Milliseconds since the epoch as an Integer, used only for comparison.
        event_time = DateTime.strptime(raw_time, SPLUNK_TIME_FORMAT).strftime("%Q").to_i
        # Splunk did not reliably honour the sort, so track the maximum row by row.
        latest_time = [latest_time, event_time].compact.max
      end

      page_builder.add(@fields.map { |field| result[field] })
    end

    # A short page means there is nothing left. With @max_results == 0 the offset
    # never advances, so another iteration would only repeat the same request.
    break if number_of_results < @max_results || @max_results == 0

    loop_count += 1
  end

  page_builder.finish

  { latest_time_in_results: latest_time }
end

Protected Instance Methods

build_query(query) click to toggle source
# File lib/embulk/input/splunk.rb, line 76
# Wraps the user's search in the pipeline this plugin relies on. Results are sorted
# by the time field and reduced to the configured columns (@fields).
#
# `sort 0` removes the sort command's implicit 10,000 result cap.
# See: https://answers.splunk.com/answers/423870/why-are-search-results-cut-off-at-10000-in-splunk.html
def build_query(query)
  stages = [
    query,
    "| sort 0 #{SPLUNK_DEFAULT_TIME_FIELD}",
    "| table #{@fields.join(', ')}"
  ]

  # One indented line per stage, framed by a leading newline and a trailing indent.
  "\n" + stages.map { |stage| "    #{stage}\n" }.join + "  "
end
splunk_config() click to toggle source
# File lib/embulk/input/splunk.rb, line 87
# Connection settings passed to ::Splunk::connect, read straight from the task.
def splunk_config
  %i[scheme host port username password].each_with_object({}) do |setting, options|
    options[setting] = task[setting]
  end
end