module Sfp::Agent
Constants
- AgentsDataFile
- BSigFile
- BSigPIDFile
- BSigThreadsLockFile
- CacheModelFile
- DefaultPort
- Home
- LogFile
- ModelFile
- NetHelper
- PIDFile
- ParentEliminator
Public Class Methods
bsig_engine()
click to toggle source
# File lib/sfpagent/agent.rb, line 328
# Accessor for the module-wide BSig engine held in @@bsig_engine.
def self.bsig_engine
  return @@bsig_engine
end
config()
click to toggle source
# File lib/sfpagent/agent.rb, line 55
# Accessor for the agent configuration held in @@config
# (assigned from the options hash passed to start).
def self.config
  return @@config
end
delete_agents()
click to toggle source
# File lib/sfpagent/agent.rb, line 584
# Reset the agents data file so that it contains an empty JSON object.
# The file is created when missing and is rewritten while holding an
# exclusive lock.
def self.delete_agents
  File.open(AgentsDataFile, File::RDWR|File::CREAT, 0644) do |file|
    file.flock(File::LOCK_EX)
    file.rewind
    file.write('{}')
    file.flush
    # cut off whatever used to follow the freshly written content
    file.truncate(file.pos)
  end
end
execute_action(action)
click to toggle source
Execute an action
@param action contains the action's schema.
# File lib/sfpagent/agent.rb, line 410
# Execute an action through the runtime.
#
# @param action the action's schema; 'name' and 'parameters' are read
#   here for logging, the whole object is handed to the runtime.
# @return whatever @@runtime.execute_action returns, or false when an
#   exception was raised (the exception is logged, not propagated).
def self.execute_action(action)
  logger = nil
  action_string = nil
  begin
    logger = (@@config[:daemon] ? Sfp::Agent.logger : Logger.new(STDOUT))
    action_string = "#{action['name']} #{JSON.generate(action['parameters'])}"
    logger.info "Executing #{action_string} [Wait]"
    result = @@runtime.execute_action(action)
    logger.info "Executing #{action_string} " + (result ? "[OK]" : "[Failed]")
    return result
  rescue Exception => e
    # The failure may have happened before the logger or the description
    # were built; fall back so that the handler itself cannot raise.
    logger ||= Sfp::Agent.logger
    action_string ||= action.inspect
    logger.error "Executing #{action_string} [Failed] #{e}\n#{e.backtrace.join("\n")}"
  end
  false
end
get_agents()
click to toggle source
# File lib/sfpagent/agent.rb, line 677
# Return the agents database parsed from the agents data file.
# An empty hash is returned when the file does not exist. The in-memory
# copy is reused only when the file's mtime equals the remembered one
# and the file was modified less than 60 seconds ago; otherwise the
# file is parsed again and the cache refreshed.
def self.get_agents
  return {} unless File.exist?(AgentsDataFile)

  modified_time = File.mtime(AgentsDataFile)
  if modified_time == @@agents_database_modified_time && (Time.new - modified_time) < 60
    return @@agents_database
  end

  @@agents_database_modified_time = File.mtime(AgentsDataFile)
  @@agents_database = JSON[File.read(AgentsDataFile)]
end
get_bsig()
click to toggle source
Return the BSig model from the cached file.
# File lib/sfpagent/agent.rb, line 313
# Return the BSig model stored in the BSig file.
# Returns nil when the file is missing (or empty), the cached model when
# the file's mtime is unchanged, and false when reading or parsing fails
# (the error is logged).
def self.get_bsig
  return nil unless File.exist?(BSigFile)
  return @@bsig if File.mtime(BSigFile) == @@bsig_modified_time

  begin
    content = File.read(BSigFile)
    @@bsig = (content.empty? ? nil : JSON[content])
    @@bsig_modified_time = File.mtime(BSigFile)
    return @@bsig
  rescue Exception => e
    Sfp::Agent.logger.error "Get the BSig model [Failed] #{e}\n#{e.backtrace.join("\n")}"
  end
  false
