class LogStash::Filters::Empow::Classifier
Constants
- BATCH_TIMEOUT
- MAX_CONCURRENT_REQUESTS
Public Class Methods
new(online_classifer, local_classifier, online_classification_workers, batch_size, batch_interval, max_retries, time_between_queries)
click to toggle source
# File lib/logstash/filters/classifier.rb, line 16
#
# Wires up the classifier: stores the collaborators, creates the bounded
# queue of new requests and the bulk processor, then starts one background
# thread that calls #management_task in a loop for as long as the pool runs.
#
# @param online_classifer forwarded to Classification::BulkProcessor and kept on the instance
# @param local_classifier object answering +classify(request)+; #classify consults it first
# @param online_classification_workers forwarded to Classification::BulkProcessor
# @param batch_size forwarded to Classification::BulkProcessor
# @param batch_interval [Numeric] seconds; bounds how long #management_task waits on the queue
# @param max_retries forwarded to Classification::BulkProcessor
# @param time_between_queries [Numeric] seconds; minimum gap between two retry passes in #management_task
def initialize(online_classifer, local_classifier, online_classification_workers, batch_size, batch_interval, max_retries, time_between_queries)
  @logger ||= self.logger
  @logger.info("initializing classifier")

  @local_classifier = local_classifier
  @online_classifer = online_classifer
  @batch_interval = batch_interval
  @time_between_queries = time_between_queries

  # Shared with the bulk processor, which receives the same hash instance.
  @inflight_requests = Concurrent::Hash.new
  @new_request_queue = java.util.concurrent.ArrayBlockingQueue.new(MAX_CONCURRENT_REQUESTS)

  @bulk_processor = Classification::BulkProcessor.new(max_retries, batch_size, time_between_queries, @inflight_requests, online_classifer, local_classifier, online_classification_workers)

  # Must be assigned BEFORE the worker thread is started: management_task
  # subtracts this value from the current time, and a nil here would make the
  # first iteration(s) fail with a TypeError that only shows up in the log.
  @last_action_time = Time.now

  @worker_pool = Concurrent::FixedThreadPool.new(1)
  @worker_pool.post do
    while @worker_pool.running? do
      begin
        management_task()
      rescue StandardError => e
        # Keep the loop alive; a single failed iteration must not stop the worker.
        @logger.error("encountered an error while running the management task", :error => e, :backtrace => e.backtrace)
      end
    end
  end

  @logger.debug("classifier initialized")
end
Public Instance Methods
classify(request)
click to toggle source
# File lib/logstash/filters/classifier.rb, line 97
#
# Answers from the local classifier when it has a result. Otherwise the
# request is handed to the online classification path and nil is returned,
# so the caller gets no answer for this call.
#
# @param request the classification request; nil short-circuits to nil
# @return the local classifier's result, or nil when there is none yet
def classify(request)
  return nil if request.nil?

  cached = @local_classifier.classify(request)
  @logger.trace("cached result", :request => request, :res => cached)

  if cached.nil?
    # Local miss: register the request for online classification.
    request_online_classifiction(request)
    nil
  else
    cached
  end
end
close()
click to toggle source
# File lib/logstash/filters/classifier.rb, line 48
#
# Shuts the classifier down: empties the in-flight table, closes the bulk
# processor, then kills the worker pool and waits up to 5 (the argument
# passed to wait_for_termination) for it to terminate.
def close
  @logger.info("shutting down empow's classifcation plugin")

  # Forget pending work first, then stop the component that processes it.
  @inflight_requests.clear
  @bulk_processor.close

  pool = @worker_pool
  pool.kill
  pool.wait_for_termination(5)

  @logger.info("empow classifcation plugin closed")
end
Private Instance Methods
create_task(request)
click to toggle source
# File lib/logstash/filters/classifier.rb, line 130
#
# Builds the bookkeeping record stored for an in-flight request.
#
# @param request the request being tracked
# @return [Hash] with :retries (0), :request and :last_executed
def create_task(request)
  {
    :retries => 0,
    :request => request,
    # Fixed instant far in the past (1979-11-09 00:00:00 UTC).
    # NOTE(review): presumably chosen so a new task counts as overdue on the
    # first retry pass - confirm against the bulk processor.
    :last_executed => Time.at(310953600)
  }
end
management_task()
click to toggle source
# File lib/logstash/filters/classifier.rb, line 62
#
# One iteration of the background loop:
#   1. wait on the new-request queue for at most the time left in the
#      current batch interval (never a negative timeout);
#   2. add the dequeued request to the batch, or flush the batch when the
#      wait ended without a request (a "tick");
#   3. run a retry pass, unless less than @time_between_queries has elapsed
#      since the previous one.
# Any StandardError is logged and swallowed.
def management_task
  started_at = Time.now
  elapsed = (started_at - @bulk_processor.get_last_execution_time).round
  remaining = @batch_interval - elapsed
  # Rounding (or an overdue batch) can push the remainder below zero.
  wait_seconds = remaining < 0 ? 0 : remaining

  next_request =
    begin
      @new_request_queue.poll(wait_seconds, TimeUnit::SECONDS)
    rescue java.lang.InterruptedException
      # An interrupted wait is handled like a tick.
      nil
    end

  if next_request.nil?
    @bulk_processor.flush_current_batch
  else
    @bulk_processor.add_to_batch(next_request)
  end

  # Skip the retry pass if the timer hasn't expired. The comparison uses the
  # time captured before the wait above.
  return if started_at - @last_action_time < @time_between_queries

  @last_action_time = started_at
  @bulk_processor.retry_queued_requests
rescue StandardError => e
  @logger.error("encountered an error while running the management task", :error => e, :backtrace => e.backtrace)
end
request_online_classifiction(req)
click to toggle source
# File lib/logstash/filters/classifier.rb, line 112
#
# Registers a request for online classification unless it is already being
# tracked, then offers it to the new-request queue without blocking.
#
# NOTE(review): when the queue rejects the request it stays in
# @inflight_requests, so later calls for the same request return early here.
# Presumably the bulk processor's retry pass picks it up from that table -
# confirm.
#
# @param req the request to classify online
def request_online_classifiction(req)
  # An existing entry means the request is already being handled.
  return unless @inflight_requests[req].nil?

  @logger.debug("adding request to online classification queue", :request => req)

  # Mark the request as in progress before it becomes visible on the queue.
  @inflight_requests[req] = create_task(req)

  accepted = @new_request_queue.offer(req)
  @logger.warn("queue full, request reject", :request => req) unless accepted
end