class DataOperations::Aggregate

Constants

DEFAULT_AGGREGATIONS
DEFAULT_FIELD_NO_DATA_VALUE
DEFAULT_FLUSH_INTERVAL
DEFAULT_HASH_TIME_FORMAT
DEFAULT_INERVAL_SECONDS
DEFAULT_INTERVALS
DEFAULT_OUTPUT_TIME_FORMAT
DEFAULT_PROCESSING_MODE
DEFAULT_TIME_FIELD
DEFAULT_TIME_FORMAT
DEFAULT_TIME_STARTED_MODE
VALID_AGGREGATIONS

Public Class Methods

new(aggregator: {}, time_format: DEFAULT_TIME_FORMAT, time_field: DEFAULT_TIME_FIELD, output_time_format: DEFAULT_OUTPUT_TIME_FORMAT, intervals: DEFAULT_INTERVALS, flush_interval: DEFAULT_FLUSH_INTERVAL, keep_interval: DEFAULT_KEEP_INTERVAL, field_no_data_value: DEFAULT_FIELD_NO_DATA_VALUE, processing_mode: DEFAULT_PROCESSING_MODE, time_started_mode: DEFAULT_TIME_STARTED_MODE, aggregator_name: nil, log: Logger.new(STDOUT), aggregation_names:, group_field_names:, aggregate_field_names:)
# File lib/dataoperations-aggregate.rb, line 19
# Builds an aggregator and validates its configuration.
#
# @param aggregator [Hash] initial aggregation state, keyed by group key
# @param time_format [String] strptime format used to parse record[time_field]
# @param time_field [String] record key holding the event time; also the key
#   under which each emitted row carries its formatted bucket time
# @param output_time_format [String] strftime format for emitted rows
# @param intervals [Array<Integer>] bucket widths in seconds; duplicates are
#   removed, the list is sorted ascending, and every interval must be a
#   multiple of the smallest one
# @param flush_interval [Integer] stored as-is (not used by the methods shown here)
# @param keep_interval [Integer] extra seconds a bucket is held before it is
#   flushed when processing_mode is not :batch
# @param field_no_data_value [Object] value recorded for a missing group field
# @param processing_mode [Symbol] :batch flushes buckets immediately; any other
#   value waits for the bucket's interval plus keep_interval to elapse
# @param time_started_mode [Symbol] :first_event or :last_event; selects which
#   start time a rolled-up interval bucket keeps
# @param aggregator_name [String, nil] when set, emitted as 'aggregator_id'
# @param log [Logger] logger instance
# @param aggregation_names [Array] operations to compute; each must be in VALID_AGGREGATIONS
# @param group_field_names [Array] record fields that define a group
# @param aggregate_field_names [Array] record fields whose values are aggregated
# @raise [RuntimeError] if any of the three *_names arguments is missing or not
#   an Array, if an aggregation name is not valid, or if an interval is not a
#   multiple of the smallest interval
def initialize(aggregator: {},
               time_format: DEFAULT_TIME_FORMAT,
               time_field: DEFAULT_TIME_FIELD,
               output_time_format: DEFAULT_OUTPUT_TIME_FORMAT,
               intervals: DEFAULT_INTERVALS,
               flush_interval: DEFAULT_FLUSH_INTERVAL,
               keep_interval: DEFAULT_KEEP_INTERVAL,
               field_no_data_value: DEFAULT_FIELD_NO_DATA_VALUE,
               processing_mode: DEFAULT_PROCESSING_MODE,
               time_started_mode: DEFAULT_TIME_STARTED_MODE,
               aggregator_name: nil,
               log: Logger.new(STDOUT),
               aggregation_names:,
               group_field_names:,
               aggregate_field_names:
            )
  @aggregator = aggregator
  @time_format = time_format
  @time_field = time_field
  @output_time_format = output_time_format
  # The first (smallest) interval is the base bucket; every other interval is
  # rolled up from it, hence the dedup + ascending sort.
  @intervals = intervals.uniq.sort
  @flush_interval = flush_interval
  @keep_interval = keep_interval
  @field_no_data_value = field_no_data_value
  @processing_mode = processing_mode
  @time_started_mode = time_started_mode
  @aggregator_name = aggregator_name

  # Each list is checked against its own value (nil is not an Array, so a
  # missing value is rejected too).
  unless aggregation_names.is_a?(Array)
    raise 'Configuration error, aggregation_names must be specified and Array'
  end
  unless group_field_names.is_a?(Array)
    raise 'Configuration error, group_field_names must be specified and Array'
  end
  unless aggregate_field_names.is_a?(Array)
    raise 'Configuration error, aggregate_field_names must be specified and Array'
  end

  @log = log

  @hash_time_format = DEFAULT_HASH_TIME_FORMAT
  @interval_seconds = DEFAULT_INERVAL_SECONDS

  @aggregation_names = aggregation_names
  @group_field_names = group_field_names
  @aggregate_field_names = aggregate_field_names

  unless @aggregation_names.all? { |operation| VALID_AGGREGATIONS.include?(operation) }
    raise 'aggregations must set any combination of sum,min,max,mean,median,variance,standard_deviation'
  end

  base_interval = @intervals[0]
  @intervals.each do |interval|
    next if (interval % base_interval).zero?

    raise "interval: #{interval} must be multiple of first interval: #{base_interval}"
  end

  @aggregator_mutex = Mutex.new
  # TODO:
  # - Validate the element types of group_field_names / aggregate_field_names
  # - Reject an empty intervals list (add_events divides by @intervals[0])