end
get_cache_model(name)
click to toggle source
# File lib/sfpagent/agent.rb, line 195
# Look up the cached model stored under +name+ in the cache model file.
# Returns nil when the file does not exist or has no entry for +name+.
def self.get_cache_model(name)
  return nil unless File.exist?(CacheModelFile)

  cached = JSON[File.read(CacheModelFile)]
  cached.has_key?(name) ? cached[name] : nil
end
get_log(n=0)
click to toggle source
# File lib/sfpagent/agent.rb, line 575
# Return the content of the agent's log file.
#
# @param n number of trailing lines wanted; zero or a negative value
#   returns the whole file. The value is coerced with to_i.
# @return [String] the requested text, or '' when the log file is missing.
def self.get_log(n=0)
  return '' if not File.exist?(LogFile)

  count = n.to_i
  return File.read(LogFile) if count <= 0

  # Pure Ruby instead of shelling out to `tail`: nothing is interpolated
  # into a shell command and no external utility is required.
  File.readlines(LogFile).last(count).join
end
get_module_hash(name)
click to toggle source
# File lib/sfpagent/agent.rb, line 482
# Compute an MD5 fingerprint over all regular files of a module.
# The per-file digests are sorted and hashed again, using whichever of
# the external tools md5sum or md5 is available.
#
# @param name module name (a sub-directory of the modules directory)
# @return [String, nil] the fingerprint, or nil when no modules directory
#   is configured, the module directory does not exist, or neither tool
#   is installed.
def self.get_module_hash(name)
  return nil if @@config[:modules_dir].to_s == ''

  module_dir = "#{@@config[:modules_dir]}/#{name}"
  return nil unless File.directory?(module_dir)

  # The path ends up inside a shell pipeline, so it must be escaped.
  target = Shellwords.escape(module_dir)
  if `which md5sum`.strip.length > 0
    return `find #{target} -type f -exec md5sum {} + | awk '{print $1}' | sort | md5sum | awk '{print $1}'`.strip
  elsif `which md5`.strip.length > 0
    return `find #{target} -type f -exec md5 {} + | awk '{print $4}' | sort | md5`.strip
  end
  nil
end
get_modules()
click to toggle source
# File lib/sfpagent/agent.rb, line 496
# Return the table of loaded modules (@@modules), or an empty hash when
# no module table has been created yet.
def self.get_modules
  if defined?(@@modules)
    @@modules
  else
    {}
  end
end
get_sfp(module_name)
click to toggle source
# File lib/sfpagent/agent.rb, line 473
# Parse <modules_dir>/<module_name>/<module_name>.sfp, run a
# ParentEliminator visitor over the resulting root and return the root
# serialized as JSON.
def self.get_sfp(module_name)
  sfp_file = [@@config[:modules_dir], module_name, "#{module_name}.sfp"].join('/')
  root = parse(sfp_file).root
  root.accept(Sfp::Visitor::ParentEliminator.new)
  JSON.generate(root)
end
get_state(as_sfp=true)
click to toggle source
Return the current state of the model.
# File lib/sfpagent/agent.rb, line 339
# Return the current state as reported by the runtime.
# Runs under @@runtime_lock. Returns nil when no runtime exists and
# false when the runtime raised (the error is logged).
#
# @param as_sfp forwarded unchanged to @@runtime.get_state
def self.get_state(as_sfp=true)
  @@runtime_lock.synchronize do
    if !defined?(@@runtime) || @@runtime.nil?
      return nil
    end

    begin
      return @@runtime.get_state(as_sfp)
    rescue Exception => e
      Sfp::Agent.logger.error "Get state [Failed] #{e}\n#{e.backtrace.join("\n")}"
    end
  end
  false
