class OmfEc::Experiment
Experiment
Class holding the state of an experiment: its groups, sub-groups, events, discovered resource states and properties.
Attributes
app_definitions[RW]
assertion[RW]
cmdline_properties[RW]
groups[R]
job_mps[RW]
job_url[RW]
js_url[RW]
name[RW]
nodes[RW]
oml_uri[RW]
property[RW]
show_graph[RW]
sliceID[RW]
ss_url[RW]
sub_groups[R]
Public Class Methods
ID()
click to toggle source
Unique experiment id (Class method)
# File lib/omf_ec/experiment.rb, line 181
# Class-level shortcut for the experiment id: forwards to +id+ on the object
# returned by +instance+.
def self.ID
  instance.id
end
disconnect()
click to toggle source
# File lib/omf_ec/experiment.rb, line 312
# Detach the EC from the experiment. Logs what is about to happen, then
# schedules (via OmfCommon.el.after(5)) the communicator disconnect followed
# by the event loop stop. Nothing is released here; the log message tells the
# user to run the EC again to reattach.
def disconnect
  experiment_id = OmfEc.experiment.id
  info "Disconnecting in 5 sec from experiment: #{experiment_id}"
  info "Run the EC again to reattach"

  OmfCommon.el.after(5) do
    OmfCommon.comm.disconnect
    OmfCommon.eventloop.stop
  end
end
done()
click to toggle source
Disconnect communicator, try to delete any XMPP affiliations
# File lib/omf_ec/experiment.rb, line 272
# Finish the experiment and shut the EC down.
#
# Sequence, all scheduled through OmfCommon.el.after:
# 1. immediately: clear all events and log the "finished" state metadata;
# 2. after(10): for each group, release applications and wired/wireless
#    interfaces that the group actually has, then (after(1) more) ask the
#    group's resources to leave the group;
# 3. after(4) from step 2, plus a nested after(1): disconnect the
#    communicator and stop the event loop.
def done
  info "Experiment: #{OmfEc.experiment.id} finished"
  info "Exit in 15 seconds..."

  # Make sure that all defined events are removed
  OmfEc.experiment.clear_events

  OmfCommon.el.after(10) do
    allGroups do |grp|
      # Clean up
      unless grp.app_contexts.empty?
        info "Release applications in #{grp.name}"
        grp.resources[type: 'application'].release
      end

      if grp.net_ifs.any? { |nif| nif.conf[:type] == 'net' }
        info "Release wired network interfaces in #{grp.name}"
        grp.resources[type: 'net'].release
      end

      if grp.net_ifs.any? { |nif| nif.conf[:type] == 'wlan' }
        info "Release wireless network interfaces in #{grp.name}"
        grp.resources[type: 'wlan'].release
      end

      # Let release messages go through first
      OmfCommon.el.after(1) do
        info "Configure resources to leave #{grp.name}"
        grp.resources.membership = { leave: grp.address }
      end
    end

    OmfCommon.el.after(4) do
      info "OMF Experiment Controller #{OmfEc::VERSION} - Exit."
      OmfCommon.el.after(1) do
        OmfCommon.comm.disconnect
        OmfCommon.eventloop.stop
      end
    end
  end

  OmfEc.experiment.log_metadata("state", "finished")
end
leave_memberships()
click to toggle source
Ask the resources which joined the groups I created to leave
# File lib/omf_ec/experiment.rb, line 355
# For every group yielded by +all_groups+, configure the group's resources
# with a membership "leave" request for that group's address.
def leave_memberships
  all_groups { |grp| grp.resources.membership = { leave: grp.address } }
end
new()
click to toggle source
Calls superclass method
# File lib/omf_ec/experiment.rb, line 32
# Build the experiment's initial state.
#
# The id defaults to the current UTC time in ISO 8601 format with millisecond
# precision. Collections are assigned with ||= so a value that already exists
# when this runs is kept; scalar settings are always reset.
def initialize
  super

  @id = Time.now.utc.iso8601(3)
  @sliceID = nil

  # TODO: we need to keep history of all the events and not overwrite them
  @state ||= Hashie::Mash.new

  @groups ||= []
  @nodes ||= []
  @events ||= []
  @sub_groups ||= []
  @app_definitions ||= {}
  @cmdline_properties ||= {}

  @show_graph = false
  @js_url = nil
  @job_url = nil
  @job_mps = {}
  @ss_url = nil
end
sliceID()
click to toggle source
Unique slice id (Class method)
# File lib/omf_ec/experiment.rb, line 186
# Class-level shortcut for the slice id: forwards to +sliceID+ on the object
# returned by +instance+.
def self.sliceID
  instance.sliceID