end

Public Instance Methods

add_events(record)
# File lib/dataoperations-aggregate.rb, line 89
# Adds one event to the base (smallest) interval bucket of its group.
#
# The event time is parsed from record[@time_field] with @time_format. When the
# field is missing, or its value cannot be parsed (bad format or non-String),
# the current time is used instead of raising.
# For each aggregate field, Integer/Float values are appended to the bucket's
# vector; any other value (including a missing field) is appended as 0.
#
# @param record [Hash] the incoming event
# @return [Array] @aggregate_field_names (the value of the final #each)
def add_events(record)
  timestamp = nil
  if record.key?(@time_field)
    begin
      timestamp = DateTime.strptime(record[@time_field], @time_format).to_time.to_i
    rescue ArgumentError, TypeError
      # Unparseable (ArgumentError / Date::Error) or non-String (TypeError)
      # time value: use the current-time fallback below.
      timestamp = nil
    end
  end
  timestamp ||= DateTime.now.to_time.to_i

  # Start of the base-interval bucket containing the event.
  aggregator_hash_key = (timestamp / @intervals[0]) * @intervals[0]

  # Group key such as "host:a_service:b"; nil when there are no group fields.
  group_key_parts = @group_field_names.map { |field_name| "#{field_name}:#{record[field_name]}" }
  hash_group_key = group_key_parts.empty? ? nil : group_key_parts.join('_')

  if @aggregator.key?(hash_group_key)
    aggregator_item = @aggregator[hash_group_key]
  else
    group_detail = {}
    @group_field_names.each do |field_name|
      group_detail[field_name] = record.key?(field_name) ? record[field_name] : @field_no_data_value
    end

    # One empty accumulator per interval, keyed by the interval's String form.
    interval_detail = {}
    @intervals.each { |interval| interval_detail[interval.to_s] = {} }

    aggregator_item = {
      'group_fields' => group_detail,
      'aggregate_fields' => {},
      'intervals' => interval_detail
    }
    @aggregator_mutex.synchronize { @aggregator[hash_group_key] = aggregator_item }
  end

  buckets = aggregator_item['aggregate_fields']
  if buckets.key?(aggregator_hash_key)
    buckets[aggregator_hash_key]['processed'] += 1
  else
    # :time_started is ingest (wall-clock) time, not event time; the flush
    # logic compares it with the current time outside :batch mode.
    new_bucket = { :time_started => Time.now.to_i, 'processed' => 1 }
    @aggregator_mutex.synchronize { buckets[aggregator_hash_key] = new_bucket }
  end
  bucket = buckets[aggregator_hash_key]

  @aggregate_field_names.each do |field_name|
    values = bucket.key?(field_name) ? bucket[field_name] : []
    value = record[field_name]
    value = 0 unless value.is_a?(Integer) || value.is_a?(Float)
    values << value
    bucket[field_name] = values
  end
end
aggregate_data()
# File lib/dataoperations-aggregate.rb, line 149
# Exposes the live aggregation state (the internal Hash itself, not a copy).
#
# @return [Hash] the aggregator hash keyed by group key
def aggregate_data
  state = @aggregator
  state
end
aggregate_events()
# File lib/dataoperations-aggregate.rb, line 153
# Flushes every bucket that is ready, across all groups and intervals.
#
# Runs under @aggregator_mutex. For each group, the base-interval buckets are
# emitted first (which also feeds the larger intervals), then each larger
# interval is checked for ready buckets.
#
# @return [Hash{String => Array<Hash>}, nil] emitted rows keyed by interval
#   (String), or nil when nothing was emitted
def aggregate_events
  flushed = {}

  @aggregator_mutex.synchronize do
    now = Time.now.to_i
    @aggregator.each_value do |group|
      aggregate_first_interval(flushed, now, group)

      # Larger intervals, rolled up from the base interval.
      group['intervals'].keys[1..-1].each do |interval_key|
        aggregate_subsequents_intervals(flushed, now, group, interval_key)
      end
    end
  end

  flushed.empty? ? nil : flushed
