class OneWorker
Sidekiq worker class
Constants
- IGNORED_NETWORKS
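The value of the constant is not shown here; judging by its use in ip_public? below, it is a list of IPAddr network objects whose addresses are treated as non-public. A minimal sketch, assuming the usual private and link-local ranges are the ones being ignored (the real constant may differ):

require 'ipaddr'

# Hypothetical content, for illustration only. Each entry is an IPAddr so
# that `net.include?(ip_obj)` works in ip_public? below.
IGNORED_NETWORKS = [
  IPAddr.new('10.0.0.0/8'),
  IPAddr.new('172.16.0.0/12'),
  IPAddr.new('192.168.0.0/16'),
  IPAddr.new('169.254.0.0/16')
].freeze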
Public Instance Methods
Prepare data that is common to all output types
# File lib/one_worker.rb, line 30
def common_data
  data = {}
  data['oneacct_export_version'] = ::OneacctExporter::VERSION
  data
end
Create mapping of cluster ID and specified element
@return [Hash] created map
# File lib/one_worker.rb, line 78
def create_cluster_map(oda)
  logger.debug('Creating cluster map.')
  create_map(OpenNebula::ClusterPool, 'TEMPLATE/APEL_SITE_NAME', oda)
end
Create mapping of image ID and specified element
@return [Hash] created map
# File lib/one_worker.rb, line 73
def create_image_map(oda)
  logger.debug('Creating image map.')
  create_map(OpenNebula::ImagePool, 'TEMPLATE/VMCATCHER_EVENT_AD_MPURI', oda)
end
Generic method for mapping creation
# File lib/one_worker.rb, line 84
def create_map(pool_type, mapping, oda)
  oda.mapping(pool_type, mapping)
rescue => e
  msg = "Couldn't create map: #{e.message}. "\
        'Stopping to avoid malformed records.'
  logger.error(msg)
  raise msg
end
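The shape of the returned hash is whatever OneDataAccessor#mapping builds: pool element IDs as keys, the value of the requested XPath as values. A rough example for a cluster map (IDs and site names are made up):

create_map(OpenNebula::ClusterPool, 'TEMPLATE/APEL_SITE_NAME', oda)
# => { '100' => 'EXAMPLE-SITE-A', '101' => 'EXAMPLE-SITE-B' }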
Create mapping of user ID and specified element
@return [Hash] created map
# File lib/one_worker.rb, line 65
def create_user_map(oda)
  logger.debug('Creating user map.')
  create_map(OpenNebula::UserPool, 'TEMPLATE/X509_DN', oda)
end
Returns an array of disk records from vm
@param [OpenNebula::VirtualMachine] vm virtual machine
@return [Array] array of hashes representing vm's disk records
# File lib/one_worker.rb, line 171
def disk_records(vm)
  disks = []

  vm.each 'TEMPLATE/DISK' do |d|
    disk = {}
    disk['size'] = d['SIZE']
    disks << disk
  end

  disks
end
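The result is an array with one hash per DISK element of the VM template; the sizes below are illustrative:

disk_records(vm)
# => [{ 'size' => '10240' }, { 'size' => '2048' }]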
Returns an array of history records from vm
@param [OpenNebula::VirtualMachine] vm virtual machine
@return [Array] array of hashes representing vm's history records
# File lib/one_worker.rb, line 149
def history_records(vm)
  history = []

  vm.each 'HISTORY_RECORDS/HISTORY' do |h|
    history_record = {}
    history_record['start_time'] = h['STIME']
    history_record['end_time'] = h['ETIME']
    history_record['rstart_time'] = h['RSTIME']
    history_record['rend_time'] = h['RETIME']
    history_record['seq'] = h['SEQ']
    history_record['hostname'] = h['HOSTNAME']
    history << history_record
  end

  history
end
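Each HISTORY element becomes one hash with the raw string values taken from the VM's XML; the values below are only an example:

history_records(vm)
# => [{ 'start_time' => '1424441695', 'end_time' => '1424442695',
#       'rstart_time' => '1424441700', 'rend_time' => '1424442690',
#       'seq' => '0', 'hostname' => 'node01.example.org' }]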
Load virtual machine with specified ID
@return [OpenNebula::VirtualMachine] virtual machine
# File lib/one_worker.rb, line 96
def load_vm(vm_id, oda)
  oda.vm(vm_id)
rescue => e
  logger.error("Couldn't retrieve data for vm with id: #{vm_id}. #{e.message}. Skipping.")
  return nil
end
Look for the 'os_tpl' OCCI mixin to better identify the virtual machine's image
@param [OpenNebula::VirtualMachine] vm virtual machine
@return [NilClass, String] the mixin identifier string if found, nil otherwise
# File lib/one_worker.rb, line 204
def mixin(vm)
  mixin_locations = %w(USER_TEMPLATE/OCCI_COMPUTE_MIXINS USER_TEMPLATE/OCCI_MIXIN TEMPLATE/OCCI_MIXIN)

  mixin_locations.each do |mixin_location|
    vm.each mixin_location do |mixin|
      mixin = mixin.text.split
      mixin.select! { |line| line.include? '/occi/infrastructure/os_tpl#' }
      return mixin.first unless mixin.empty?
    end
  end

  nil # nothing found
end
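The method returns the first whitespace-separated token that points at an os_tpl mixin. A matching value would look roughly like the following (host and fragment are made up):

mixin(vm)
# => "https://occi.example.org/occi/infrastructure/os_tpl#uuid_my_template_42"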
Returns the number of unique public IP addresses of the vm
@param [OpenNebula::VirtualMachine] vm virtual machine
@return [Integer] number of unique public IP addresses
# File lib/one_worker.rb, line 188
def number_of_public_ips(vm)
  all_ips = []
  vm.each 'TEMPLATE/NIC' do |nic|
    nic.each 'IP' do |ip|
      all_ips << ip.text if ip_public?(ip)
    end
  end

  all_ips.uniq.length
end
Prepare data that is specific to the output type and common to every virtual machine
# File lib/one_worker.rb, line 38
def output_type_specific_data
  data = {}
  if PBS_OT.include?(Settings.output['output_type']) && Settings.output['pbs']
    data['realm'] = Settings.output.pbs['realm']
    data['pbs_queue'] = Settings.output.pbs['queue']
    data['scratch_type'] = Settings.output.pbs['scratch_type']
    data['host'] = Settings.output.pbs['host_identifier']
  end

  if APEL_OT.include?(Settings.output['output_type'])
    data['endpoint'] = Settings.output.apel['endpoint'].chomp('/')
    data['site_name'] = Settings.output.apel['site_name']
    data['cloud_type'] = Settings.output.apel['cloud_type']
    data['cloud_compute_service'] = Settings.output.apel['cloud_compute_service']
  end

  if LOGSTASH_OT.include?(Settings.output['output_type'])
    data['host'] = Settings.output.logstash['host']
    data['port'] = Settings.output.logstash['port']
  end

  data
end
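For instance, with the APEL output type selected, only the APEL-related keys are returned; the values below are placeholders for whatever is configured under Settings.output.apel:

output_type_specific_data
# => { 'endpoint'              => 'https://msg.example.org',
#      'site_name'             => 'EXAMPLE-SITE',
#      'cloud_type'            => 'OpenNebula',
#      'cloud_compute_service' => 'Example Cloud' }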
Sidekiq-specific method that defines the work performed by the worker
@param [String] vms IDs of the virtual machines to process, in the form of numbers separated by '|' (easier for cooperation with Redis)
@param [String] file_number number of the output file
# File lib/one_worker.rb, line 223
def perform(vms, file_number)
  OneacctExporter::Log.setup_log_level(logger)

  vms = vms.split('|')
  oda = OneDataAccessor.new(false, logger)
  user_map = create_user_map(oda)
  image_map = create_image_map(oda)
  cluster_map = create_cluster_map(oda)
  benchmark_map = oda.benchmark_map

  data = []

  vms.each do |vm_id|
    vm = load_vm(vm_id, oda)
    next unless vm

    begin
      logger.debug("Processing vm with id: #{vm_id}.")
      vm_data = process_vm(vm, user_map, image_map, cluster_map, benchmark_map)

      validator = DataValidators::ApelDataValidator.new(logger) if APEL_OT.include?(Settings.output['output_type'])
      validator = DataValidators::PbsDataValidator.new(logger) if PBS_OT.include?(Settings.output['output_type'])
      validator = DataValidators::LogstashDataValidator.new(logger) if LOGSTASH_OT.include?(Settings.output['output_type'])

      vm_data = validator.validate_data(vm_data) if validator
    rescue Errors::ValidationError => e
      logger.error("Error occured during processing of vm with id: #{vm_id}. #{e.message}")
      next
    end

    logger.debug("Adding vm with data: #{vm_data} for export.")
    data << vm_data
  end

  write_data(data, file_number) unless data.empty?
end
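perform is not called directly; Sidekiq invokes it for every job pushed onto the queue. A minimal sketch of how such a job could be enqueued, assuming OneWorker includes Sidekiq::Worker (IDs and file number are illustrative):

# VM IDs joined by '|', plus the sequence number of the output file
OneWorker.perform_async('10|11|12', 1)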
Obtain and parse required data from vm
@return [Hash] required data from virtual machine
# File lib/one_worker.rb, line 106
def process_vm(vm, user_map, image_map, cluster_map, benchmark_map)
  data = common_data
  data.merge! output_type_specific_data

  data['vm_uuid'] = vm['ID']
  data['start_time'] = vm['STIME']
  data['end_time'] = vm['ETIME']
  data['machine_name'] = vm['DEPLOY_ID']
  data['user_id'] = vm['UID']
  data['group_id'] = vm['GID']
  data['user_dn'] = vm['USER_TEMPLATE/USER_X509_DN']
  data['user_dn'] ||= user_map[data['user_id']]
  data['user_name'] = vm['UNAME']
  data['group_name'] = vm['GNAME']
  data['status_code'] = vm['STATE']
  data['status'] = vm.state_str
  data['cpu_count'] = vm['TEMPLATE/VCPU']
  data['network_inbound'] = vm['MONITORING/NETTX']
  data['network_outbound'] = vm['MONITORING/NETRX']
  data['memory'] = vm['TEMPLATE/MEMORY']
  data['image_name'] = vm['TEMPLATE/DISK[1]/VMCATCHER_EVENT_AD_MPURI']
  data['image_name'] ||= image_map[vm['TEMPLATE/DISK[1]/IMAGE_ID']]
  data['image_name'] ||= mixin(vm)
  data['image_name'] ||= vm['TEMPLATE/DISK[1]/IMAGE_ID']
  data['history'] = history_records(vm)
  data['disks'] = disk_records(vm)
  data['number_of_public_ips'] = number_of_public_ips(vm)

  benchmark = benchmark_map[vm['HISTORY_RECORDS/HISTORY[last()]/HID']]
  data['benchmark_type'] = benchmark[:benchmark_type]
  data['benchmark_value'] = benchmark[:benchmark_value]

  site_name = cluster_map[vm['HISTORY_RECORDS/HISTORY[1]/CID']]
  data['site_name'] = site_name if site_name

  data
end
Write processed data into output directory
@param [Hash] data data to be written into file
@param [Fixnum] file_number sequence number of file data will be written to
# File lib/one_worker.rb, line 265
def write_data(data, file_number)
  logger.debug('Creating writer...')
  ow = OneWriter.new(data, file_number, logger)
  ow.write
rescue => e
  msg = "Cannot write result: #{e.message}"
  logger.error(msg)
  raise msg
end
Private Instance Methods
Check if IP is public
@param [String] ip address
@return [Bool] true if the address is public, false otherwise
# File lib/one_worker.rb, line 282
def ip_public?(ip)
  ip_obj = IPAddr.new(ip.text)
  IGNORED_NETWORKS.each do |net|
    return false if net.include? ip_obj
  end

  true
end
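A rough illustration of the intended behaviour, assuming IGNORED_NETWORKS covers the private ranges. The argument is an XML element, so a small stub responding to #text stands in for it here; from inside the worker a call would behave like:

require 'ostruct'

ip_public?(OpenStruct.new(text: '192.168.1.10'))  # => false (private range)
ip_public?(OpenStruct.new(text: '147.251.1.10'))  # => true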