class Object

Constants

EVENT_TRACK_TYPE_SET
TA_PRESET_COLUMN_MAP

Public Instance Methods

filter(event)
# File lib/logstash/filter/sensors/sensors_data.rb, line 32
# Rewrites the JSON record held in the event's 'message' field into the
# JSON form of a TaDataDO object and stores it back into 'message'.
#
# The raw 'message' is parsed directly; no grok (or similar) preprocessing
# is assumed. The fields read from the parsed record are 'distinct_id',
# 'original_id', 'type', 'time', 'event' and 'properties'.
#
# @param event [#get, #set] object exposing get('message') / set('message', value)
# @return [Array] always a one-element array containing +event+. When the
#   record cannot be processed (invalid JSON, missing or non-String 'type',
#   missing event name on a track record, ...) the event is returned with
#   its 'message' untouched and a diagnostic line is printed.
def filter(event)
  raw_message = event.get('message') # one uploaded log line
  sensors_data = JSON.parse(raw_message)

  distinct_id = sensors_data['distinct_id']
  original_id = sensors_data['original_id']
  type = sensors_data['type']
  time = sensors_data['time']
  event_name = sensors_data['event']
  properties = sensors_data['properties'] || {}

  # Every branch below compares or inspects 'type' as a String; without one
  # the record cannot be classified, so hand the event back unchanged.
  unless type.is_a?(String)
    puts "not recognized type: #{raw_message}\n"
    return [event]
  end

  data = TaDataDO.new

  # 'time' is divided by 1000 before Time.at, i.e. it is treated as epoch
  # milliseconds; when absent the current time is used instead.
  timestamp = time.nil? ? Time.now : Time.at(time / 1000)
  data.time = timestamp.strftime('%Y-%m-%d %H:%M:%S')

  # Identity mapping: decide which id becomes account_id / distinct_id.
  if (type == 'track' && event_name == '$SignUp') || type == 'track_signup'
    data.account_id = distinct_id
    data.distinct_id = properties['$track_signup_original_id']
    puts "original_id error:#{raw_message}\n" if distinct_id == original_id
  elsif type == 'track' || type.start_with?('profile_')
    if properties['$is_login_id']
      data.account_id = distinct_id
      data.distinct_id = original_id if distinct_id != original_id
    else
      data.distinct_id = distinct_id
    end
  else
    puts "not recognized type: #{raw_message}\n"
  end

  # Record-type mapping. An unsupported type is reported but the record is
  # still emitted (with no type set on it).
  case type
  when 'track', 'track_signup'
    ip = properties['$ip']
    data.ip = ip if ip
    data.type = 'track'
    # Raises when the event name is missing; handled by the rescue below.
    data.event_name = event_name.delete('$')
  when 'profile_set'
    data.type = 'user_set'
  when 'profile_increment'
    data.type = 'user_add'
  when 'profile_delete'
    data.type = 'user_del'
  when 'profile_unset'
    data.type = 'user_unset'
  when 'profile_set_once'
    data.type = 'user_setOnce'
  else
    puts "暂不支持的type: #{type}\n"
  end

  # Property keys: for types listed in EVENT_TRACK_TYPE_SET, keys found in
  # TA_PRESET_COLUMN_MAP are renamed through that map; every other key just
  # has its '$' characters stripped.
  use_preset_columns = EVENT_TRACK_TYPE_SET.include?(type)
  data.properties = properties.each_with_object({}) do |(key, value), converted|
    if use_preset_columns && TA_PRESET_COLUMN_MAP.key?(key)
      converted[TA_PRESET_COLUMN_MAP[key]] = value
    else
      converted[key.delete('$')] = value
    end
  end

  event.set('message', data.to_json)

  [event]
rescue StandardError => e
  # Best effort: a record that cannot be converted is passed through as-is.
  # Only StandardError is rescued so signals and exit requests still propagate.
  puts "sensors_data filter error: #{e.class}: #{e.message}"
  [event]
end