end
start()
click to toggle source
# File lib/omf_ec/experiment.rb, line 321
# Start the experiment.
#
# Logs the experiment (and slice, when set), records the "running" state
# metadata, then for every member of every group subscribes to the resource
# and, once it is available, stores its address in the group and configures
# it to join the group. Finally installs a recurring timer that processes
# the user-defined events.
#
# A failure while processing events is logged instead of being silently
# discarded, so a broken trigger or callback is visible to the user; the
# periodic check itself keeps running.
def start
  info "Experiment: #{OmfEc.experiment.id} starts"
  info "Slice: #{OmfEc.experiment.sliceID}" unless OmfEc.experiment.sliceID.nil?
  OmfEc.experiment.log_metadata("state", "running")

  allGroups do |g|
    info "CONFIGURE #{g.members.size} resources to join group #{g.name}"
    debug "CONFIGURE #{g.members.keys} to join group #{g.name}"
    g.members.each do |key, _value|
      OmfEc.subscribe_and_monitor(key) do |res|
        #info "Configure '#{key}' to join '#{g.name}'"
        # Replace the placeholder member entry with the real resource address
        g.synchronize do
          g.members[key] = res.address
        end
        res.configure({ membership: g.address, res_index: OmfEc.experiment.nodes.index(key) }, { assert: OmfEc.experiment.assertion })
      end
    end
  end

  # For every 100 nodes, increase check interval by 1 second
  count = allGroups.inject(0) { |total, g| total + g.members.size }
  interval = [count / 100, 1].max
  info "TOTAL resources: #{count}. Events check interval: #{interval}."

  OmfCommon.el.every(interval) do
    EM.next_tick do
      begin
        OmfEc.experiment.process_events
      rescue StandardError => e
        # Keep the periodic check alive, but do not hide the failure
        warn "Event processing failed: #{e.class}: #{e.message}"
      end
    end
  end
end
Public Instance Methods
add_event(name, opts, trigger)
click to toggle source
# File lib/omf_ec/experiment.rb, line 157
# Register an event, replacing any existing event with the same name.
#
# name    - event name
# opts    - extra event attributes merged into the event hash; when
#           opts[:every] is set the event is also given a periodic timer
# trigger - callable stored under :trigger
#
# When an event is replaced, the periodic timer of the old definition is
# cancelled first; otherwise that timer would keep evaluating the discarded
# event (clear_events cancels timers the same way).
def add_event(name, opts, trigger)
  synchronize do
    warn "Event '#{name}' has already been defined. Overwriting it now." if event(name)

    @events.delete_if do |e|
      next false unless e[:name] == name

      e[:periodic_timer].cancel if e[:periodic_timer]
      true
    end

    @events << { name: name, trigger: trigger, aliases: [] }.merge(opts)
    add_periodic_event(event(name)) if opts[:every]
  end
end
add_group(group)
click to toggle source
# File lib/omf_ec/experiment.rb, line 134
# Register a group with the experiment unless a group with the same name is
# already registered.
#
# Raises ArgumentError when +group+ is not an OmfEc::Group.
def add_group(group)
  synchronize do
    unless group.kind_of?(OmfEc::Group)
      raise ArgumentError, "Expect Group object, got #{group.inspect}"
    end

    # group(...) with parentheses is the lookup method, not the parameter
    existing = group(group.name)
    @groups << group unless existing
  end
end
add_or_update_resource_state(name, opts = {})
click to toggle source
# File lib/omf_ec/experiment.rb, line 74
# Record what is known about a resource.
#
# name - key of the resource in the state table
# opts - attributes to store
#
# For a resource that is already known, each attribute is merged into the
# existing entry: Array values are appended and de-duplicated, Hash values
# are merged, anything else overwrites. For a new resource a fresh entry is
# created from +opts+ plus an :address attribute set to +name+.
#
# The caller's +opts+ hash is never modified; the new entry is built from a
# merged copy, so frozen or shared hashes can be passed safely.
def add_or_update_resource_state(name, opts = {})
  synchronize do
    res = resource_state(name)
    if res
      opts.each do |key, value|
        if value.instance_of?(Array)
          # Merge array values (exact Array class only, as before)
          res[key] ||= []
          res[key] += value
          res[key].uniq!
        elsif value.kind_of?(Hash)
          # Merge hash values
          res[key] ||= {}
          res[key].merge!(value)
        else
          # Overwrite otherwise
          res[key] = value
        end
      end
    else
      debug "Newly discovered resource >> #{name}"
      #res = Hashie::Mash.new({ address: name }).merge(opts)
      @state[name] = opts.merge(address: name)

      # Re send membership configure
      #planned_groups = groups_by_res(name)

      #unless planned_groups.empty?
      #  OmfEc.subscribe_and_monitor(name) do |res|
      #    info "Config #{name} to join #{planned_groups.map(&:name).join(', ')}"
      #    res.configure({ membership: planned_groups.map(&:address) }, { assert: OmfEc.experiment.assertion } )
      #  end
      #end
    end
  end