end
log_level(log_level)
# File lib/dataoperations-aggregate.rb, line 85
# Changes the verbosity of the configured logger.
#
# @param log_level [Integer, Symbol, String] any level the logger accepts
# @return the level that was passed in
def log_level(log_level)
  logger = @log
  logger.level = log_level
end

Private Instance Methods

acumulative_aggregation(aggregate_data, aggregator_item_key, aggregator_item_value, current_time, group_item_value, s_interval)
# File lib/dataoperations-aggregate.rb, line 278
# Emits one rolled-up row for a larger-interval bucket once it is ready, then
# removes that bucket.
#
# A bucket is ready immediately in :batch processing mode; otherwise it is
# ready once current_time >= time_started + interval + @keep_interval.
#
# The vectors hold one base-interval result per operation. They are combined as:
# - sum, min, max: recombined exactly (sum of sums, min of mins, max of maxes)
# - mean, median: mean / median of the per-bucket results (unweighted)
# - anything else (e.g. variance, standard_deviation): approximated by the median
# NOTE(review): `mean`/`median` are not core Array methods; presumably a
# statistics extension provides them — confirm.
#
# @return [Array<Hash>, nil] the interval's row list when a row was emitted, else nil
def acumulative_aggregation(aggregate_data, aggregator_item_key, aggregator_item_value, current_time, group_item_value, s_interval)
  interval = s_interval.to_i
  # If processing mode is :batch, aggregate immediately; else wait for late
  # events (streaming processing, e.g. fluentd).
  limit_time = @processing_mode == :batch ? 0 : aggregator_item_value[:time_started] + interval + @keep_interval
  return if current_time < limit_time

  aggregator_data = {}
  aggregator_data[@time_field] = Time.at(aggregator_item_key).strftime(@output_time_format)
  aggregator_data.merge!(group_item_value['group_fields'])

  aggregator_data['time'] = aggregator_item_key
  aggregator_data['processed'] = aggregator_item_value['processed']
  aggregator_data['aggregator_id'] = @aggregator_name if @aggregator_name

  aggregator_item_value['aggregate_fields'].each do |field_name, field_data|
    field_data.each do |operation, vector|
      data =
        case operation
        when 'sum', 'max', 'min', 'mean', 'median'
          # 'sum' must add the partial sums; it previously fell through to median.
          vector.method(operation).call
        else
          vector.median
        end
      aggregator_data["#{field_name}_#{operation}"] = data
    end
  end

  group_item_value['intervals'][s_interval].delete(aggregator_item_key)
  (aggregate_data[s_interval] ||= []) << aggregator_data
end
aggregate_first_interval(aggregate_data, current_time, group_item_value)
# File lib/dataoperations-aggregate.rb, line 177
# Emits every ready base-interval bucket of one group and feeds its results
# into the larger intervals.
#
# For each ready bucket this:
# 1. builds the output row (formatted time, group fields, 'time', 'processed',
#    optional 'aggregator_id'),
# 2. creates or updates the enclosing bucket of every larger interval,
# 3. computes each Array field's aggregations into the row (non-Array entries
#    such as :time_started and 'processed' are skipped by execute_aggregation),
# 4. deletes the bucket and appends the row under the base interval's key.
# Readiness: always in :batch mode, otherwise once current_time reaches
# time_started + @intervals[0] + @keep_interval.
def aggregate_first_interval(aggregate_data, current_time, group_item_value)
  intervals = group_item_value['intervals']
  base_key = intervals.keys[0]
  rollup_keys = intervals.keys[1..-1]

  group_item_value['aggregate_fields'].each do |bucket_key, bucket|
    ready_at =
      if @processing_mode == :batch
        0
      else
        bucket[:time_started] + @intervals[0] + @keep_interval
      end
    next if current_time < ready_at

    row = { @time_field => Time.at(bucket_key).strftime(@output_time_format) }
    row.merge!(group_item_value['group_fields'])
    row['time'] = bucket_key
    row['processed'] = bucket['processed']
    row['aggregator_id'] = @aggregator_name if @aggregator_name

    rollup_keys.each do |interval_secs|
      create_aggregation_hash(bucket_key, bucket, group_item_value, interval_secs)
    end

    bucket.each do |field_key, field_value|
      execute_aggregation(field_key, field_value, row, bucket_key, group_item_value)
    end

    group_item_value['aggregate_fields'].delete(bucket_key)
    (aggregate_data[base_key] ||= []) << row
  end
