class OmfEc::Experiment

Experiment class to hold relevant state information

Attributes

app_definitions[RW]
assertion[RW]
cmdline_properties[RW]
groups[R]
job_mps[RW]
job_url[RW]
js_url[RW]
name[RW]
nodes[RW]
oml_uri[RW]
property[RW]
show_graph[RW]
sliceID[RW]
ss_url[RW]
sub_groups[R]

Public Class Methods

ID() click to toggle source

Unique experiment id (Class method)

# File lib/omf_ec/experiment.rb, line 181
# Class-level shortcut for the experiment id: forwards to #id on the object
# returned by `instance`.
def self.ID
  current = instance
  current.id
end
disconnect() click to toggle source
# File lib/omf_ec/experiment.rb, line 312
# Announce that the EC is detaching from the experiment, then, five seconds
# later, disconnect the communicator and stop the event loop.
def disconnect
  experiment_id = OmfEc.experiment.id
  info "Disconnecting in 5 sec from experiment: #{experiment_id}"
  info "Run the EC again to reattach"

  delay = 5
  OmfCommon.el.after(delay) do
    OmfCommon.comm.disconnect
    OmfCommon.eventloop.stop
  end
end
done() click to toggle source

Disconnect communicator, try to delete any XMPP affiliations

# File lib/omf_ec/experiment.rb, line 272
# Wind the experiment down.
#
# Immediately: log the end of the experiment, clear all defined events and
# record the "finished" state as metadata. Ten seconds later: release the
# applications and network interfaces of every group and ask the group
# members to leave. The communicator is disconnected and the event loop
# stopped by the nested 4 s + 1 s timers.
def done
  info "Experiment: #{OmfEc.experiment.id} finished"
  info "Exit in 15 seconds..."

  # No defined event may outlive the experiment
  OmfEc.experiment.clear_events

  OmfCommon.el.after(10) do
    allGroups do |grp|
      # Clean up whatever this group actually created
      if !grp.app_contexts.empty?
        info "Release applications in #{grp.name}"
        grp.resources[type: 'application'].release
      end
      if grp.net_ifs.any? { |nif| nif.conf[:type] == 'net' }
        info "Release wired network interfaces in #{grp.name}"
        grp.resources[type: 'net'].release
      end
      if grp.net_ifs.any? { |nif| nif.conf[:type] == 'wlan' }
        info "Release wireless network interfaces in #{grp.name}"
        grp.resources[type: 'wlan'].release
      end
      # Give the release messages a one second head start
      OmfCommon.el.after(1) do
        info "Configure resources to leave #{grp.name}"
        grp.resources.membership = { leave: grp.address }
      end
    end

    OmfCommon.el.after(4) do
      info "OMF Experiment Controller #{OmfEc::VERSION} - Exit."
      OmfCommon.el.after(1) do
        OmfCommon.comm.disconnect
        OmfCommon.eventloop.stop
      end
    end
  end

  OmfEc.experiment.log_metadata("state", "finished")
end
leave_memberships() click to toggle source

Ask the resources which joined the groups I created to leave

# File lib/omf_ec/experiment.rb, line 355
# Send every group's resources a membership update telling them to leave
# that group's address.
def leave_memberships
  all_groups { |grp| grp.resources.membership = { leave: grp.address } }
end
new() click to toggle source
Calls superclass method
# File lib/omf_ec/experiment.rb, line 32
# Set up a fresh experiment: a timestamp-based id, empty collections for
# groups, nodes, events, sub groups and definitions, and unset
# job-service / slice settings.
def initialize
  super

  # Identity: millisecond-precision UTC timestamp
  @id = Time.now.utc.iso8601(3)
  @sliceID = nil

  # Containers; `||=` keeps any value that is already present
  @state ||= Hashie::Mash.new # TODO: we need to keep history of all the events and not overwrite them
  @groups ||= []
  @nodes ||= []
  @events ||= []
  @sub_groups ||= []
  @app_definitions ||= {}
  @cmdline_properties ||= {}

  # Always reset on construction
  @show_graph = false
  @js_url = nil
  @job_url = nil
  @ss_url = nil
  @job_mps = {}
end
sliceID() click to toggle source

Unique slice id (Class method)

# File lib/omf_ec/experiment.rb, line 186
# Class-level shortcut for the slice id: forwards to #sliceID on the object
# returned by `instance`.
def self.sliceID
  current = instance
  current.sliceID
