class LogStash::Inputs::OktaSystemLog

Constants

AUTH_TEST_URL
HTTP_BAD_REQUEST_400
HTTP_OK_200
HTTP_TOO_MANY_REQUESTS_429
HTTP_UNAUTHORIZED_401
MAX_MMAP_FILE_SIZE
OKTA_EVENT_LOG_PATH
RATE_OPTIONS

Based on data from here: developer.okta.com/docs/reference/api/system-log/#system-events – For One App and Enterprise orgs, the warning is sent when the org is at 60% of its limit.

SLEEP_API_RATE_LIMIT

Sleep Timers

SLEEP_STATE_FILE_RETRY
Schedule_types

Public Instance Methods

register() click to toggle source
# File lib/logstash/inputs/okta_system_log.rb, line 230
def register

  @trace_log_method = detect_trace_log_method()

  if (@limit < 1 or @limit > 1000 or !@limit.integer?)
    @logger.fatal("Invalid `limit` value: #{@limit}. " +
      "Config limit should be an integer between 1 and 1000.")
    raise LogStash::ConfigurationError, "Invalid `limit` value: #{@limit}. " + 
      "Config limit should be an integer between 1 and 1000."
  end

  unless (@hostname.nil? ^ @custom_url.nil?)
    @logger.fatal("Please configure the hostname " +
      "or the custom_url to use.")
    raise LogStash::ConfigurationError, "Please configure the hostname " +
      "or the custom_url to use."
  end

  if (@hostname)
    begin
      url_obj = URI::HTTPS.build(
                  :host => @hostname,
                  :path => OKTA_EVENT_LOG_PATH)
    rescue URI::InvalidComponentError
      @logger.fatal("Invalid hostname, " + 
        "could not configure URL. hostname = #{@hostname}.")
      raise LogStash::ConfigurationError, "Invalid hostname, " + 
        "could not configure URL. hostname = #{@hostname}."
    end
  end
  if (@custom_url)
    begin
      # The URL comes in as a SafeURI object which doesn't get parsed nicely.
      # Cast to string helps with that
      # Really only happens during tests and not during normal operations
      url_obj = URI.parse(@custom_url.to_s)
      unless (url_obj.kind_of? URI::HTTP or url_obj.kind_of? URI::HTTPS)
        raise LogStash::ConfigurationError, "Invalid custom_url, " +
          "please verify the URL. custom_url = #{@custom_url}"
        @logger.fatal("Invalid custom_url, " +
          "please verify the URL. custom_url = #{@custom_url}")
      end
    rescue URI::InvalidURIError
      @logger.fatal("Invalid custom_url, " +
        "please verify the URL. custom_url = #{@custom_url}")
      raise LogStash::ConfigurationError, "Invalid custom_url, " + 
        "please verify the URL. custom_url = #{@custom_url}"
    end

  end
  
  if (@since)
    begin
      @since = DateTime.parse(@since).rfc3339(0)
    rescue ArgumentError => e
      @logger.fatal("since must be of the form " +
        "yyyy-MM-dd’‘T’‘HH:mm:ssZZ, e.g. 2013-01-01T12:00:00-07:00.")
      raise LogStash::ConfigurationError, "since must be of the form " +
        "yyyy-MM-dd’‘T’‘HH:mm:ssZZ, e.g. 2013-01-01T12:00:00-07:00."
    end
  end

  if (@q)
    if (@q.length > 10)
      msg = "q cannot have more than 10 terms. " + 
        "Use the `filter` to limit the query."
      @logger.fatal(msg)
      raise LogStash::ConfigurationError, msg
    end
    space_errors = []
    length_errors = []
    for item in @q
      if (item.include? " ")
        space_errors.push(item)
      elsif (item.length > 40)
        length_errors.push(item)
      end
    end
    if (space_errors.length > 0)
      @logger.fatal("q items cannot contain a space. " +
        "Items: #{space_errors.join(" ")}.")
      raise LogStash::ConfigurationError, "q items cannot contain a space. " +
        "Items: #{space_errors.join(" ")}."
    end
    if (length_errors.length > 0)
      msg = "q items cannot contain be longer than 40 characters. " + 
        "Items: #{length_errors.join(" ")}."
      @logger.fatal(msg)
      raise LogStash::ConfigurationError, msg
    end
  end

  if (@custom_auth_header)
    if (@auth_token_key or @auth_token_file)
      @logger.fatal("If custom_auth_header is used " +
        "you cannot set auth_token_key or auth_token_file")
      raise LogStash::ConfigurationError, "If custom_auth_header is used " + 
        "you cannot set auth_token_key or auth_token_file"
    end
  else
    unless (@auth_token_key.nil? ^ @auth_token_file.nil?)
      auth_message = "Set only the  auth_token_key or auth_token_file."
      @logger.fatal(auth_message)
      raise LogStash::ConfigurationError, auth_message
    end

    if (@auth_token_file)
      begin
        auth_file_size = File.size(@auth_token_file)
        if (auth_file_size > MAX_MMAP_FILE_SIZE)
          @logger.fatal("The auth_token file " +
            "is too large to map")
          raise LogStash::ConfigurationError, "The auth_token file " + 
            "is too large to map"
        else
          @auth_token = LogStash::Util::Password.new(
                          File.read(@auth_token_file, auth_file_size).chomp)
          @logger.info("Successfully opened auth_token_file",
            :auth_token_file => @auth_token_file)
        end
      rescue LogStash::ConfigurationError
        raise
      rescue => e
        # This is a bug in older versions of  logstash, confirmed here:
        # https://discuss.elastic.co/t/logstash-configurationerror-but-configurationok-logstash-2-4-0/65727/2
        @logger.fatal(e.inspect)
        raise LogStash::ConfigurationError, e.inspect
      end
    else 
      @auth_token = @auth_token_key
    end

    if (@auth_token)
      begin
        response = client.get(
          url_obj.to_s+AUTH_TEST_URL,
            headers: {'Authorization' => "SSWS #{@auth_token.value}"},
            request_timeout: 2,
            connect_timeout: 2,
            socket_timeout: 2)
        if (response.code == HTTP_UNAUTHORIZED_401)
          @logger.fatal("The auth_code provided " +
            "was not valid, please check the input")
          raise LogStash::ConfigurationError, "The auth_code provided " + 
            "was not valid, please check the input"
        end
      rescue LogStash::ConfigurationError
        raise
      rescue Manticore::ManticoreException => m
        msg = "There was a connection error verifying the auth_token, " + 
          "continuing without verification"
        @logger.error(msg, :client_error => m.inspect)
      rescue => e
        @logger.fatal("Could not verify auth_token, " +
          "error: #{e.inspect}")
        raise LogStash::ConfigurationError, "Could not verify auth_token, " + 
          "error: #{e.inspect}"
      end
    end
  end

  if (RATE_OPTIONS[@rate_limit] != false)
    @rate_limit = RATE_OPTIONS[@rate_limit]
  else
    @rate_limit = @rate_limit.to_f.floor 1
  end

  if (@rate_limit < 0.1 or @rate_limit > 1.0)
    raise LogStash::ConfigurationError, "rate_limit should be between " +
      "'0.1' and '1.0' or 'RATE_SLOW', 'RATE_MEDIUM' or 'RATE_FAST'"
  end

  @rate_limit_factor = 1.0 - @rate_limit

  params_event = Hash.new
  params_event[:limit] = @limit if @limit > 0
  params_event[:since] = @since if @since
  params_event[:filter] = @filter if @filter
  params_event[:q] = @q.join(" ") if @q
  url_obj.query = URI.encode_www_form(params_event)


  # This check is Logstash 5 specific.  If the class does not exist, and it
  # won't in older versions of Logstash, then we need to set it to nil.
  settings = defined?(LogStash::SETTINGS) ? LogStash::SETTINGS : nil

  if (@state_file_path.nil?)
    begin
      base_state_file_path = build_state_file_base(settings)
    rescue LogStash::ConfigurationError
      raise
    rescue => e
      @logger.fatal("Could not set up state file", :exception => e.inspect)
      raise LogStash::ConfigurationError, e.inspect
    end
    file_prefix = "#{@hostname}_system_log_state"
    case Dir[File.join(base_state_file_path,"#{file_prefix}*")].size
    when 0
      # Build a file name randomly
      @state_file_path = File.join(
                                    base_state_file_path, 
                                    rand_filename("#{file_prefix}"))
      @logger.info('No state_file_path set, generating one based on the ' +
        '"hostname" setting', 
        :state_file_path => @state_file_path.to_s, 
        :hostname => @hostname)
    when 1
      @state_file_path = Dir[File.join(base_state_file_path,"#{file_prefix}*")].last
      @logger.info('Found state file based on the "hostname" setting', 
        :state_file_path => @state_file_path.to_s, 
        :hostname => @hostname)
    else
      msg = "There is more than one file" +
        "in the state file base dir (possibly an error?)." +
        "Please keep the latest/most relevant file.\n" +
        "Directory: #{base_state_file_path}"
      @logger.fatal(msg)
      raise LogStash::ConfigurationError, msg
    end
      
  else
    @state_file_path = File.path(@state_file_path)
    if (File.directory?(@state_file_path))
      @logger.fatal("The `state_file_path` argument must point to a file, " +
        "received a directory: #{@state_file_path}")
      raise LogStash::ConfigurationError, "The `state_file_path` argument " +
        "must point to a file, received a directory: #{@state_file_path}"
    end
  end
  begin
    @state_file_stat = detect_state_file_mode(@state_file_path)
  rescue => e
    @logger.fatal("Error getting state file info. " + 
      "Exception: #{e.inspect}")
    raise LogStash::ConfigurationError, "Error getting state file info. " +
      "Exception: #{e.inspect}"
  end

  @write_method = detect_write_method(@state_file_path)

  begin
    state_file_size = File.size(@state_file_path)
    if (state_file_size > 0)
      if (state_file_size > MAX_MMAP_FILE_SIZE)
        @logger.fatal("The state file: " +
          "#{@state_file_path} is too large to map")
        raise LogStash::ConfigurationError, "The state file: " +
          "#{@state_file_path} is too large to map"
      end
      state_url = File.read(@state_file_path, state_file_size).chomp
      if (state_url.length > 0)
        state_url_obj = URI.parse(state_url)
        @logger.info(
          "Successfully opened state_file_path",
          :state_url => state_url_obj.to_s,
          :state_file_path => @state_file_path)
        if (@custom_url)
         unless (url_obj.hostname == state_url_obj.hostname)
          @logger.fatal("The state URL " +
            "does not match configured URL. ",
            :configured_url => url_obj.to_s, 
            :state_url => state_url_obj.to_s)
          raise LogStash::ConfigurationError, "The state URL " +
            "does not match configured URL. " +
            "Configured url: #{url_obj.to_s}, state_url: #{state_url_obj.to_s}"
          end
        else
          unless (state_url_obj.hostname == @hostname and
            state_url_obj.path == OKTA_EVENT_LOG_PATH)
            @logger.fatal("The state URL " +
              "does not match configured URL. " +
              :configured_url => url_obj.to_s, 
              :state_url => state_url_obj.to_s)
            raise LogStash::ConfigurationError, "The state URL " +
              "does not match configured URL. " +
              "Configured url: #{url_obj.to_s}, state_url: #{state_url_obj.to_s}"
          end
        end
        url_obj = state_url_obj
      end
    end
  rescue LogStash::ConfigurationError
    raise
  rescue URI::InvalidURIError => e
    @logger.fatal("Could not parse url " +
      "from state_file_path. URL: #{state_url}. Error: #{e.inspect}.")
    raise LogStash::ConfigurationError, "Could not parse url " +
      "from state_file_path. URL: #{state_url}. Error: #{e.inspect}."
  rescue => e
    @logger.fatal(e.inspect)
    raise LogStash::ConfigurationError, e.inspect
  end

  @url = url_obj.to_s

  @logger.info("Created initial URL to call", :url => @url)
  @host = Socket.gethostname.force_encoding(Encoding::UTF_8)

  if (@metadata_target)
    @metadata_function = method(:apply_metadata)
  else
    @metadata_function = method(:noop)
  end

  if (@state_file_fatal_failure)
    @state_file_failure_function = method(:fatal_state_file)
  else
    @state_file_failure_function = method(:error_state_file)
  end

