class Object
Constants
- EVENT_TRACK_TYPE_SET
- TA_PRESET_COLUMN_MAP
Public Instance Methods
filter(event)
click to toggle source
# File lib/logstash/filter/sensors/sensors_data.rb, line 32
#
# Rewrites the event's 'message' field from a Sensors-style JSON record into
# the JSON produced by TaDataDO#to_json.
#
# Business data is handled here: when no grok (or similar) processing has been
# applied upstream, the record is read directly from 'message', which holds one
# uploaded log line.
#
# Any StandardError raised while converting (invalid JSON, unexpected field
# types, ...) is reported on stdout and the event is returned with its
# 'message' untouched. Non-StandardError exceptions (signals, SystemExit, ...)
# are deliberately NOT rescued.
#
# @param event [#get, #set] event object exposing get('message') / set('message', ...)
#   (presumably a Logstash event -- confirm against the plugin entry point)
# @return [Array] a one-element array containing the same event object
def filter(event)
  raw_message = event.get('message')
  sensors_data = JSON.parse(raw_message)

  distinct_id = sensors_data['distinct_id']
  original_id = sensors_data['original_id']
  type = sensors_data['type']
  time = sensors_data['time']
  event_name = sensors_data['event']
  properties = sensors_data['properties']
  properties = {} if properties.nil?

  # A missing or non-string type cannot be mapped; leave the event untouched
  # instead of relying on a NoMethodError being swallowed further down.
  unless type.is_a?(String)
    puts "not recognized type: #{raw_message}\n"
    return [event]
  end

  data = TaDataDO.new

  # 'time' is treated as epoch milliseconds; fall back to "now" when absent.
  timestamp = time.nil? ? Time.now : Time.at(time / 1000)
  data.time = timestamp.strftime('%Y-%m-%d %H:%M:%S')

  # Resolve account_id / distinct_id.
  if (type == 'track' && event_name == '$SignUp') || type == 'track_signup'
    data.account_id = distinct_id
    data.distinct_id = properties['$track_signup_original_id']
    puts "original_id error:#{raw_message}\n" if distinct_id == original_id
  elsif type == 'track' || type.start_with?('profile_')
    if properties['$is_login_id']
      data.account_id = distinct_id
      data.distinct_id = original_id if distinct_id != original_id
    else
      data.distinct_id = distinct_id
    end
  else
    puts "not recognized type: #{raw_message}\n"
  end

  # Map the incoming record type onto the output type.
  case type
  when 'track', 'track_signup'
    stripped_event_name = event_name.delete('$')
    ip = properties['$ip']
    data.ip = ip if ip
    data.type = 'track'
    data.event_name = stripped_event_name
  when 'profile_set'
    data.type = 'user_set'
  when 'profile_increment'
    data.type = 'user_add'
  when 'profile_delete'
    data.type = 'user_del'
  when 'profile_unset'
    data.type = 'user_unset'
  when 'profile_set_once'
    data.type = 'user_setOnce'
  else
    # Unsupported types are reported but still converted (data.type stays unset).
    puts "暂不支持的type: #{type}\n"
  end

  # Rename properties: preset columns are mapped only for types listed in
  # EVENT_TRACK_TYPE_SET; every other key just has its '$' characters removed.
  use_preset_names = EVENT_TRACK_TYPE_SET.include?(type)
  data.properties = properties.each_with_object({}) do |(key, value), renamed|
    if use_preset_names && TA_PRESET_COLUMN_MAP.key?(key)
      renamed[TA_PRESET_COLUMN_MAP[key]] = value
    else
      renamed[key.delete('$')] = value
    end
  end

  event.set('message', data.to_json)
  [event]
rescue StandardError => e
  # Best effort: pass the event through unchanged, but say why.
  puts "sensors_data filter error: #{e.class}: #{e.message}\n"
  [event]
end