end
Also aliased as: add_resource
add_periodic_event(event)
click to toggle source
# File lib/omf_ec/experiment.rb, line 199
# Install a recurring timer for +event+, using event[:every] as the period
# passed to OmfCommon.el.every. On each tick the event's trigger is evaluated
# under the experiment lock. The timer is stored in event[:periodic_timer]
# and returned.
def add_periodic_event(event)
  timer = OmfCommon.el.every(event[:every]) do
    synchronize { eval_trigger(event) }
  end
  event[:periodic_timer] = timer
end
add_property(name, value = nil, description = nil)
click to toggle source
# File lib/omf_ec/experiment.rb, line 58
# Create an experiment property.
#
# A value stored in @cmdline_properties under the symbolised name takes
# precedence over +value+ whenever it is not nil (so an explicit +false+
# still overrides).
def add_property(name, value = nil, description = nil)
  cmdline_value = @cmdline_properties[name.to_s.to_sym]
  effective_value = cmdline_value.nil? ? value : cmdline_value
  ExperimentProperty.create(name, effective_value, description)
end
add_sub_group(name)
click to toggle source
# File lib/omf_ec/experiment.rb, line 124
# Remember a sub-group name, ignoring names that are already recorded.
def add_sub_group(name)
  synchronize do
    next if @sub_groups.include?(name)

    @sub_groups << name
  end
end
all_groups?(&block)
click to toggle source
# File lib/omf_ec/experiment.rb, line 149
# True when there is at least one group and the block holds for every group.
# Without a block each group itself is used as the condition.
def all_groups?(&block)
  return false if groups.empty?
  return groups.all? unless block

  groups.all? { |g| block.call(g) }
end
archive_oedl(script_name)
click to toggle source
Archive OEDL content to OML db
# File lib/omf_ec/experiment.rb, line 237
# Store the content of an OEDL script as metadata: the file is read,
# deflated with Zlib, Base64-encoded and logged under the "oedl_content"
# domain with the script name as key.
def archive_oedl(script_name)
  compressed = Zlib::Deflate.deflate(File.read(script_name))
  encoded = Base64.encode64(compressed)
  log_metadata(script_name, encoded, "oedl_content")
end
clear_events()
click to toggle source
# File lib/omf_ec/experiment.rb, line 166
# Drop every registered event, cancelling the periodic timer of each event
# that has one.
def clear_events
  synchronize do
    @events.each do |registered|
      timer = registered[:periodic_timer]
      timer.cancel if timer
    end
    @events = []
  end
end
create_job()
click to toggle source
If the EC is launched with a --job-service setup, it needs to create a job entry for this experiment trial. Do nothing if:
-
a JobService URL has not been provided, i.e. the EC runs without needing to contact the JobService
-
we already have a Job URL, i.e. the job entry has already been created
# File lib/omf_ec/experiment.rb, line 250
# Create a job entry for this experiment trial on the job service and store
# the returned URL in @job_url.
#
# Returns nil without contacting anything when a job URL is already set or
# when no job service URL (@js_url) is configured.
#
# The request is a JSON POST to "<@js_url>/jobs" carrying the experiment id
# as the job name. TLS is enabled when the job service URL uses the https
# scheme, so both http:// and https:// services work.
#
# Raises RuntimeError when the service does not answer with a success
# status, or when the answer carries no 'href'.
def create_job
  return unless @job_url.nil?
  return if @js_url.nil?

  # Loaded lazily: only needed when a job service is actually configured
  require 'json'
  require 'net/http'

  job = { name: id }
  u = URI.parse(@js_url + '/jobs')
  req = Net::HTTP::Post.new(u.path, { 'Content-Type' => 'application/json' })
  req.body = JSON.pretty_generate(job)

  res = Net::HTTP.start(u.host, u.port, use_ssl: u.scheme == 'https') do |http|
    http.request(req)
  end
  unless res.kind_of?(Net::HTTPSuccess)
    raise "Could not create a job for this experiment trial\n" +
          "Response #{res.code} #{res.message}:\n#{res.body}"
  end

  job = JSON.parse(res.body)
  raise "No valid URL received for the created job for this experiment trial" if job['href'].nil?
  @job_url = job['href']