end
run(queue) click to toggle source
# File lib/logstash/inputs/okta_system_log.rb, line 543
def run(queue)
  
  msg_invalid_schedule = "Invalid config. schedule hash must contain " +
    "exactly one of the following keys - cron, at, every or in"

  @logger.fatal(msg_invalid_schedule) if @schedule.keys.length !=1
  raise LogStash::ConfigurationError, msg_invalid_schedule if @schedule.keys.length !=1
  schedule_type = @schedule.keys.first
  schedule_value = @schedule[schedule_type]
  @logger.fatal(msg_invalid_schedule) unless Schedule_types.include?(schedule_type)
  raise LogStash::ConfigurationError, msg_invalid_schedule unless Schedule_types.include?(schedule_type)
  @scheduler = Rufus::Scheduler.new(:max_work_threads => 1)
  
  #as of v3.0.9, :first_in => :now doesn't work. Use the following workaround instead
  opts = schedule_type == "every" ? { :first_in => 0.01 } : {} 
  opts[:overlap] = false;

  @logger.info("Starting event stream with the configured URL.", 
    :url => @url)
  @scheduler.send(schedule_type, schedule_value, opts) { run_once(queue) }

  @scheduler.join

end
stop() click to toggle source
# File lib/logstash/inputs/okta_system_log.rb, line 1116
def stop
  # nothing to do in this case so it is not necessary to define stop
  # examples of common "stop" tasks:
  #  * close sockets (unblocking blocking reads/accepts)
  #  * cleanup temporary files
  #  * terminate spawned threads
  begin 
    @scheduler.stop
  rescue NoMethodError => e
    unless (e.message == "undefined method `stop' for nil:NilClass")
      raise
    end
  rescue => e
    @logger.warn("Undefined error", :exception => e.inspect)
    raise
  ensure
    if (is_defined(@url))
      update_state_file()
    end
  end
