class OneWorker

Sidekiq worker class

Constants

IGNORED_NETWORKS

Public Instance Methods

common_data() click to toggle source

Prepare data that are common for all the output types

# File lib/one_worker.rb, line 30
def common_data
  data = {}
  data['oneacct_export_version'] = ::OneacctExporter::VERSION

  data
end
create_cluster_map(oda) click to toggle source
# File lib/one_worker.rb, line 78
def create_cluster_map(oda)
  logger.debug('Creating cluster map.')
  create_map(OpenNebula::ClusterPool, 'TEMPLATE/APEL_SITE_NAME', oda)
end
create_image_map(oda) click to toggle source

Create mapping of image ID and specified element

@return [Hash] created map

# File lib/one_worker.rb, line 73
def create_image_map(oda)
  logger.debug('Creating image map.')
  create_map(OpenNebula::ImagePool, 'TEMPLATE/VMCATCHER_EVENT_AD_MPURI', oda)
end
create_map(pool_type, mapping, oda) click to toggle source

Generic method for mapping creation

# File lib/one_worker.rb, line 84
def create_map(pool_type, mapping, oda)
  oda.mapping(pool_type, mapping)
rescue => e
  msg = "Couldn't create map: #{e.message}. "\
    'Stopping to avoid malformed records.'
  logger.error(msg)
  raise msg
end
create_user_map(oda) click to toggle source

Create mapping of user ID and specified element

@return [Hash] created map

# File lib/one_worker.rb, line 65
def create_user_map(oda)
  logger.debug('Creating user map.')
  create_map(OpenNebula::UserPool, 'TEMPLATE/X509_DN', oda)
end
disk_records(vm) click to toggle source

Returns an array of disk records from vm

@param [OpenNebula::VirtualMachine] vm virtual machine

@return [Array] array of hashes representing vm's disk records

# File lib/one_worker.rb, line 171
def disk_records(vm)
  disks = []
  vm.each 'TEMPLATE/DISK' do |d|
    disk = {}
    disk['size'] = d['SIZE']

    disks << disk
  end

  disks
end
history_records(vm) click to toggle source

Returns an array of history records from vm

@param [OpenNebula::VirtualMachine] vm virtual machine

@return [Array] array of hashes representing vm's history records

# File lib/one_worker.rb, line 149
def history_records(vm)
  history = []
  vm.each 'HISTORY_RECORDS/HISTORY' do |h|
    history_record = {}
    history_record['start_time'] = h['STIME']
    history_record['end_time'] = h['ETIME']
    history_record['rstart_time'] = h['RSTIME']
    history_record['rend_time'] = h['RETIME']
    history_record['seq'] = h['SEQ']
    history_record['hostname'] = h['HOSTNAME']

    history << history_record
  end

  history
end
load_vm(vm_id, oda) click to toggle source

Load virtual machine with specified ID

@return [OpenNebula::VirtualMachine] virtual machine

# File lib/one_worker.rb, line 96
def load_vm(vm_id, oda)
  oda.vm(vm_id)
rescue => e
  logger.error("Couldn't retrieve data for vm with id: #{vm_id}. #{e.message}. Skipping.")
  return nil
end
mixin(vm) click to toggle source

Look for 'os_tpl' OCCI mixin to better identify virtual machine's image

@param [OpenNebula::VirtualMachine] vm virtual machine

@return [NilClass, String] if found, mixin identifying string, nil otherwise

# File lib/one_worker.rb, line 204
def mixin(vm)
  mixin_locations = %w(USER_TEMPLATE/OCCI_COMPUTE_MIXINS USER_TEMPLATE/OCCI_MIXIN TEMPLATE/OCCI_MIXIN)

  mixin_locations.each do |mixin_location|
    vm.each mixin_location do |mixin|
      mixin = mixin.text.split
      mixin.select! { |line| line.include? '/occi/infrastructure/os_tpl#' }
      return mixin.first unless mixin.empty?
    end
  end

  nil # nothing found
end
number_of_public_ips(vm) click to toggle source

Returns number of unique public ip addresses of vm

@param [OpenNebula::VirtualMachine] vm virtual machine

@return [Integer] number of unique public ip addresses represented by integer

# File lib/one_worker.rb, line 188
def number_of_public_ips(vm)
  all_ips = []
  vm.each 'TEMPLATE/NIC' do |nic|
    nic.each 'IP' do |ip|
      all_ips << ip.text if ip_public?(ip)
    end
  end

  all_ips.uniq.length
end
output_type_specific_data() click to toggle source

Prepare data that are specific for output type and common for every virtual machine