end
install_module(name, data, reload=true)
click to toggle source
# File lib/sfpagent/agent.rb, line 537
# Install (or replace) a module from a tar archive.
#
# @param name   module name; becomes a sub-directory of the modules directory
# @param data   raw archive content, written to data.tgz and unpacked with tar
# @param reload when true, load_modules is invoked after the installation
# @return [Boolean] false when no modules directory is configured, +data+
#   is nil, or a computed path falls outside the modules directory;
#   true otherwise.
def self.install_module(name, data, reload=true)
  modules_dir = @@config[:modules_dir].to_s
  return false if modules_dir == '' or data.nil?

  if !File.directory?(modules_dir)
    File.delete(modules_dir) if File.exist?(modules_dir)
    Dir.mkdir(modules_dir, 0700)
  end

  # Ruby's File/Dir APIs need the raw path; external commands are run
  # with argument vectors (or :chdir) so that no shell escaping is needed.
  module_dir = "#{modules_dir}/#{name}"
  return false if not subpath(module_dir, modules_dir)

  # delete old files
  system('rm', '-rf', module_dir) if File.exist?(module_dir)

  # save the archive
  archive = "#{module_dir}/data.tgz"
  Dir.mkdir(module_dir, 0700)
  File.open(archive, 'wb', 0600) { |f| f.syswrite data }

  # extract the archive inside the module directory, then drop it
  system('tar', 'xvf', 'data.tgz', :chdir => module_dir)
  File.delete(archive) if File.exist?(archive)

  # hoist the content of every extracted sub-directory one level up
  Dir.entries(module_dir).each { |entry|
    next if entry == '.' or entry == '..'
    target = "#{module_dir}/#{entry}"
    return false if not subpath(target, modules_dir)
    next if not File.directory?(target)
    # :chdir makes the command a no-op when the directory cannot be
    # entered, instead of running mv/rm from an unexpected directory
    system("mv * .. && mv .* .. 2>/dev/null", :chdir => target)
    system('rm', '-rf', target)
  }

  load_modules(@@config) if reload
  Sfp::Agent.logger.info "Installing module #{name} [OK]"
  true
end
install_modules(modules)
click to toggle source
# File lib/sfpagent/agent.rb, line 529
# Install several modules, then reload the module table once.
#
# @param modules enumerable of (name, archive data) pairs
# @return [Boolean] false as soon as one installation fails (no reload
#   happens in that case), true otherwise.
def self.install_modules(modules)
  modules.each do |name, data|
    next if install_module(name, data, false)
    return false
  end
  load_modules(@@config)
  true
end
is_windows()
click to toggle source
# File lib/sfpagent/agent.rb, line 203
# Tell whether the host OS name contains mswin, mingw or cygwin.
# Returns the match position (truthy) or nil.
def self.is_windows
  host_os = RbConfig::CONFIG['host_os']
  host_os =~ /mswin|mingw|cygwin/
end
load_modules(p={})
click to toggle source
Load all modules in given agent’s module directory.
options: :modules_dir => directory that contains all modules
# File lib/sfpagent/agent.rb, line 432
# Rebuild the table of available modules (@@modules) from a directory.
# Every sub-directory <name> is registered as
#   - a :ruby module when it contains <name>.rb (the file is loaded), or
#   - a :shell module when it contains a file called 'main';
# anything else is reported as invalid.
#
# @param p options hash; only :modules_dir (directory holding all
#   modules) is read. A missing or empty value leaves the table empty.
def self.load_modules(p={})
  # to_s guards against a missing :modules_dir (File.directory?(nil) raises)
  dir = p[:modules_dir].to_s

  @@modules = {}
  counter = 0
  if dir != '' and File.directory?(dir)
    Sfp::Agent.logger.info "Modules directory: #{dir}"
    Dir.entries(dir).each do |name|
      module_dir = "#{dir}/#{name}"
      next if name == '.' or name == '..' or not File.directory?(module_dir)
      module_file = "#{module_dir}/#{name}.rb"
      if File.exist?(module_file)
        begin
          ### use 'load' rather than 'require' to rewrite previous definitions
          load module_file
          Sfp::Agent.logger.info "Loading module #{module_dir} [OK]"
          counter += 1
          @@modules[name] = {
            :type => :ruby,
            :home => module_dir,
            :hash => get_module_hash(name)
          }
        rescue Exception => e
          # Exception (not StandardError) on purpose: a broken module file
          # raises SyntaxError/LoadError, which are ScriptErrors.
          Sfp::Agent.logger.warn "Loading module #{dir}/#{name} [Failed]\n#{e}"
        end
      elsif File.exist?("#{module_dir}/main")
        Sfp::Agent.logger.info "Loading module #{module_dir} [OK]"
        @@modules[name] = {
          :type => :shell,
          :home => module_dir,
          :hash => get_module_hash(name)
        }
        counter += 1
      else
        Sfp::Agent.logger.warn "Module #{module_dir} is invalid."
      end
    end
  end
  Sfp::Agent.logger.info "Successfully loading #{counter} modules."