end

Private Instance Methods

apply_metadata(event, requested_url, response=nil, exec_time=nil) click to toggle source
# File lib/logstash/inputs/okta_system_log.rb, line 898
def apply_metadata(event, requested_url, response=nil, exec_time=nil)

  m = {
    "host" => @host,
    "url" => requested_url
    }

  if exec_time
    m["runtime_seconds"] = exec_time.round(3)
  end

  if response
    m["code"] = response.code
    m["response_headers"] = response.headers
    m["response_message"] = response.message
    m["retry_count"] = response.times_retried
  end

  event.set(@metadata_target,m)

end
atomic_write(path, content) click to toggle source

based on code from logstash-input-file

# File lib/logstash/inputs/okta_system_log.rb, line 947
def atomic_write(path, content)
  write_atomically(path) do |io|
    io.write("#{content}\n")
  end
end
detect_state_file_mode(path) click to toggle source
# File lib/logstash/inputs/okta_system_log.rb, line 1097
def detect_state_file_mode(path)
  if (File.exist?(path))
    old_stat = File.stat(path)
  else
    # We need to create a file anyway so check it with the file created
    # # If not possible, probe which are the default permissions in the
    # # destination directory.
    # old_stat = probe_stat_in(File.dirname(@state_file_path))
    
    # 'touch' a file
    File.open(path, "w") {}
    old_stat = File.stat(path)
  end

  return old_stat ? old_stat : nil

