class DuplicateDetector

Keeps a sliding window of the results of recent queries and filters out entities that an overlapping earlier query already returned.

Public Class Methods

new(logger, past_queries_count)
# File lib/logstash/inputs/azurewadtable.rb, line 292
def initialize(logger, past_queries_count)
  @logger = logger
  @past_queries_count = past_queries_count # how many past queries to keep cached
  @query_cache = []                        # newest query first, oldest last
end
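
Construction just wires in a logger and the number of past queries to retain. A minimal sketch, using the stdlib Logger where the plugin would pass its Logstash logger, with an arbitrary retention count of 5:

require "logger"

# Cache the results of up to 5 past queries for de-duplication.
detector = DuplicateDetector.new(Logger.new($stdout), 5)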

Public Instance Methods

filter_duplicates(query, on_new_item_ckb, should_cache_query = true)
# File lib/logstash/inputs/azurewadtable.rb, line 298
def filter_duplicates(query, on_new_item_ckb, should_cache_query = true)
  if query.nil?
    @logger.debug("query is nil")
    return false
  end
  # push to the front, pop from the back
  latest_query = QueryData.new(@logger, query)
  @query_cache.insert(0, latest_query)

  found_new_items = false

  # results are most likely empty or contain very few items for older queries (most or all should already be de-duplicated by run_query)
  index = 0
  @query_cache.each do |query_data|
    query_data.run_query(->(entity) {
      unique_id = query_data.get_unique_id(entity)

      # queries overlap, so check for duplicates across every cached query's results
      is_duplicate = false
      @query_cache.each_with_index do |q, j|
        next if j == index # do not compare a query's results with itself
        if q.has_entity(entity)
          @logger.debug("[#{query_data.id}][filter_duplicates] #{unique_id} was already processed by #{q.id}")
          is_duplicate = true
          break
        end
      end

      if !is_duplicate
        found_new_items = true
        @logger.debug("[#{query_data.id}][filter_duplicates] #{unique_id} new item")
        on_new_item_ckb.call(entity)
      end

    })

    index += 1
  end

  if !should_cache_query
    # drop the query we just ran so its results are not kept in the cache
    @logger.debug("Removing first item from queue")
    @query_cache.shift
  end

  @logger.debug("Query Cache length: #{@query_cache.length}")
  # trim the cache down to the configured number of past queries
  until @query_cache.length <= @past_queries_count
    @query_cache.pop
    @logger.debug("New Query Cache length: #{@query_cache.length}")
  end

  return found_new_items
end
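
filter_duplicates runs the newest query together with every cached query, invokes on_new_item_ckb once per entity that no other cached query has produced, and returns true when at least one new entity was seen. The sketch below exercises that behavior with a hypothetical stand-in for QueryData (the real class lives elsewhere in lib/logstash/inputs/azurewadtable.rb and talks to Azure table storage); the stub only mimics the interface filter_duplicates relies on: id, run_query, get_unique_id, and has_entity.

require "logger"

# Illustrative stand-in for the plugin's QueryData; NOT the real class.
# It "queries" a fixed entity list, yields each entity at most once per
# instance (mirroring the de-duplication run_query is expected to do,
# per the comment in filter_duplicates), and remembers what it has
# yielded so has_entity can answer for overlapping queries.
class QueryData
  attr_reader :id

  def initialize(logger, query)
    @logger = logger
    @id = query[:id]
    @entities = query[:entities]
    @seen = {}
  end

  def run_query(on_entity)
    @entities.each do |entity|
      key = get_unique_id(entity)
      next if @seen[key]
      @seen[key] = true
      on_entity.call(entity)
    end
  end

  def get_unique_id(entity)
    entity[:RowKey]
  end

  def has_entity(entity)
    @seen.key?(get_unique_id(entity))
  end
end

detector = DuplicateDetector.new(Logger.new($stdout), 5)

on_new_item = ->(entity) { puts "new item: #{entity[:RowKey]}" }

# Two overlapping queries: both return RowKey "b".
q1 = { :id => "q1", :entities => [{ :RowKey => "a" }, { :RowKey => "b" }] }
q2 = { :id => "q2", :entities => [{ :RowKey => "b" }, { :RowKey => "c" }] }

detector.filter_duplicates(q1, on_new_item) # => true; prints a and b
detector.filter_duplicates(q2, on_new_item) # => true; prints only c

On the second call, "b" is suppressed because the cached q1 QueryData reports has_entity as true for it, while "c" is genuinely new; this is the cross-query overlap check that the j == index guard excludes self-comparison from.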