end
logger()
click to toggle source
# File lib/sfpagent/agent.rb, line 51
# Accessor for the module-wide logger held in @@logger.
def self.logger
  return @@logger
end
pid()
click to toggle source
# File lib/sfpagent/agent.rb, line 176
# Return the PID recorded in the PID file when that process can be
# signalled, otherwise nil.
def self.pid
  begin
    recorded = File.read(PIDFile).to_i
    # An empty or corrupt PID file parses to 0; signalling PID 0 targets
    # the caller's own process group and would always "succeed".
    return nil if recorded <= 0
    return recorded if Process.kill(0, recorded)
  rescue
    # missing file, dead process or missing permission: treated as "not running"
  end
  nil
end
resolve(path, as_sfp=true)
click to toggle source
# File lib/sfpagent/agent.rb, line 371
# Resolve the value of a reference.
# When the second path component names an entry of the local runtime
# root, the owning module's state is refreshed and the attribute is read
# from it. Otherwise, when that component names a known agent, the value
# is fetched from that agent over HTTP ("/state<path>").
#
# @param path   reference to resolve (must respond to simplify, split,
#   pop_ref)
# @param as_sfp not used by this method
# @return the resolved value, Sfp::Unknown when the remote agent reports
#   '<sfp::unknown>', and Sfp::Undefined in every other case (including
#   errors, which are logged).
def self.resolve(path, as_sfp=true)
  if !defined?(@@runtime) || @@runtime.nil? || @@runtime.root.nil?
    return Sfp::Undefined.new
  end

  begin
    path = path.simplify
    node = path.split('.', 3)[1]

    if @@runtime.root.has_key?(node)
      # local resolve
      parent, attribute = path.pop_ref
      owner = @@runtime.root.at?(parent)
      if owner.is_a?(Hash)
        owner[:_self].update_state
        current = owner[:_self].state
        return current[attribute] if current.has_key?(attribute)
      end
      return Sfp::Undefined.new
    end

    agent = get_agents[node]
    if agent.is_a?(Hash)
      # remote resolve: drop the first character and turn dots into slashes
      path = path[1, path.length-1].gsub(/\./, '/')
      code, data = NetHelper.get_data(agent['sfpAddress'], agent['sfpPort'], "/state#{path}")
      if code.to_i == 200
        remote = JSON[data]['state']
        return Sfp::Unknown.new if remote == '<sfp::unknown>'
        return remote unless remote.is_a?(String) && remote[0,15] == '<sfp::undefined'
      end
    end
  rescue Exception => e
    Sfp::Agent.logger.error "Resolve #{path} [Failed] #{e}\n#{e.backtrace.join("\n")}"
  end

  Sfp::Undefined.new
end
resolve_model(path)
click to toggle source
# File lib/sfpagent/agent.rb, line 351
# Resolve a reference against the runtime's model.
# When the model yields Sfp::Unknown, the cached model stored under the
# second path component is consulted instead.
#
# @param path reference to resolve (must respond to simplify and split)
# @return the resolved value, or Sfp::Undefined when there is no runtime
#   or an error occurred (errors are logged).
def self.resolve_model(path)
  if !defined?(@@runtime) || @@runtime.nil? || @@runtime.root.nil?
    return Sfp::Undefined.new
  end

  begin
    path = path.simplify
    value = @@runtime.model.at?(path)
    return value unless value.is_a?(Sfp::Unknown)

    _, name, rest = path.split('.', 3)
    cached = get_cache_model(name)
    if !cached.nil? && cached.has_key?('model')
      if rest.to_s.empty?
        value = cached['model']
      else
        value = cached['model'].at?("$.#{rest}")
      end
      value.accept(ParentEliminator) if value.is_a?(Hash)
    end
    return value
  rescue Exception => e
    Sfp::Agent.logger.error "Resolve model #{path} [Failed] #{e}\n#{e.backtrace.join("\n")}"
  end

  Sfp::Undefined.new