end
detect_write_method(path) click to toggle source
# File lib/logstash/inputs/okta_system_log.rb, line 1084
def detect_write_method(path)
  if (LogStash::Environment.windows? || 
    File.chardev?(path) || 
    File.blockdev?(path) || 
    File.socket?(path))
    @logger.info("State file cannot be updated using an atomic write, " +
      "using non-atomic write", :state_file_path => path)
    return method(:non_atomic_write)
  else
    return method(:atomic_write)
  end
end
error_state_file() click to toggle source
# File lib/logstash/inputs/okta_system_log.rb, line 939
def error_state_file()
  @logger.error("Unable to save state_file_path after retrying three times",
    :url => @url,
    :state_file_path => @state_file_path)
end
fatal_state_file() click to toggle source
# File lib/logstash/inputs/okta_system_log.rb, line 927
def fatal_state_file()
  @logger.fatal("Unable to save state file after retrying. Exiting...",
    :url => @url,
    :state_file_path => @state_file_path)

  @logger.fatal("Unable to save state_file_path, " +
    "#{@state_file_path} after retrying.")
  raise LogStash::EnvironmentError, "Unable to save state_file_path, " + 
    "#{@state_file_path} after retrying."
end
get_epoch() click to toggle source
# File lib/logstash/inputs/okta_system_log.rb, line 826
def get_epoch()
  return Time.now.to_i