end
start() click to toggle source
# File lib/omf_ec/experiment.rb, line 321
# Start the experiment.
#
# Logs the experiment id (and the slice id when one is set), records the
# "running" state as metadata, asks every member of every group to join its
# group, and finally schedules the recurring check of user-defined events.
def start
  info "Experiment: #{OmfEc.experiment.id} starts"
  info "Slice: #{OmfEc.experiment.sliceID}" unless OmfEc.experiment.sliceID.nil?
  OmfEc.experiment.log_metadata("state", "running")

  allGroups do |g|
    info "CONFIGURE #{g.members.size} resources to join group #{g.name}"
    debug "CONFIGURE #{g.members.keys} to join group #{g.name}"
    g.members.each do |key, _value|
      OmfEc.subscribe_and_monitor(key) do |res|
        # Remember the address the resource answered with, then ask it to join
        g.synchronize do
          g.members[key] = res.address
        end
        res.configure({ membership: g.address, res_index: OmfEc.experiment.nodes.index(key) }, { assert: OmfEc.experiment.assertion })
      end
    end
  end

  # For every 100 resources, increase the check interval by 1 second
  # (never less than 1 second)
  count = allGroups.inject(0) { |c, g| c + g.members.size }
  interval = [count / 100, 1].max
  info "TOTAL resources: #{count}. Events check interval: #{interval}."

  OmfCommon.el.every(interval) do
    EM.next_tick do
      begin
        OmfEc.experiment.process_events
      rescue => e
        # Event processing stays best-effort (a failing trigger or callback must
        # not stop the periodic check), but the failure is reported instead of
        # being silently discarded.
        warn "Event processing failed: #{e.class}: #{e.message}"
      end
    end
  end
end

Public Instance Methods

add_event(name, opts, trigger) click to toggle source
# File lib/omf_ec/experiment.rb, line 157
# Define (or redefine) an event.
#
# name    - the event name
# opts    - extra attributes merged into the event hash; a truthy +:every+
#           makes the event periodic
# trigger - callable stored under +:trigger+
#
# When an event with the same name already exists it is replaced. The periodic
# timer of the replaced definition is cancelled first, so the old trigger
# stops being evaluated.
def add_event(name, opts, trigger)
  self.synchronize do
    warn "Event '#{name}' has already been defined. Overwriting it now." if event(name)

    # Cancel timers of the definitions about to be dropped; removing the hash
    # from @events alone would leave its timer running.
    @events.each do |e|
      e[:periodic_timer].cancel if e[:name] == name && e[:periodic_timer]
    end
    @events.delete_if { |e| e[:name] == name }

    new_event = { name: name, trigger: trigger, aliases: [] }.merge(opts)
    @events << new_event
    # Attach the timer to the event just created rather than looking it up by
    # name again (a lookup could also match another event's alias).
    add_periodic_event(new_event) if opts[:every]
  end
end
add_group(group) click to toggle source
# File lib/omf_ec/experiment.rb, line 134
# Register a group with the experiment unless a group of the same name is
# already registered.
#
# Raises ArgumentError when +group+ is not an OmfEc::Group.
def add_group(group)
  self.synchronize do
    unless group.is_a?(OmfEc::Group)
      raise ArgumentError, "Expect Group object, got #{group.inspect}"
    end

    # `group(...)` with arguments calls the lookup method, not the parameter
    already_registered = group(group.name)
    @groups.push(group) unless already_registered
  end
end
add_or_update_resource_state(name, opts = {}) click to toggle source
# File lib/omf_ec/experiment.rb, line 74
# Record what is known about the resource +name+.
#
# name - key under which the resource is stored in @state
# opts - attributes to store or merge
#
# For a resource that is already known, each attribute is merged into the
# existing entry: arrays are concatenated without duplicates, hashes are
# merged, anything else overwrites the stored value. For a new resource a
# copy of +opts+ plus an +:address+ entry is stored; the caller's hash is
# left untouched.
def add_or_update_resource_state(name, opts = {})
  self.synchronize do
    res = resource_state(name)
    if res
      opts.each do |key, value|
        if value.is_a?(Array)
          # Merge array values (union keeps order and drops duplicates)
          res[key] = (res[key] || []) | value
        elsif value.is_a?(Hash)
          # Merge hash values
          res[key] ||= {}
          res[key].merge!(value)
        else
          # Overwrite otherwise
          res[key] = value
        end
      end
    else
      debug "Newly discovered resource >> #{name}"
      # Store a copy so the caller's opts hash is not modified
      @state[name] = opts.merge(address: name)

      # Re send membership configure
      #planned_groups = groups_by_res(name)

      #unless planned_groups.empty?
      #  OmfEc.subscribe_and_monitor(name) do |res|
      #    info "Config #{name} to join #{planned_groups.map(&:name).join(', ')}"
      #    res.configure({ membership: planned_groups.map(&:address) }, { assert: OmfEc.experiment.assertion } )
      #  end
      #end
    end
  end