end
runtime()
click to toggle source
# File lib/sfpagent/agent.rb, line 59
# Accessor for the runtime object held in @@runtime.
def self.runtime
  return @@runtime
end
set_agents(data)
click to toggle source
parameter:
:data => To delete an agent: { "agent_name" => nil } To add/modify an agent: { "agent_name" => { "sfpAddress" => "10.0.0.1", "sfpPort" => 1314 } }
# File lib/sfpagent/agent.rb, line 598
# Merge +data+ into the agents database file and propagate the change.
#
# @param data hash of agent name => description. A nil description
#   deletes the agent; a hash such as
#   { "sfpAddress" => "10.0.0.1", "sfpPort" => 1314 } adds or modifies it.
# @return [Boolean] false when a hash description has no non-blank string
#   'sfpAddress' or no positive 'sfpPort'; true otherwise.
#
# When the stored database actually changed, the new entries are pushed
# to every known agent from a background thread (best effort) and the
# managed section of /etc/hosts is regenerated.
def self.set_agents(data)
  data.each { |name,agent|
    return false if agent.is_a?(Hash) and (not agent['sfpAddress'].is_a?(String) or
      agent['sfpAddress'].strip == '' or agent['sfpPort'].to_i <= 0)
  }

  updated = false
  agents = nil
  File.open(AgentsDataFile, File::RDWR|File::CREAT, 0644) { |f|
    f.flock(File::LOCK_EX)
    json = f.read
    agents = (json == '' ? {} : JSON[json])
    current_hash = agents.hash

    agents.merge!(data)
    # a nil description means "delete this agent"
    agents.keys.each { |k| agents.delete(k) if agents[k].nil? }

    if current_hash != agents.hash
      updated = true
      f.rewind
      f.write(JSON.generate(agents))
      f.flush
      f.truncate(f.pos)
    end
  }

  if updated
    @@agents_database = agents

    Thread.new {
      # broadcast the update to the other agents
      http_data = {'agents' => JSON.generate(data)}
      agents.each { |name,agent|
        begin
          # Best effort: the response code is deliberately ignored. Raising
          # Exception here would bypass the bare rescue, kill this thread and
          # skip the remaining agents.
          NetHelper.put_data(agent['sfpAddress'], agent['sfpPort'], '/agents', http_data, 5, 20)
        rescue
          # an unreachable agent must not stop the broadcast
        end
      }
    }

    # update /etc/hosts so other applications can resolve the names correctly
    begin
      File.open('/etc/hosts', File::RDWR, 0644) do |f|
        f.flock(File::LOCK_EX)
        output = ''
        ### keep everything that precedes the managed section
        f.read.each_line do |line|
          break if line =~ /# Edited by Nuri/
          output << line
        end
        ### add current records
        output << "# Edited by Nuri (don't change or add anything below this line)\n"
        agents.each do |name,agent|
          # \A/\z anchor the whole string, so a multi-line value cannot
          # smuggle extra lines into /etc/hosts
          next if not (agent['sfpAddress'] =~ /\A(([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\.){3}([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\z/)
          # a name containing whitespace would produce a malformed record
          next if name.to_s =~ /\s/
          output << "#{agent['sfpAddress']}\t#{name}\n"
        end
        ### save records
        f.rewind
        f.write(output)
        f.flush
        f.truncate(f.pos)
      end
    rescue StandardError => e
      # e.g. not running as root; the hosts file is only a convenience
      Sfp::Agent.logger.warn 'cannot update /etc/hosts'
    end
  end

  true
end
set_bsig(bsig)
click to toggle source
Set a new BSig model by saving it in the cached file (the in-memory copy is refreshed by the next get_bsig call).
# File lib/sfpagent/agent.rb, line 288
# Store a BSig model in the BSig file.
# Each entry of bsig['operators'] gets its position as 'id' before the
# model is serialized. A nil model empties the file.
#
# @return [Boolean] true on success, false when an exception was raised
#   (the error is logged).
def self.set_bsig(bsig)
  begin
    File.open(BSigFile, File::RDWR|File::CREAT, 0600) do |file|
      file.flock(File::LOCK_EX)
      Sfp::Agent.logger.info "Setting the BSig model [Wait]"
      file.rewind

      serialized = ''
      unless bsig.nil?
        bsig['operators'].each_with_index { |operator, index| operator['id'] = index }
        serialized = JSON.generate(bsig)
      end

      file.write(serialized)
      file.flush
      file.truncate(file.pos)
    end
    Sfp::Agent.logger.info "Setting the BSig model [OK]"
    return true
  rescue Exception => e
    Sfp::Agent.logger.error "Setting the BSig model [Failed] #{e}\n#{e.backtrace.join("\n")}"
  end
  false
end
set_cache_model(p={})
click to toggle source
# File lib/sfpagent/agent.rb, line 207
# Update the cache model file under an exclusive lock.
#
# @param p options hash:
#   :name and :model given -> store the model under that name
#   only :name given       -> remove the entry of that name
#   no :name               -> clear the whole cache
# @return true
def self.set_cache_model(p={})
  File.open(CacheModelFile, File::RDWR|File::CREAT, 0600) do |file|
    file.flock(File::LOCK_EX)
    json = file.read
    # anything shorter than '{}' cannot be a JSON object
    model = (json.length >= 2 ? JSON[json] : {})

    name = p[:name]
    if !name
      model = {}
      Sfp::Agent.logger.info "Deleting all cache model..."
    elsif p[:model]
      model[name] = p[:model]
      Sfp::Agent.logger.info "Saving cache model for #{p[:name]}..."
    else
      model.delete(name)
      Sfp::Agent.logger.info "Deleting cache model for #{p[:name]}..."
    end

    file.rewind
    file.write(JSON.generate(model))
    file.flush
    file.truncate(file.pos)
  end
  true
end
set_model(model)
click to toggle source
Save given model to cached file, and then reload the model.
# File lib/sfpagent/agent.rb, line 237
# Save the given model to the model file and reload it, unless its MD5
# digest equals the digest of the model currently in use.
#
# @param model the model; serialized with JSON.generate
# @return [Boolean] true on success (including "nothing to do"), false
#   when an exception was raised (the error is logged).
def self.set_model(model)
  begin
    # generate MD5 hash for the new model
    data = JSON.generate(model)
    new_model_hash = Digest::MD5.hexdigest(data)

    # save the new model if it's not the same as the existing one
    if new_model_hash != @@current_model_hash
      Sfp::Agent.logger.info "Setting new model [Wait]"
      File.open(ModelFile, File::RDWR|File::CREAT, 0600) { |f|
        f.flock(File::LOCK_EX)
        f.rewind
        f.write(data)
        f.flush
        f.truncate(f.pos)
      }
      update_model
      Sfp::Agent.logger.info "Setting the model [OK]"
    end
    return true
  rescue Exception => e
    Sfp::Agent.logger.error "Setting the model [Failed] #{e}\n#{e.backtrace.join("\n")}"
  end
  false
end
start(opts={})
click to toggle source
Start the agent.
options:
:daemon => true if running as a daemon, false if as a console application :port => port of web server will listen to :ssl => set true to enable HTTPS :certfile => certificate file path for HTTPS :keyfile => key file path for HTTPS
# File lib/sfpagent/agent.rb, line 72
# Start the agent.
#
# @param opts options hash:
#   :daemon      => true to detach and run as a daemon
#   :port        => port the web server listens to (DefaultPort otherwise)
#   :ssl         => true to enable HTTPS
#   :certfile    => certificate file path for HTTPS
#   :keyfile     => key file path for HTTPS
#   :modules_dir => modules directory ("#{Home}/modules" when blank)
#   :mock        => when set, neither daemonizes nor starts the web server
# @raise re-raises any exception hit during start-up after logging it
def self.start(opts={})
  Sfp::Agent.logger.info "Starting agent..."
  puts "Starting agent..."

  @@config = opts

  Process.daemon if opts[:daemon] and not opts[:mock]

  begin
    # check modules directory, and create it if it does not exist
    opts[:modules_dir] = File.expand_path(opts[:modules_dir].to_s.strip != '' ? opts[:modules_dir].to_s : "#{Home}/modules")
    Dir.mkdir(opts[:modules_dir], 0700) if not File.exist?(opts[:modules_dir])

    # load modules from cached directory
    load_modules(opts)

    # reload model
    update_model({:rebuild => true})

    # create web server
    server_type = WEBrick::SimpleServer
    port = (opts[:port] ? opts[:port] : DefaultPort)
    config = { :Host => '0.0.0.0', :Port => port, :ServerType => server_type,
               :pid => '/tmp/webrick.pid', :Logger => Sfp::Agent.logger }
    if opts[:ssl]
      config[:SSLEnable] = true
      config[:SSLVerifyClient] = OpenSSL::SSL::VERIFY_NONE
      # File.read closes the handle, unlike File.open(...).read
      config[:SSLCertificate] = OpenSSL::X509::Certificate.new(File.read(opts[:certfile]))
      config[:SSLPrivateKey] = OpenSSL::PKey::RSA.new(File.read(opts[:keyfile]))
      config[:SSLCertName] = [["CN", WEBrick::Utils::getservername]]
    end
    server = WEBrick::HTTPServer.new(config)
    server.mount("/", Sfp::Agent::Handler, Sfp::Agent.logger)

    # create maintenance object
    maintenance = Maintenance.new(opts)

    if not is_windows
      # trap signals. SIGKILL cannot be trapped (attempting it raises on
      # current Ruby versions), so TERM is handled instead.
      # NOTE(review): the handler logs and sleeps; confirm that the logger
      # in use is safe to call from trap context.
      ['INT', 'TERM', 'HUP'].each do |signal|
        trap(signal) {
          maintenance.stop

          Sfp::Agent.logger.info "Shutting down web server and BSig engine..."
          bsig_engine.stop
          loop do
            break if bsig_engine.status == :stopped
            sleep 1
          end
          server.shutdown
        }
      end
    end

    File.open(PIDFile, 'w', 0644) { |f| f.write($$.to_s) }

    bsig_engine.start

    maintenance.start

    server.start if not opts[:mock]

  rescue Exception => e
    Sfp::Agent.logger.error "Starting the agent [Failed] #{e}\n#{e.backtrace.join("\n")}"
    raise e
  end
end
status()
click to toggle source
Return agent’s PID if it is running, otherwise nil.
# File lib/sfpagent/agent.rb, line 187
# Print whether the agent is running.
#
# @return the agent's PID if it is running, otherwise nil
def self.status
  # query once, so the check and the message cannot disagree
  running_pid = pid
  if running_pid.nil?
    puts "Agent is not running."
  else
    puts "Agent is running with PID #{running_pid}"
  end
  running_pid
end
stop(opts={})
click to toggle source
Stop the agent’s daemon.
# File lib/sfpagent/agent.rb, line 146
# Stop the agent's daemon.
# Sends HUP to the PID recorded in the PID file. Unless opts[:mock] is
# set, waits (Sfp::BSig::SleepTime + 0.25) seconds and sends signal 9
# when the process is still alive. Any failure is reported as
# "Agent is not running." and the PID file is removed.
#
# @param opts options hash; only :mock is read
def self.stop(opts={})
  begin
    pid = File.read(PIDFile).to_i
    # An empty or corrupt PID file parses to 0, and signalling PID 0 would
    # hit the caller's own process group instead of the agent.
    raise "invalid PID in #{PIDFile}" if pid <= 0

    puts "Stopping agent with PID #{pid}..."
    Process.kill 'HUP', pid

    if not opts[:mock]
      begin
        sleep(Sfp::BSig::SleepTime + 0.25)

        Process.kill 0, pid
        Sfp::Agent.logger.info "Agent is still running."
        puts "Agent is still running."

        Sfp::Agent.logger.info "Killing agent."
        puts "Killing agent."
        Process.kill 9, pid
        # a force-killed agent cannot clean up after itself
        File.delete(PIDFile) if File.exist?(PIDFile)
      rescue
        Sfp::Agent.logger.info "Agent has stopped."
        puts "Agent has stopped."
        File.delete(PIDFile) if File.exist?(PIDFile)
      end
    end

  rescue
    puts "Agent is not running."
    File.delete(PIDFile) if File.exist?(PIDFile)
  end
end
subpath(path1, path2)
click to toggle source
return true if path1 is subpath of path2, otherwise false
# File lib/sfpagent/agent.rb, line 687
# Return true if path1 is path2 itself or lies below it, otherwise false.
# Both paths are expanded first, so '..' components are resolved before
# the comparison.
def self.subpath(path1, path2)
  child = File.expand_path(path1)
  parent = File.expand_path(path2)
  return true if child == parent

  # Compare against "parent/" rather than "parent": a bare prefix test
  # would accept siblings such as /a/bc for /a/b.
  prefix = (parent.end_with?('/') ? parent : "#{parent}/")
  child.start_with?(prefix)
end
uninstall_all_modules(p={})
click to toggle source
# File lib/sfpagent/agent.rb, line 504
# Delete every (non-hidden) entry of the modules directory and reload
# the module table.
#
# @param p not used by this method
# @return [Boolean] true when nothing had to be done or the removal
#   succeeded, false when the removal command failed.
def self.uninstall_all_modules(p={})
  modules_dir = @@config[:modules_dir].to_s
  # A nil or blank directory must never reach the shell: "rm -rf /*"
  return true if modules_dir.strip == ''

  if system("rm -rf #{Shellwords.escape(modules_dir)}/*")
    load_modules(@@config)
    Sfp::Agent.logger.info "Deleting all modules [OK]"
    return true
  end
  Sfp::Agent.logger.info "Deleting all modules [Failed]"
  false
end
uninstall_module(name)
click to toggle source
# File lib/sfpagent/agent.rb, line 515
# Delete one module's directory and reload the module table.
#
# @param name module name (a sub-directory of the modules directory)
# @return [Boolean] false when no modules directory is configured, when
#   the resolved path is outside of (or equal to) the modules directory,
#   or when the removal failed; true otherwise, including when the module
#   directory does not exist.
def self.uninstall_module(name)
  modules_dir = @@config[:modules_dir].to_s
  return false if modules_dir.strip == ''

  root = File.expand_path(modules_dir)
  module_dir = File.expand_path("#{modules_dir}/#{name}")
  # An empty or '.' name resolves to the modules directory itself;
  # removing that would wipe every installed module.
  return false if module_dir == root or not subpath(module_dir, root)

  if File.directory?(module_dir)
    # argument-vector form: the raw path needs no shell escaping
    result = !!system('rm', '-rf', module_dir)
  else
    result = true
  end
  load_modules(@@config)
  Sfp::Agent.logger.info "Deleting module #{name} " + (result ? "[OK]" : "[Failed]")
  result
end
update_model(p={})
click to toggle source
Reload the model from cached file.
# File lib/sfpagent/agent.rb, line 265
# Reload the model from the model file.
# Under @@runtime_lock the file's MD5 digest is remembered and the model
# is either handed to the existing runtime or, when there is no runtime
# yet or p[:rebuild] is set, used to build a new one. Failures are
# logged, not raised.
#
# @param p options hash; only :rebuild is read
def self.update_model(p={})
  if File.exist?(ModelFile)
    begin
      @@runtime_lock.synchronize do
        data = File.read(ModelFile)
        @@current_model_hash = Digest::MD5.hexdigest(data)
        model = JSON[data]
        if !defined?(@@runtime) || @@runtime.nil? || p[:rebuild]
          @@runtime = Sfp::Runtime.new(model)
        else
          @@runtime.set_model(model)
        end
      end
      Sfp::Agent.logger.info "Reloading the model in cache [OK]"
    rescue Exception => e
      Sfp::Agent.logger.error "Reloading the model in cache [Failed] #{e}\n#{e.backtrace.join("\n")}"
    end
  else
    Sfp::Agent.logger.info "There is no model in cache."
  end
end
whoami?()
click to toggle source
# File lib/sfpagent/agent.rb, line 332
# Delegate to the runtime's whoami?; nil when there is no runtime.
def self.whoami?
  if defined?(@@runtime) && !@@runtime.nil?
    @@runtime.whoami?
  else
    nil
  end
end