end
handle_failure(queue, exception, requested_url, exec_time) click to toggle source
# File lib/logstash/inputs/okta_system_log.rb, line 878
def handle_failure(queue, exception, requested_url, exec_time)

  @continue = false
  @logger.error("Client Connection Error", 
    :exception => exception.inspect)

  event = LogStash::Event.new
  @metadata_function.call(event, requested_url, nil, exec_time)
  event.set("http_request_error", {
    "okta_plugin_status" => "Client Connection Error",
    "connect_error" => exception.message,
    "backtrace" => exception.backtrace
    })
  event.tag("_http_request_error")
  decorate(event)
  queue << event

end
handle_success(queue, response, requested_url, exec_time) click to toggle source
# File lib/logstash/inputs/okta_system_log.rb, line 643
def handle_success(queue, response, requested_url, exec_time)

  @continue = false

  case response.code
  when HTTP_OK_200
    ## Some benchmarking code for reasonings behind the methods.
    ## They aren't great benchmarks, but basic ones that proved a point.
    ## If anyone has better/contradicting results let me know
    #
    ## Some system info on which these tests were run:
    #$ cat /proc/cpuinfo | grep -i "model name" | uniq -c
    #       4 model name      : Intel(R) Core(TM) i7-3740QM CPU @ 2.70GHz
    #
    #$ free -m
    #              total        used        free      shared  buff/cache   available
    #              Mem:           1984         925         372           8         686         833
    #              Swap:          2047           0        2047
    #
    #str = '<https://dev-instance.oktapreview.com/api/v1/events?after=tevHLxinRbATJeKgKjgXGXy0Q1479278142000&limit=1000>; rel="next"'
    #require "benchmark"
    #
    #
    #n = 50000000
    #
    #
    #Benchmark.bm do |x|
    #  x.report { n.times { str.include?('rel="next"') } } # (2) 23.008853sec @50000000 times
    #  x.report { n.times { str.end_with?('rel="next"') } } # (1) 16.894623sec @50000000 times
    #  x.report { n.times { str =~ /rel="next"$/ } } # (3) 30.757554sec @50000000 times
    #end
    #
    #Benchmark.bm do |x|
    #  x.report { n.times { str.match(/<([^>]+)>/).captures[0] } } # (2) 262.166085sec @50000000 times
    #  x.report { n.times { str.split(';')[0][1...-1] } } # (1) 31.673270sec @50000000 times
    #end


    @logger.debug("Response headers", :headers => response.headers)
    @trace_log_method.call("Response body", :body => response.body)

    # Store the next URL to call from the header
    next_url = nil
    Array(response.headers["link"]).each do |link_header|
      if link_header.end_with?('rel="next"')
        next_url = link_header.split(';')[0][1...-1]
      end
    end

    if (response.body.length > 0)
      @codec.decode(response.body) do |decoded|
        @trace_log_method.call("Pushing event to queue")
        event = @target ? LogStash::Event.new(@target => decoded.to_hash) : decoded
        @metadata_function.call(event, requested_url, response, exec_time)
        decorate(event)
        queue << event
      end
    else
      @codec.decode("{}") do |decoded|
        event = @target ? LogStash::Event.new(@target => decoded.to_hash) : decoded
        @metadata_function.call(event, requested_url, response, exec_time)
        decorate(event)
        queue << event
      end
    end


    if (!next_url.nil? and next_url != @url)
      @url = next_url
      if (response.headers['x-rate-limit-remaining'].to_i > response.headers['x-rate-limit-limit'].to_i * @rate_limit_factor and response.headers['x-rate-limit-remaining'].to_i > 0)
        @continue = true
        @trace_log_method.call("Rate Limit Status", :remaining => response.headers['x-rate-limit-remaining'].to_i, :limit => response.headers['x-rate-limit-limit'].to_i)
      end
    end
    @logger.debug("Continue status", :continue => @continue  )


  when HTTP_UNAUTHORIZED_401
    @codec.decode(response.body) do |decoded|
      event = @target ? LogStash::Event.new(@target => decoded.to_hash) : decoded
      @metadata_function.call(event, requested_url, response, exec_time)
      event.set("okta_response_error", {
        "okta_plugin_status" => "Auth_token supplied is not valid, " +
          "validate the auth_token and update the plugin config.",
        "http_code" => 401
      })
      event.tag("_okta_response_error")
      decorate(event)
      queue << event
    end

    @logger.error("Authentication required, check auth_code", 
      :code => response.code, 
      :headers => response.headers)
    @trace_log_method.call("Authentication failed body", :body => response.body)

  when HTTP_BAD_REQUEST_400
    if (response.body.include?("E0000031"))
      @codec.decode(response.body) do |decoded|
        event = @target ? LogStash::Event.new(@target => decoded.to_hash) : decoded
        @metadata_function.call(event, requested_url, response, exec_time)
        event.set("okta_response_error", {
          "okta_plugin_status" => "Filter string was not valid.",
          "http_code" => 400
        })
        event.tag("_okta_response_error")
        decorate(event)
        queue << event
      end

      @logger.error("Filter string was not valid", 
        :response_code => response.code,
        :okta_error => "E0000031",
        :filter_string => @filter)

      @logger.debug("Filter string error response",
        :response_body => response.body,
        :response_headers => response.headers)

    elsif (response.body.include?("E0000030"))

      @codec.decode(response.body) do |decoded|
        event = @target ? LogStash::Event.new(@target => decoded.to_hash) : decoded
        @metadata_function.call(event, requested_url, response, exec_time)
        event.set("okta_response_error", {
          "okta_plugin_status" => "since was not valid.",
          "http_code" => 400
        })
        event.tag("_okta_response_error")
        decorate(event)
        queue << event
      end

      @logger.error("Date was not formatted correctly",
        :response_code => response.code,
        :okta_error => "E0000030",
        :date_string => @since)

      @logger.debug("Start date error response",
        :response_body => response.body,
        :response_headers => response.headers)

    ## If the Okta error code does not match known codes
    ## Process it as a generic error
    else
      handle_unknown_okta_code(queue,response,requested_url,exec_time)
    end
  when HTTP_TOO_MANY_REQUESTS_429
       @codec.decode(response.body) do |decoded|
        event = @target ? LogStash::Event.new(@target => decoded.to_hash) : decoded
        @metadata_function.call(event, requested_url, response, exec_time)
        event.set("okta_response_error", {
          "okta_plugin_status" => "rate limit exceeded; sleeping.",
          "http_code" => 429,
          "okta_error" => "E0000047",
          "reset_time" => response.headers['x-rate-limit-reset']
        })
        event.tag("_okta_response_error")
        decorate(event)
        queue << event
      end

      now = get_epoch
      sleep_time = (now - response.headers['x-rate-limit-reset'].to_i > 60) ? 60 : now - response.headers['x-rate-limit-reset'].to_i
      @logger.error("Rate limited exceeded",
        :response_code => response.code,
        :okta_error => "E0000047",
        :sleep_time => sleep_time,
        :reset_time => response.headers['x-rate-limit-reset'])

      @logger.debug("rate limit error response",
        :response_body => response.body,
        :response_headers => response.headers)

      # Use a local function so the test can override it
      local_sleep sleep_time
  else
    handle_unknown_http_code(queue,response,requested_url,exec_time)
  end