end
Also aliased as: add_resource
add_periodic_event(event) click to toggle source
# File lib/omf_ec/experiment.rb, line 199
# Schedule a recurring evaluation of +event+'s trigger every +event[:every]+
# and remember the timer on the event under +:periodic_timer+.
def add_periodic_event(event)
  period = event[:every]
  timer = OmfCommon.el.every(period) do
    self.synchronize { eval_trigger(event) }
  end
  event[:periodic_timer] = timer
end
add_property(name, value = nil, description = nil) click to toggle source
# File lib/omf_ec/experiment.rb, line 58
# Create an experiment property. A non-nil value stored for the same name in
# @cmdline_properties replaces +value+.
def add_property(name, value = nil, description = nil)
  key = name.to_s.to_sym
  from_cmdline = @cmdline_properties[key]
  effective = from_cmdline.nil? ? value : from_cmdline
  ExperimentProperty.create(name, effective, description)
end
add_resource(name, opts = {})
add_sub_group(name) click to toggle source
# File lib/omf_ec/experiment.rb, line 124
# Remember a sub group name, ignoring names that are already recorded.
def add_sub_group(name)
  self.synchronize do
    next if @sub_groups.include?(name)
    @sub_groups.push(name)
  end
end
all_groups?(&block) click to toggle source
# File lib/omf_ec/experiment.rb, line 149
# True when there is at least one group and every group satisfies the block.
# Without a block each group itself is used as the truth value.
def all_groups?(&block)
  return false if groups.empty?

  groups.all? do |grp|
    block ? block.call(grp) : grp
  end
end
archive_oedl(script_name) click to toggle source

Archive OEDL content to OML db

# File lib/omf_ec/experiment.rb, line 237
# Store the content of the OEDL script as metadata: the file is read,
# deflated, Base64-encoded and logged under the "oedl_content" domain with
# the script name as key.
def archive_oedl(script_name)
  source = File.read(script_name)
  compressed = Zlib::Deflate.deflate(source)
  encoded = Base64.encode64(compressed)
  log_metadata(script_name, encoded, "oedl_content")
end
clear_events() click to toggle source
# File lib/omf_ec/experiment.rb, line 166
# Forget every defined event, cancelling the periodic timer of each event
# that has one.
def clear_events
  self.synchronize do
    @events.each do |ev|
      timer = ev[:periodic_timer]
      timer.cancel if timer
    end
    @events = []
  end
end
create_job() click to toggle source

If the EC is launched with a --job-service setup, it needs to create a job entry for this experiment trial. Do nothing if:

  • a JobService URL has not been provided, i.e. the EC runs without needing to contact the JS

  • we already have a Job URL, i.e. the job entry has already been created

# File lib/omf_ec/experiment.rb, line 250
# Create a job entry for this experiment trial on the job service and store
# the returned URL in @job_url.
#
# Does nothing when a job URL is already known or when no job service URL
# (@js_url) has been configured.
#
# Raises RuntimeError when the service does not answer with a success status
# or when its answer carries no 'href'.
def create_job
  return unless @job_url.nil?
  return if @js_url.nil?
  # Loaded lazily: only needed when a job service is actually used
  require 'json'
  require 'net/http'

  job = { name: self.id }
  u = URI.parse(@js_url+'/jobs')
  req = Net::HTTP::Post.new(u.path, {'Content-Type' =>'application/json'})
  req.body = JSON.pretty_generate(job)

  http = Net::HTTP.new(u.host, u.port)
  # Net::HTTP speaks plain HTTP unless told otherwise, whatever the port is
  http.use_ssl = (u.scheme == 'https')
  res = http.start { |conn| conn.request(req) }

  raise "Could not create a job for this experiment trial\n"+
        "Response #{res.code} #{res.message}:\n#{res.body}" unless res.kind_of? Net::HTTPSuccess
  job = JSON.parse(res.body)
  raise "No valid URL received for the created job for this experiment trial" if job['href'].nil?
  @job_url = job['href']