end
each_group(&block)
click to toggle source
# File lib/omf_ec/experiment.rb, line 141
# Yield each group to the block. Without a block, simply return the groups.
def each_group(&block)
  return groups unless block

  groups.each(&block)
end
eval_trigger(event)
click to toggle source
# File lib/omf_ec/experiment.rb, line 207
# Evaluate one event against the current state and fire it if it triggers.
#
# Nothing happens (and the trigger is not even called) when the event has no
# callbacks. When the trigger returns a truthy value:
# - a consumable event (event[:consume_event]) has its periodic timer
#   cancelled, if any, and is removed from @events;
# - the event name and aliases are logged;
# - the callbacks run in reverse registration order.
def eval_trigger(event)
  callbacks = event[:callbacks]
  return unless callbacks && !callbacks.empty?
  return unless event[:trigger].call(state)

  if event[:consume_event]
    # Periodic check event
    event[:periodic_timer].cancel if event[:periodic_timer]
    @events.delete(event)
  end

  event_names = ([event[:name]] + event[:aliases]).join(', ')
  info "Event triggered: '#{event_names}'"

  # Last in first serve callbacks
  event[:callbacks].reverse.each { |callback| callback.call }
end
event(name)
click to toggle source
# File lib/omf_ec/experiment.rb, line 153
# Look up the first registered event whose name or one of whose aliases
# equals +name+. Returns nil when there is none.
def event(name)
  @events.detect do |candidate|
    candidate[:name] == name || candidate[:aliases].include?(name)
  end
end
group(name)
click to toggle source
# File lib/omf_ec/experiment.rb, line 130
# Look up the first group whose name equals +name+; nil when there is none.
def group(name)
  groups.detect { |candidate| candidate.name == name }
end
groups_by_res(res_addr)
click to toggle source
Find all groups a given resource belongs to
# File lib/omf_ec/experiment.rb, line 116
# Collect every group that lists +res_addr+ among its member values.
# Returns an empty array when no group does.
def groups_by_res(res_addr)
  groups.select do |candidate|
    candidate.members.values.include?(res_addr)
  end
end
id()
click to toggle source
Unique experiment id
# File lib/omf_ec/experiment.rb, line 176
# The experiment id: the assigned name when there is one, otherwise the
# generated @id.
def id
  return @name if @name

  @id
end
log_metadata(key, value, domain = 'sys')
click to toggle source
# File lib/omf_ec/experiment.rb, line 231
# Hand one metadata entry to MetaData.inject. All three parts are converted
# to strings and passed in (domain, key, value) order; +domain+ defaults to
# 'sys'.
def log_metadata(key, value, domain = 'sys')
  #MetaData.inject_metadata(key.to_s, value.to_s)
  domain_s, key_s, value_s = [domain, key, value].map(&:to_s)
  MetaData.inject(domain_s, key_s, value_s)
end
mp_table_names()
click to toggle source
# File lib/omf_ec/experiment.rb, line 223
# Merge the mp_table_names hashes of every application context of every
# group into one new hash. On duplicate keys the later context wins.
def mp_table_names
  per_context = groups.map(&:app_contexts).flatten.map(&:mp_table_names)
  per_context.each_with_object({}) do |names, merged|
    merged.merge!(names)
  end
end
process_events()
click to toggle source
Parse user-defined events, check their conditions against the internal state, and execute the callbacks of those that are triggered
# File lib/omf_ec/experiment.rb, line 191
# Evaluate the trigger of every event that has no :every period, under the
# experiment lock. The selection is a copy of the event list, so an event
# removed while being evaluated does not disturb the iteration.
def process_events
  synchronize do
    non_periodic = @events.select { |registered| registered[:every].nil? }
    non_periodic.each { |registered| eval_trigger(registered) }
  end
end
resource_by_hrn(hrn)
click to toggle source
# File lib/omf_ec/experiment.rb, line 70
# Fetch the state entry stored under +hrn+ in the resource state table.
def resource_by_hrn(hrn)
  @state[hrn]
end
resource_state(address)
click to toggle source
# File lib/omf_ec/experiment.rb, line 64
# Fetch the state entry stored under +address+ in the resource state table.
def resource_state(address)
  @state[address]
end
Also aliased as: resource
state()
click to toggle source
# File lib/omf_ec/experiment.rb, line 50
# All resource state entries, without their keys.
def state
  @state.values
end
sub_group(name)
click to toggle source
# File lib/omf_ec/experiment.rb, line 120
# Return the recorded sub-group name equal to +name+, or nil when it has not
# been recorded.
def sub_group(name)
  @sub_groups.detect { |recorded| recorded == name }
end