end
handle_unknown_http_code(queue,response,requested_url,exec_time) click to toggle source
# File lib/logstash/inputs/okta_system_log.rb, line 856
def handle_unknown_http_code(queue,response,requested_url,exec_time)
  @codec.decode(response.body) do |decoded|
    event = @target ? LogStash::Event.new(@target => decoded.to_hash) : decoded
    @metadata_function.call(event, requested_url, response, exec_time)

    event.set("http_response_error", {
      "okta_plugin_status" => "Unknown HTTP code, review HTTP errors",
      "http_code" => response.code,
      "http_headers" => response.headers
    })
    event.tag("_http_response_error")
    decorate(event)
    queue << event
  end

  @logger.error("HTTP Error", 
    :http_code => response.code, 
    :body => response.body,
    :headers => response.headers)
end
handle_unknown_okta_code(queue,response,requested_url,exec_time) click to toggle source
# File lib/logstash/inputs/okta_system_log.rb, line 835
def handle_unknown_okta_code(queue,response,requested_url,exec_time)
  @codec.decode(response.body) do |decoded|
    event = @target ? LogStash::Event.new(@target => decoded.to_hash) : decoded
    @metadata_function.call(event, requested_url, response, exec_time)
    event.set("okta_response_error", {
      "okta_plugin_status" => "Unknown error code from Okta",
      "http_code" => response.code,
    })
    event.tag("_okta_response_error")
    decorate(event)
    queue << event
  end

  @logger.error("Okta API Error", 
    :http_code => response.code, 
    :body => response.body,
    :headers => response.headers)