end
each_group(&block) click to toggle source
# File lib/omf_ec/experiment.rb, line 141
# Call the block once per group; without a block, just return the groups.
def each_group(&block)
  return groups unless block

  groups.each { |grp| block.call(grp) }
end
eval_trigger(event) click to toggle source
# File lib/omf_ec/experiment.rb, line 207
# Evaluate one event: when it has callbacks and its trigger returns true for
# the current state, log it and run the callbacks, most recently added first.
# Events flagged +:consume_event+ are removed (and their periodic timer
# cancelled) before the callbacks run.
def eval_trigger(event)
  has_callbacks = event[:callbacks] && !event[:callbacks].empty?
  return unless has_callbacks && event[:trigger].call(state)

  if event[:consume_event]
    # One-shot event: stop its periodic check and forget it
    event[:periodic_timer].cancel if event[:periodic_timer]
    @events.delete(event)
  end

  event_names = [event[:name], *event[:aliases]].join(', ')
  info "Event triggered: '#{event_names}'"

  # Last in first serve callbacks
  event[:callbacks].reverse.each { |callback| callback.call }
end
event(name) click to toggle source
# File lib/omf_ec/experiment.rb, line 153
# Find the first event whose name or one of whose aliases equals +name+;
# nil when there is none.
def event(name)
  @events.detect do |candidate|
    candidate[:name] == name || candidate[:aliases].include?(name)
  end
end
group(name) click to toggle source
# File lib/omf_ec/experiment.rb, line 130
# Find the first group called +name+; nil when there is none.
def group(name)
  groups.detect { |candidate| candidate.name == name }
end
groups_by_res(res_addr) click to toggle source

Find all groups a given resource belongs to

# File lib/omf_ec/experiment.rb, line 116
# All groups whose members map contains +res_addr+ as a value.
def groups_by_res(res_addr)
  groups.select { |grp| grp.members.value?(res_addr) }
end
id() click to toggle source

Unique experiment id

# File lib/omf_ec/experiment.rb, line 176
# The experiment id: the name when one is set, otherwise the id generated at
# construction time.
def id
  return @name if @name

  @id
end
log_metadata(key, value, domain = 'sys') click to toggle source
# File lib/omf_ec/experiment.rb, line 231
# Hand a key/value pair to MetaData.inject under +domain+ (default 'sys').
# All three arguments are converted to strings first.
def log_metadata(key, value, domain = 'sys')
  domain_s, key_s, value_s = [domain, key, value].map(&:to_s)
  MetaData.inject(domain_s, key_s, value_s)
end
mp_table_names() click to toggle source
# File lib/omf_ec/experiment.rb, line 223
# Merge the +mp_table_names+ hashes of every application context of every
# group into one hash; later entries win on duplicate keys.
def mp_table_names
  contexts = groups.map(&:app_contexts).flatten
  contexts.map(&:mp_table_names).each_with_object({}) do |tables, merged|
    merged.merge!(tables)
  end
end
process_events() click to toggle source

Parses user-defined events, checks their conditions against the internal state, and executes callbacks when triggered.

# File lib/omf_ec/experiment.rb, line 191
# Evaluate the trigger of every event that has no +:every+ period.
def process_events
  self.synchronize do
    without_period = @events.select { |ev| ev[:every].nil? }
    without_period.each { |ev| eval_trigger(ev) }
  end
end
resource(address)
Alias for: resource_state
resource_by_hrn(hrn) click to toggle source
# File lib/omf_ec/experiment.rb, line 70
# Return the entry stored in @state under +hrn+ (nil when @state has none,
# for a plain Hash).
def resource_by_hrn(hrn)
  entry = @state[hrn]
  entry
end
resource_state(address) click to toggle source
# File lib/omf_ec/experiment.rb, line 64
# Return the entry stored in @state under +address+ (nil when @state has
# none, for a plain Hash).
def resource_state(address)
  entry = @state[address]
  entry
end
Also aliased as: resource
state() click to toggle source
# File lib/omf_ec/experiment.rb, line 50
# The stored resource entries as an array (the values of @state, without
# their keys).
def state
  @state.each_value.to_a
end
sub_group(name) click to toggle source
# File lib/omf_ec/experiment.rb, line 120
# Return the recorded sub group equal to +name+, or nil when it is unknown.
def sub_group(name)
  @sub_groups.detect { |candidate| candidate == name }
end