class Embulk::Input::Splunk
Constants
- SPLUNK_DEFAULT_TIME_FIELD
- SPLUNK_TIME_FIELD
- SPLUNK_TIME_FORMAT
Public Class Methods
resume(task, columns, count) { |task, columns, count| ... }
click to toggle source
# File lib/embulk/input/splunk.rb, line 36
# Runs the tasks through Embulk's control block and computes the next config diff.
#
# @param task [Hash] task hash built by task_from_config
# @param columns [Array] schema columns built in transaction
# @param count [Integer] number of tasks to run
# @yield [task, columns, count] the control block; its return value is treated as
#   one report per task, each possibly carrying :latest_time_in_results
# @return [Hash] the next config diff. It contains :earliest_time only when the
#   load is incremental and at least one task reported a latest event time.
def self.resume(task, columns, count, &control)
  task_reports = yield(task, columns, count)

  # Each report holds the latest event time (epoch milliseconds) a task saw, or nil
  # when it read nothing. Drop the nils *before* converting: nil.to_i is 0, which
  # would otherwise become an earliest_time of 1970-01-01 and trigger a full reload.
  # Taking the max makes this correct when several tasks report.
  latest_time_in_results = task_reports.
    map { |report| report[:latest_time_in_results] }.
    compact.
    map(&:to_i).
    max

  next_config_diff = {}
  if task["incremental"] && latest_time_in_results
    # NOTE(review): if Splunk treats earliest_time as inclusive, the newest event of
    # this run will be read again next run — confirm whether duplicates are acceptable.
    next_config_diff[:earliest_time] =
      DateTime.strptime(latest_time_in_results.to_s, "%Q").strftime(SPLUNK_TIME_FORMAT)
  end
  next_config_diff
end
task_from_config(config)
click to toggle source
# File lib/embulk/input/splunk.rb, line 54
# Reads the plugin's settings from the Embulk config into a plain task hash.
#
# @param config the Embulk config; must respond to param(key, type, default: ...)
# @return [Hash] task keyed by String parameter names
def self.task_from_config(config)
  {
    "scheme" => config.param("scheme", :string, default: "https"),
    "host" => config.param("host", :string),
    "port" => config.param("port", :integer, default: 8089),
    "username" => config.param("username", :string),
    "password" => config.param("password", :string),
    "max_results" => config.param("max_results", :integer, default: 50_000),
    "query" => config.param("query", :string),
    # Looked up by String key like every other parameter. A Symbol lookup can miss
    # the user's setting and silently fall back to the nil default.
    "earliest_time" => config.param("earliest_time", :string, default: nil),
    "latest_time" => config.param("latest_time", :string, default: nil),
    "incremental" => config.param("incremental", :bool, default: false),
    "table" => config.param("table", :array, default: [])
  }
end
transaction(config, &control)
click to toggle source
# File lib/embulk/input/splunk.rb, line 16
# Builds the task and output schema from the config, then hands off to resume.
# Warns about an incremental load with a fixed latest_time. Appends
# SPLUNK_TIME_FIELD to the table when no "_time" column is configured.
def self.transaction(config, &control)
  task = task_from_config(config)

  if task["incremental"] && task["latest_time"]
    Embulk.logger.warn "Incremental is 'true' and latest_time is set. This may have unexpected results."
  end

  has_time_field = task["table"].any? { |field| field["name"] == "_time" }
  unless has_time_field
    Embulk.logger.warn "_time is not included in table. Automatically adding it."
    task["table"] << SPLUNK_TIME_FIELD
  end

  # Columns without an explicit type fall back to :string.
  columns = task["table"].map do |column|
    column_type = column["type"]&.to_sym || :string
    Column.new(nil, column["name"], column_type, column["format"])
  end

  resume(task, columns, 1, &control)
end
Public Instance Methods
init()
click to toggle source
# File lib/embulk/input/splunk.rb, line 99
# Copies task settings into instance state for run and prepares the search string.
def init
  @max_results = task[:max_results]
  @earliest_time = task[:earliest_time]
  @latest_time = task[:latest_time]
  # build_query reads @fields, so the field list must be set before it is called.
  @fields = task["table"].map { |entry| entry["name"] }
  @query = build_query(task[:query])
end
run()
click to toggle source
# File lib/embulk/input/splunk.rb, line 106
# Pages through the oneshot search results and emits one row per result.
#
# A page is requested with count @max_results at an increasing offset. Paging
# stops after a page shorter than @max_results, or after the first page when
# @max_results is 0.
#
# @return [Hash] task report. :latest_time_in_results is the newest event time seen,
#   in epoch milliseconds, or nil when no results were read.
def run
  Embulk.logger.debug "Establishing connection to Splunk"
  service = ::Splunk::connect(splunk_config)

  latest_time = nil
  page = 0

  # The Splunk API limits how many results one request returns, so keep paging
  # instead of silently dropping the remainder.
  loop do
    rows_in_page = 0
    query_options = {
      count: @max_results,
      offset: page * @max_results,
      earliest_time: @earliest_time,
      latest_time: @latest_time,
    }
    Embulk.logger.debug "Running query `#{@query}` with options #{query_options} in loop #{page}"

    reader = ::Splunk::ResultsReader.new(service.create_oneshot(@query, query_options))
    reader.each do |result|
      rows_in_page += 1

      # Integer milliseconds are used only for comparison. Splunk did not honour the
      # requested sort order, so every row is compared against the running maximum.
      event_time = DateTime.strptime(result[SPLUNK_DEFAULT_TIME_FIELD], SPLUNK_TIME_FORMAT).strftime("%Q").to_i
      latest_time = [latest_time, event_time].compact.max

      page_builder.add(@fields.map { |field| result[field] })
    end

    break if @max_results == 0 || rows_in_page < @max_results
    page += 1
  end

  page_builder.finish
  { latest_time_in_results: latest_time }
end
Protected Instance Methods
build_query(query)
click to toggle source
# File lib/embulk/input/splunk.rb, line 76
# Wraps the user's search in a pipeline that sorts by SPLUNK_DEFAULT_TIME_FIELD and
# keeps only the configured @fields, in order.
#
# @param query [String] the user-supplied search
# @return [String] the full search string
def build_query(query)
  # `sort 0` removes the sort command's default 10,000-result limit. See:
  # https://answers.splunk.com/answers/423870/why-are-search-results-cut-off-at-10000-in-splunk.html
  stages = [
    " #{query}",
    "sort 0 #{SPLUNK_DEFAULT_TIME_FIELD}",
    "table #{@fields.join(", ")} "
  ]
  stages.join(" | ")
end
splunk_config()
click to toggle source
# File lib/embulk/input/splunk.rb, line 87
# Connection settings passed to ::Splunk::connect, read from the task.
#
# @return [Hash] :scheme, :host, :port, :username and :password
def splunk_config
  %i[scheme host port username password].each_with_object({}) do |setting, connection|
    connection[setting] = task[setting]
  end
end