end
local_sleep(time) click to toggle source
# File lib/logstash/inputs/okta_system_log.rb, line 831
def local_sleep(time)
  sleep time
end
log_debug(message, vars = {}) click to toggle source
# File lib/logstash/inputs/okta_system_log.rb, line 1061
def log_debug(message, vars = {})
  @logger.debug(message, vars)
end
log_trace(message, vars = {}) click to toggle source
# File lib/logstash/inputs/okta_system_log.rb, line 1056
def log_trace(message, vars = {})
  @logger.trace(message, vars)
end
non_atomic_write(path, content) click to toggle source
# File lib/logstash/inputs/okta_system_log.rb, line 954
def non_atomic_write(path, content)
  IO.open(IO.sysopen(path, "w+")) do |io|
    io.write("#{content}\n")
  end
end
noop(*args) click to toggle source

Dummy function to handle noops

# File lib/logstash/inputs/okta_system_log.rb, line 922
def noop(*args)
  return
end
request_async(queue) click to toggle source
# File lib/logstash/inputs/okta_system_log.rb, line 576
def request_async(queue)

  @continue = true

  header_hash = {
                "Accept" => "application/json",
                "Content-Type" => "application/json"
                }

  if (@auth_token)
    header_hash["Authorization"] = "SSWS #{@auth_token.value}"
  elsif (@custom_auth_header)
    header_hash["Authorization"] = @custom_auth_header.value
  end

  begin
    while @continue and !stop?
      @logger.debug("Calling URL", 
        :url => @url, 
        :token_set => !@auth_token.nil?)

      started = Time.now

      client.async.get(@url.to_s, headers: header_hash).
        on_success { |response| handle_success(queue, response, @url, Time.now - started) }.
        on_failure { |exception| handle_failure(queue, exception, @url, Time.now - started) }

      client.execute!
    end
  rescue => e
    @logger.fatal(e.inspect)
    raise e
  ensure
    update_state_file()
  end
end
run_once(queue) click to toggle source
# File lib/logstash/inputs/okta_system_log.rb, line 569
def run_once(queue)

  request_async(queue)

end
update_state_file() click to toggle source
# File lib/logstash/inputs/okta_system_log.rb, line 614
def update_state_file()
  for i in 1..3
    @trace_log_method.call("Starting state file update",
      :state_file_path => @state_file_path,
      :url => @url,
      :attempt_num  => i)

    begin
      @write_method.call(@state_file_path, @url)
    rescue => e
      @logger.warn("Could not save state, retrying",
        :state_file_path => @state_file_path,
        :url => @url,
        :exception => e.inspect)

      sleep SLEEP_STATE_FILE_RETRY
      next
    end
    @logger.debug("Successfully wrote the state file",
      :state_file_path => @state_file_path,
      :url => @url,
      :attempts => i)
    # Break out of the loop once you're done
    return nil
  end
  @state_file_failure_function.call()
end
write_atomically(file_name) { |temp_file| ... } click to toggle source

Write to a file atomically. Useful for situations where you don't want other processes or threads to see half-written files.

File.write_atomically('important.file') do |file|
  file.write('hello')
end
# File lib/logstash/inputs/okta_system_log.rb, line 968
def write_atomically(file_name)

  # Create temporary file with identical permissions
  begin
    temp_file = File.new(rand_filename(file_name), "w", @state_file_stat.mode)
    temp_file.binmode
    return_val = yield temp_file
  ensure
    temp_file.close
  end

  # Overwrite original file with temp file
  File.rename(temp_file.path, file_name)

  # Unable to get permissions of the original file => return
  return return_val if @state_file_mode.nil?

  # Set correct uid/gid on new file
  File.chown(@state_file_stat.uid, @state_file_stat.gid, file_name) if old_stat

  return return_val
end