module SourMix::Perform
Constants
- PAGE_SIZE
Public Instance Methods
perform()
click to toggle source
# File lib/sourmix/perform.rb, line 18
#
# Entry point of the pipeline: builds a Mixpanel client from the configured
# API key/secret, downloads the export, turns it into result pages, gzips the
# pages and finally reports (POSTs) them.
#
# @return the value returned by #report_results
def perform
  client = Mixpanel::Client.new(
    api_key: options.mixpanel_api_key,
    api_secret: options.mixpanel_api_secret
  )
  pages = process_dataset(download_dataset(client))
  gzip_results(pages)
  report_results(pages)
end
Private Instance Methods
download_dataset(mixpanel)
click to toggle source
# File lib/sourmix/perform.rb, line 32
#
# Downloads the Mixpanel 'export' endpoint for options.date (used as both
# from_date and to_date) into a new Tempfile by shelling out to curl.
#
# @param mixpanel [Mixpanel::Client] client used to build the export request URI
# @return [Tempfile] open tempfile whose path now holds the downloaded export
# @raise [RuntimeError] (from #sh) when curl exits unsuccessfully; the
#   tempfile is closed and unlinked before the error propagates
def download_dataset mixpanel
  log.debug event: :download_dataset
  dataset = Tempfile.new 'dataset'
  begin
    request_uri = mixpanel.request_uri('export',
      from_date: options.date, to_date: options.date)
    # Named `command` (not `download_dataset`) so it does not shadow this method.
    command = 'curl -Lf -o %{out} %{url}' % {
      url: Shellwords.escape(request_uri),
      out: Shellwords.escape(dataset.path)
    }
    # NOTE(review): the caller reads the download through this same Tempfile
    # handle, which relies on curl writing into the existing file at
    # dataset.path — confirm if the download tool ever changes.
    _, elapsed_s = timed { sh command }
  rescue StandardError
    # Don't leave a partial/empty download on disk when the request fails.
    dataset.close!
    raise
  end
  log.info event: :download_dataset, exited: $?.exitstatus, elapsed_s: elapsed_s
  dataset
end
gzip_results(results)
click to toggle source
# File lib/sourmix/perform.rb, line 103
#
# Compresses every result page with `gzip -k` (one thread per page), so both
# page.path and page.path + '.gz' remain on disk afterwards.
#
# @param results [Array<Tempfile>] closed result pages from #process_dataset
# @return the result of the final log.info call
# @raise [RuntimeError] (from #sh, re-raised by Thread#value) if any gzip fails
def gzip_results results
  log.debug event: :gzip_results
  started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  threads = results.map do |page|
    Thread.new do
      gzip_result = 'gzip -k %s' % Shellwords.escape(page.path)
      _, elapsed_s = timed { sh gzip_result }
      size = File.size(page.path + '.gz')
      # $? is thread-local, so this is the status of this thread's gzip.
      log.debug event: :gzip_result, exited: $?.exitstatus, elapsed_s: elapsed_s, size: size
      # Return the size instead of bumping a counter shared by all threads.
      size
    end
  end
  # Thread#value joins each thread (re-raising its exception, if any).
  total_size = threads.map(&:value).inject(0, :+)
  # NOTE(review): the summary event is logged as :gzip_result, while the
  # opening debug line (and report_results' summary) use the plural form —
  # confirm whether log consumers rely on this name before changing it.
  log.info event: :gzip_result,
    total_elapsed_s: (Process.clock_gettime(Process::CLOCK_MONOTONIC) - started),
    total_size: total_size
end
normalize_event(e)
click to toggle source
# File lib/sourmix/perform.rb, line 159
#
# Flattens one parsed Mixpanel export event into a single-level document.
#
# The 'properties' hash is merged into the top level (mutating +e+). Every
# key has '.' replaced by '_' and any leading non-word characters (e.g. a
# leading '$') stripped. nil, '', [] and {} values are dropped. Finally
# '@timestamp' (UTC ISO 8601 with milliseconds, from 'time' read as epoch
# seconds), '@id' (MD5 of the original event's #to_s) and 'type' are added.
#
# @param e [Hash] parsed JSON event; mutated in place
# @return [Hash] a new, normalized hash
# @raise [TypeError] when no non-nil 'time' value is present (Time.at(nil))
def normalize_event e
  # NOTE(review): the id hashes Hash#to_s, whose format changed in Ruby 3.4
  # (`{"k" => v}` instead of `{"k"=>v}`), so ids are not stable across Ruby
  # upgrades. Left unchanged because a new scheme would re-key existing docs.
  digest = Digest::MD5.hexdigest e.to_s
  # An event without 'properties' is merged with nothing instead of raising
  # TypeError from merge!(nil).
  e.merge!(e.delete('properties') || {})
  normalized = e.each_with_object({}) do |(key, value), h|
    next if value.nil? || (value.respond_to?(:empty?) && value.empty?)
    # \A (not ^): strip only at the true start of the key, even if the key
    # contains a newline.
    h[key.tr('.', '_').sub(/\A\W+/, '')] = value
  end
  normalized['@timestamp'] = Time.at(normalized['time']).utc.iso8601(3)
  normalized['@id'] = digest
  normalized['type'] = 'mixpanel'
  normalized