end
aggregate_subsequents_intervals(aggregate_data, current_time, group_item_value, s_interval)
# File lib/dataoperations-aggregate.rb, line 272
# Offers every pending bucket of one larger interval to acumulative_aggregation,
# which emits (and removes) the buckets that are ready.
def aggregate_subsequents_intervals(aggregate_data, current_time, group_item_value, s_interval)
  pending = group_item_value['intervals'][s_interval]
  pending.each_pair do |bucket_key, bucket|
    acumulative_aggregation(aggregate_data, bucket_key, bucket, current_time, group_item_value, s_interval)
  end
end
create_aggregation_hash(aggregator_item_key, aggregator_item_value, group_item_value, interval_secs)
# File lib/dataoperations-aggregate.rb, line 250
# Creates, or updates, the bucket of a larger interval that encloses one
# base-interval bucket.
#
# A new bucket copies :time_started and 'processed' and starts with empty
# 'aggregate_fields'. An existing bucket accumulates 'processed'. Its
# :time_started becomes the earliest start (:first_event mode) or the latest
# start (:last_event mode).
#
# @param aggregator_item_key [Integer] start time of the base-interval bucket
# @param aggregator_item_value [Hash] the base-interval bucket
# @param group_item_value [Hash] the group entry holding 'intervals'
# @param interval_secs [String] key of the larger interval (its width as a String)
def create_aggregation_hash(aggregator_item_key, aggregator_item_value, group_item_value, interval_secs)
  width = interval_secs.to_i
  interval_aggregator_item_key = (aggregator_item_key / width) * width
  buckets = group_item_value['intervals'][interval_secs]
  interval_aggregator_item_value = buckets[interval_aggregator_item_key]

  if interval_aggregator_item_value
    started = aggregator_item_value[:time_started]
    current = interval_aggregator_item_value[:time_started]
    # `elsif` (not `elseif`, which Ruby parses as a call to an undefined method).
    if @time_started_mode == :first_event && started < current
      interval_aggregator_item_value[:time_started] = started
    elsif @time_started_mode == :last_event && started > current
      interval_aggregator_item_value[:time_started] = started
    end
    interval_aggregator_item_value['processed'] += aggregator_item_value['processed']
  else
    buckets[interval_aggregator_item_key] = {
      :time_started => aggregator_item_value[:time_started],
      'aggregate_fields' => {},
      'processed' => aggregator_item_value['processed']
    }
  end
end
create_metadata_aggregation(aggregate_field_key, aggregate_field_value, aggregator_data, aggregator_item_key, group_item_value)
# File lib/dataoperations-aggregate.rb, line 235
# Ensures every larger interval's enclosing bucket has an empty vector per
# operation for this field, so base-interval results can be appended later.
#
# Fields that already have vectors are left untouched. Non-Array field values
# (e.g. :time_started, 'processed') get no vectors. aggregator_data is not
# read here.
def create_metadata_aggregation(aggregate_field_key, aggregate_field_value, aggregator_data, aggregator_item_key, group_item_value)
  intervals = group_item_value['intervals']
  intervals.keys[1..-1].each do |interval_secs|
    width = interval_secs.to_i
    fields = intervals[interval_secs][(aggregator_item_key / width) * width]['aggregate_fields']

    next if fields.key?(aggregate_field_key)
    next unless aggregate_field_value.is_a?(Array)

    fields[aggregate_field_key] = @aggregation_names.each_with_object({}) do |operation, vectors|
      vectors[operation] = []
    end
  end
end
execute_aggregation(aggregate_field_key, aggregate_field_value, aggregator_data, aggregator_item_key, group_item_value)
# File lib/dataoperations-aggregate.rb, line 212
# Computes every configured operation for one base-interval field.
#
# Each result is written into aggregator_data as "<field>_<operation>" and
# appended to the matching vector in every larger interval's enclosing bucket.
# Only Array values are aggregated; other entries are ignored after the
# metadata step.
def execute_aggregation(aggregate_field_key, aggregate_field_value, aggregator_data, aggregator_item_key, group_item_value)
  # Prepare per-operation vectors in the larger intervals first.
  create_metadata_aggregation(aggregate_field_key,
                              aggregate_field_value,
                              aggregator_data,
                              aggregator_item_key,
                              group_item_value)
  return unless aggregate_field_value.is_a?(Array)

  intervals = group_item_value['intervals']
  @aggregation_names.each do |operation|
    result = aggregate_field_value.method(operation).call
    aggregator_data["#{aggregate_field_key}_#{operation}"] = result

    intervals.keys[1..-1].each do |interval_secs|
      width = interval_secs.to_i
      bucket = intervals[interval_secs][(aggregator_item_key / width) * width]
      bucket['aggregate_fields'][aggregate_field_key][operation] << result
    end
  end
end