class LogStash::Inputs::SfdcElf
This plugin enables Salesforce customers to load EventLogFile (ELF) data from their Force.com orgs. The plugin handles downloading the ELF CSV files, parsing them, and adapting to any schema changes transparently.
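For orientation, the settings referenced below (such as @username, @password, @security_token, @path, and @poll_interval_in_minutes) are ordinary Logstash plugin options. The sketch below shows how options of this shape are typically declared with the Logstash config DSL; the names are inferred from the instance variables used in register and run, and the validations and defaults are assumptions rather than the shipped plugin's exact declarations.

# Illustrative only: typical option declarations for an input plugin of this shape.
# Option names are inferred from register/run; defaults are assumptions.
class LogStash::Inputs::SfdcElf < LogStash::Inputs::Base
  config_name 'sfdc_elf'

  config :username,                 validate: :string,   required: true
  config :password,                 validate: :password, required: true
  config :security_token,           validate: :password, required: true
  config :client_id,                validate: :password, required: true
  config :client_secret,            validate: :password, required: true
  config :host,                     validate: :string,   default: 'login.salesforce.com'
  config :path,                     validate: :string,   default: Dir.home
  config :poll_interval_in_minutes, validate: :number,   default: 60
end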
Constants
- LOG_KEY
- RETRY_ATTEMPTS
Public Instance Methods
register()
The first stage of the Logstash pipeline is register, where all instance variables are initialized.
# File lib/logstash/inputs/sfdc_elf.rb, line 53
def register
  # Initialize the client.
  @client = ClientWithStreamingSupport.new
  @client.client_id = @client_id.value
  @client.client_secret = @client_secret.value
  @client.host = @host
  @client.version = '46.0'

  # Authenticate the client.
  @logger.info("#{LOG_KEY}: trying to authenticate client")
  @client.retryable_authenticate(username: @username,
                                 password: @password.value + @security_token.value,
                                 retry_attempts: RETRY_ATTEMPTS)
  @logger.info("#{LOG_KEY}: authenticating succeeded")

  # Save org id to distinguish between multiple orgs.
  @org_id = @client.query('select id from Organization')[0]['Id']

  # Set up time interval for the forever while loop.
  @poll_interval_in_seconds = @poll_interval_in_minutes * 60

  # Handle the @path config passed by the user. If the path does not exist, set @path to the home directory.
  verify_path

  # Handles parsing the data into event objects and enqueuing them to the queue.
  @queue_util = QueueUtil.new

  # Handles when to schedule the next process based on the @poll_interval_in_minutes config.
  @scheduler = Scheduler.new(@poll_interval_in_seconds)

  # Handles the state of the plugin based on reads and writes of LogDates to the .sfdc_info_logstash file.
  @state_persistor = StatePersistor.new(@path, @org_id)

  # Grab the last indexed log date.
  @last_indexed_log_date = @state_persistor.get_last_indexed_log_date
  @logger.info("#{LOG_KEY}: @last_indexed_log_date = #{@last_indexed_log_date}")
end
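The retryable_authenticate call above wraps the login in a retry loop bounded by the RETRY_ATTEMPTS constant; its implementation in ClientWithStreamingSupport is not shown on this page. A minimal sketch of what such a wrapper could look like follows; the underlying authenticate call and the error handling are assumptions, not the plugin's actual code.

# A minimal sketch only; ClientWithStreamingSupport#retryable_authenticate may differ.
def retryable_authenticate(options = {})
  attempts = options.fetch(:retry_attempts, 3)
  begin
    # Placeholder for whatever login call the client actually performs.
    authenticate(username: options[:username], password: options[:password])
  rescue StandardError => e
    attempts -= 1
    retry if attempts > 0
    raise e
  end
end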
run(queue)
The second stage of the Logstash pipeline is run, where the plugin parses the data into event objects and pushes them onto the queue for the rest of the pipeline to consume.
# File lib/logstash/inputs/sfdc_elf.rb, line 98
def run(queue)
  @scheduler.schedule do
    # Line for readable log statements.
    @logger.info('---------------------------------------------------')

    # Grab a list of SObjects, specifically EventLogFiles.
    soql_expr = "SELECT Id, EventType, Logfile, LogDate, LogFileLength, LogFileFieldTypes FROM EventLogFile WHERE LogDate > #{@last_indexed_log_date} and EventType in (#{@eventtypesstring}) and Sequence>0 and Interval='Hourly' ORDER BY LogDate ASC"

    query_result_list = @client.retryable_query(username: @username,
                                                password: @password.value + @security_token.value,
                                                retry_attempts: RETRY_ATTEMPTS,
                                                soql_expr: soql_expr)

    @logger.info("#{LOG_KEY}: query result size = #{query_result_list.size}")

    if !query_result_list.empty?
      # query_result_list is in ascending order based on the LogDate, so grab the last one of the list and
      # save its LogDate to @last_indexed_log_date and .sfdc_info_logstash.
      @last_indexed_log_date = query_result_list.last.LogDate.strftime('%FT%T.%LZ')

      # TODO: grab tempfiles here!!

      # Overwrite the .sfdc_info_logstash file with the @last_indexed_log_date.
      # Note: we currently do not support deduplication, but will implement it soon.
      # TODO: need to implement deduplication
      # TODO: might have to move this after enqueue_events(), in case of a crash in between.
      # TODO: can do all @state_persistor calls after the if statement
      @state_persistor.update_last_indexed_log_date(@last_indexed_log_date)

      # Create events from query_result_list, then simply append the events to the queue.
      @queue_util.enqueue_events(query_result_list, queue, @client)
    end
  end # do loop
end
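The '%FT%T.%LZ' format used above produces the ISO 8601 UTC timestamp that SOQL datetime literals require, which is why the value can be interpolated into the WHERE clause without quotes. A small standalone illustration (the date value is made up, not taken from the plugin):

# Hypothetical LogDate, formatted the same way run() formats query_result_list.last.LogDate.
log_date = Time.utc(2024, 1, 15, 7, 0, 0)
formatted = log_date.strftime('%FT%T.%LZ')   # => "2024-01-15T07:00:00.000Z"

# Interpolated into the SOQL WHERE clause as an unquoted datetime literal:
# "... WHERE LogDate > 2024-01-15T07:00:00.000Z and EventType in (...) ..."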
Private Instance Methods
verify_path()
Handles the @path config passed by the user. If the path does not exist, @path is set to the home directory.
# File lib/logstash/inputs/sfdc_elf.rb, line 142
def verify_path
  # Check if the path exists; if not, set @path to the home directory.
  unless File.directory?(@path)
    @logger.warn("#{LOG_KEY}: provided path does not exist or is invalid. path=#{@path}")
    @path = Dir.home
  end
  @logger.info("#{LOG_KEY}: path = #{@path}")
end
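As a quick standalone illustration of the same fallback (the example path is made up; the real value comes from the plugin's @path setting):

# Hypothetical path value used only for illustration.
path = '/no/such/directory'
path = Dir.home unless File.directory?(path)
puts path   # => e.g. "/home/logstash"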