end
process_dataset(dataset)
click to toggle source
# File lib/sourmix/perform.rb, line 53
#
# Streams the downloaded export line by line and normalizes each JSON event
# (#normalize_event). Results are written to a series of Tempfiles ("pages")
# holding at most PAGE_SIZE events each. When #v2_api? is true every written
# line is prefixed with ':'.
#
# Lines that fail to parse or normalize are logged (with the offending line)
# and counted; they do not abort processing. The dataset tempfile is closed
# and unlinked, and all pages are closed, when processing ends.
#
# @param dataset [Tempfile] export returned by #download_dataset
# @return [Array<Tempfile>] closed result pages in order; empty when no line succeeded
def process_dataset dataset
  log.debug event: :process_dataset
  results = []
  nok = nerr = 0
  v2 = v2_api? # loop-invariant: evaluate once, not per line
  begin
    _, elapsed_s = timed do
      dataset.each_line do |line|
        begin
          event = JSON.generate(normalize_event(JSON.parse(line)))
        rescue => err
          # `err`, not `e`: the original shadowed the line variable here and
          # logged the exception in place of the bad input line.
          log.error event: :process_dataset_error, error: err.inspect, line: line
          nerr += 1
          next
        end
        # Roll a new page only when an event is about to be written, so a
        # failed line at a page boundary no longer creates an empty page.
        if (nok % PAGE_SIZE).zero?
          log.debug event: :process_dataset_page, okay: nok
          results << Tempfile.new('results')
        end
        results.last.puts(v2 ? ':%s' % event : event)
        nok += 1
      end
    end
  ensure
    dataset.close!
    results.each(&:close)
  end
  # Guard against a zero duration, which would log NaN/Infinity.
  ops = elapsed_s > 0 ? (nok + nerr) / elapsed_s.to_f : 0.0
  info = { event: :process_dataset, elapsed_s: elapsed_s, ops: ops, pages: results.size }
  info[:okay] = nok unless nok.zero?
  info[:errors] = nerr unless nerr.zero?
  log.info info
  results
end
report_results(results)
click to toggle source
# File lib/sourmix/perform.rb, line 123
#
# POSTs every gzipped result page ("<page.path>.gz", as produced by
# #gzip_results) to options.theon_uri with curl, one thread per page, sending
# a 'Content-Encoding: gzip' header. Credentials embedded in the URI are
# passed with `curl -u`. The request URL is rebuilt from scheme, host, port
# and path only. Each page and its .gz file are deleted once its upload
# finishes (successfully or not).
#
# @param results [Array<Tempfile>] result pages whose .gz siblings exist
# @return the result of the final log.info call
# @raise [RuntimeError] (from #sh, re-raised by Thread#join) if any upload fails
def report_results results
  log.debug event: :report_results
  theon_uri = URI options.theon_uri
  # Only pass -u when the URI carries a user: joining two nils yields ':'
  # (never nil), which made curl send empty Basic credentials.
  # NOTE(review): URI#user/#password return the percent-encoded form, which
  # curl receives verbatim — confirm credentials never contain escapes.
  auth_opt = if theon_uri.user
    '-u %s' % Shellwords.escape([theon_uri.user, theon_uri.password].join(':'))
  end
  theon_url = '%s://%s:%s%s' % [ theon_uri.scheme, theon_uri.host, theon_uri.port, theon_uri.path ]
  url_opt = Shellwords.escape(theon_url) # same for every page
  started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  results.map do |page|
    Thread.new do
      # Capture before unlink: Tempfile#path is nil after unlinking.
      gz_path = page.path + '.gz'
      begin
        report_result = \
          'curl %{auth} -Lf %{url} -XPOST %{headers} --data-binary @%{out}.gz' % {
            auth: auth_opt,
            url: url_opt,
            out: Shellwords.escape(page.path),
            headers: "-H 'Content-Encoding: gzip'"
          }
        _, elapsed_s = timed { sh report_result }
        log.debug event: :report_result, exited: $?.exitstatus, elapsed_s: elapsed_s
      ensure
        page.unlink
        # `gzip -k` left this file behind; previously it was never removed.
        File.delete(gz_path) if File.exist?(gz_path)
      end
    end
  end.each(&:join)
  log.info event: :report_results,
    total_elapsed_s: (Process.clock_gettime(Process::CLOCK_MONOTONIC) - started)
end
sh(command)
click to toggle source
# File lib/sourmix/perform.rb, line 174
#
# Runs +command+ through the shell with its stdout redirected to stderr.
# With options.debug the command is first echoed to $stderr and run with
# Kernel#system; otherwise it is run with backticks.
#
# @param command [String] shell command line; callers must escape arguments
# @return [String, Boolean, nil] backtick output (normally empty, since stdout
#   is redirected to stderr) or, in debug mode, Kernel#system's result
# @raise [RuntimeError] when the command does not exit successfully,
#   including when it is terminated by a signal
def sh command
  output = if options.debug
    $stderr.puts command
    system "#{command} 1>&2"
  else
    `#{command} 1>&2`
  end
  status = $?
  # success? instead of exitstatus.zero?: exitstatus is nil for a process
  # killed by a signal, which used to crash with NoMethodError.
  unless status.success?
    log.fatal \
      error: 'Command failed',
      command: command,
      status: status.exitstatus,
      output: output
    raise 'Command "%s" failed' % command
  end
  output
end
timed() { || ... }
click to toggle source
# File lib/sourmix/perform.rb, line 195
#
# Runs the given block and measures how long it took.
#
# Uses the monotonic clock, so the measurement is not skewed if the system
# wall clock is adjusted (e.g. by NTP) while the block runs.
#
# @yield the work to measure
# @return [Array(Object, Float)] the block's return value and the elapsed seconds
def timed
  started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  rvalue = yield
  [rvalue, Process.clock_gettime(Process::CLOCK_MONOTONIC) - started]
end
v2_api?()
click to toggle source
# File lib/sourmix/perform.rb, line 156
#
# Whether the configured Theon endpoint is a v2 API, i.e. options.theon_uri
# contains '/v2/'. #process_dataset uses this to choose the output line format.
def v2_api?
  options.theon_uri.include?('/v2/')
end