# File lib/one_worker.rb, line 38
def output_type_specific_data
  data = {}
  if PBS_OT.include?(Settings.output['output_type']) && Settings.output['pbs']
    data['realm'] = Settings.output.pbs['realm']
    data['pbs_queue'] = Settings.output.pbs['queue']
    data['scratch_type'] = Settings.output.pbs['scratch_type']
    data['host'] = Settings.output.pbs['host_identifier']
  end

  if APEL_OT.include?(Settings.output['output_type'])
    data['endpoint'] = Settings.output.apel['endpoint'].chomp('/')
    data['site_name'] = Settings.output.apel['site_name']
    data['cloud_type'] = Settings.output.apel['cloud_type']
    data['cloud_compute_service'] = Settings.output.apel['cloud_compute_service']
  end

  if LOGSTASH_OT.include?(Settings.output['output_type'])
    data['host'] = Settings.output.logstash['host']
    data['port'] = Settings.output.logstash['port']
  end

  data
end
perform(vms, file_number) click to toggle source

Sidekiq specific method, specifies the purpose of the worker

@param [String] vms IDs of virtual machines to process in form of numbers separated by '|' (easier for cooperation with redis) @param [String] file_number number of the output file

# File lib/one_worker.rb, line 223
def perform(vms, file_number)
  OneacctExporter::Log.setup_log_level(logger)

  vms = vms.split('|')

  oda = OneDataAccessor.new(false, logger)
  user_map = create_user_map(oda)
  image_map = create_image_map(oda)
  cluster_map = create_cluster_map(oda)
  benchmark_map = oda.benchmark_map

  data = []

  vms.each do |vm_id|
    vm = load_vm(vm_id, oda)
    next unless vm

    begin
      logger.debug("Processing vm with id: #{vm_id}.")
      vm_data = process_vm(vm, user_map, image_map, cluster_map, benchmark_map)

      validator = DataValidators::ApelDataValidator.new(logger) if APEL_OT.include?(Settings.output['output_type'])
      validator = DataValidators::PbsDataValidator.new(logger) if PBS_OT.include?(Settings.output['output_type'])
      validator = DataValidators::LogstashDataValidator.new(logger) if LOGSTASH_OT.include?(Settings.output['output_type'])

      vm_data = validator.validate_data(vm_data) if validator
    rescue Errors::ValidationError => e
      logger.error("Error occured during processing of vm with id: #{vm_id}. #{e.message}")
      next
    end

    logger.debug("Adding vm with data: #{vm_data} for export.")
    data << vm_data
  end

  write_data(data, file_number) unless data.empty?
end
process_vm(vm, user_map, image_map, cluster_map, benchmark_map) click to toggle source

Obtain and parse required data from vm

@return [Hash] required data from virtual machine

# File lib/one_worker.rb, line 106
def process_vm(vm, user_map, image_map, cluster_map, benchmark_map)
  data = common_data
  data.merge! output_type_specific_data

  data['vm_uuid'] = vm['ID']
  data['start_time'] = vm['STIME']
  data['end_time'] = vm['ETIME']
  data['machine_name'] = vm['DEPLOY_ID']
  data['user_id'] = vm['UID']
  data['group_id'] = vm['GID']
  data['user_dn'] = vm['USER_TEMPLATE/USER_X509_DN']
  data['user_dn'] ||= user_map[data['user_id']]
  data['user_name'] = vm['UNAME']
  data['group_name'] = vm['GNAME']
  data['status_code'] = vm['STATE']
  data['status'] = vm.state_str
  data['cpu_count'] = vm['TEMPLATE/VCPU']
  data['network_inbound'] = vm['MONITORING/NETTX']
  data['network_outbound'] = vm['MONITORING/NETRX']
  data['memory'] = vm['TEMPLATE/MEMORY']
  data['image_name'] = vm['TEMPLATE/DISK[1]/VMCATCHER_EVENT_AD_MPURI']
  data['image_name'] ||= image_map[vm['TEMPLATE/DISK[1]/IMAGE_ID']]
  data['image_name'] ||= mixin(vm)
  data['image_name'] ||= vm['TEMPLATE/DISK[1]/IMAGE_ID']
  data['history'] = history_records(vm)
  data['disks'] = disk_records(vm)
  data['number_of_public_ips'] = number_of_public_ips(vm)

  benchmark = benchmark_map[vm['HISTORY_RECORDS/HISTORY[last()]/HID']]
  data['benchmark_type'] = benchmark[:benchmark_type]
  data['benchmark_value'] = benchmark[:benchmark_value]

  site_name = cluster_map[vm['HISTORY_RECORDS/HISTORY[1]/CID']]
  data['site_name'] = site_name if site_name

  data
end
write_data(data, file_number) click to toggle source

Write processed data into output directory

@param [Hash] data data to be written into file @param [Fixnum] file_number sequence number of file data will be written to

# File lib/one_worker.rb, line 265
def write_data(data, file_number)
  logger.debug('Creating writer...')
  ow = OneWriter.new(data, file_number, logger)
  ow.write
rescue => e
  msg = "Cannot write result: #{e.message}"
  logger.error(msg)
  raise msg
end

Private Instance Methods

ip_public?(ip) click to toggle source

Check if IP is public

@param [String] ip address

@return [Bool] true or false

# File lib/one_worker.rb, line 282
def ip_public?(ip)
  ip_obj = IPAddr.new(ip.text)
  IGNORED_NETWORKS.each do |net|
    return false if net.include? ip_obj
  end
  true
end