module Sfp::Agent

Constants

AgentsDataFile
BSigFile
BSigPIDFile
BSigThreadsLockFile
CacheModelFile
DefaultPort
Home
LogFile
ModelFile
NetHelper
PIDFile
ParentEliminator

Public Class Methods

bsig_engine() click to toggle source
# File lib/sfpagent/agent.rb, line 328
# Return the object currently held in the class variable @@bsig_engine.
def self.bsig_engine
        return @@bsig_engine
end
config() click to toggle source
# File lib/sfpagent/agent.rb, line 55
# Return the options hash stored in @@config (assigned by Sfp::Agent.start).
def self.config
        return @@config
end
delete_agents() click to toggle source
# File lib/sfpagent/agent.rb, line 584
# Reset the agents database file so that it contains an empty JSON object.
#
# The file is created when missing and exclusively locked while it is
# rewritten; any bytes beyond the new content are truncated away.
def self.delete_agents
        File.open(AgentsDataFile, File::RDWR | File::CREAT, 0644) do |file|
                file.flock(File::LOCK_EX)
                file.rewind
                file.write('{}')
                file.flush
                file.truncate(file.pos)
        end
end
execute_action(action) click to toggle source

 Execute an action

@param action contains the action's schema.
# File lib/sfpagent/agent.rb, line 410
# Execute an action through the current runtime and log the outcome.
#
# @param action contains the action's schema ('name' and 'parameters' are
#               used for the log message).
# @return the value returned by @@runtime.execute_action, or false when an
#         exception was raised.
def self.execute_action(action)
        logger = nil
        action_string = nil
        begin
                # log to the agent's logger when daemonized, otherwise to the console
                logger = (@@config[:daemon] ? Sfp::Agent.logger : Logger.new(STDOUT))
                action_string = "#{action['name']} #{JSON.generate(action['parameters'])}"
                logger.info "Executing #{action_string} [Wait]"
                result = @@runtime.execute_action(action)
                logger.info "Executing #{action_string} " + (result ? "[OK]" : "[Failed]")
                return result
        rescue Exception => e
                # logger/action_string are still nil when the failure happened while
                # they were being built; fall back so the handler itself cannot raise
                (logger || Sfp::Agent.logger).error "Executing #{action_string || action.inspect} [Failed] #{e}\n#{e.backtrace.join("\n")}"
        end
        false
end
get_agents() click to toggle source
# File lib/sfpagent/agent.rb, line 677
# Return the agents database parsed from AgentsDataFile.
#
# An empty hash is returned when the file does not exist. The in-memory copy
# (@@agents_database) is reused only while the file's mtime equals the
# remembered one and that mtime is less than 60 seconds old; otherwise the
# file is read and parsed again.
def self.get_agents
        return {} unless File.exist?(AgentsDataFile)

        modified_time = File.mtime(AgentsDataFile)
        unchanged = (modified_time == @@agents_database_modified_time)
        return @@agents_database if unchanged && (Time.new - modified_time) < 60

        @@agents_database_modified_time = File.mtime(AgentsDataFile)
        @@agents_database = JSON[File.read(AgentsDataFile)]
end
get_bsig() click to toggle source

Return a BSig model from cached file

# File lib/sfpagent/agent.rb, line 313
# Return the BSig model stored in BSigFile.
#
# @return nil when the file does not exist, the cached @@bsig when the file's
#         mtime matches the remembered one, the freshly parsed model (nil for
#         an empty file) otherwise, or false when reading/parsing failed.
def self.get_bsig
        return nil unless File.exist?(BSigFile)
        return @@bsig if File.mtime(BSigFile) == @@bsig_modified_time

        begin
                raw = File.read(BSigFile)
                @@bsig = (raw.empty? ? nil : JSON[raw])
                @@bsig_modified_time = File.mtime(BSigFile)
                return @@bsig
        rescue Exception => err
                Sfp::Agent.logger.error "Get the BSig model [Failed] #{err}\n#{err.backtrace.join("\n")}"
        end
        false
end
get_cache_model(name) click to toggle source
# File lib/sfpagent/agent.rb, line 195
# Look up one entry of the cached models stored as JSON in CacheModelFile.
#
# @param name key of the cached model
# @return the value stored under +name+, or nil when the cache file or the
#         key does not exist.
def self.get_cache_model(name)
        return nil unless File.exist?(CacheModelFile)

        cached = JSON[File.read(CacheModelFile)]
        cached.has_key?(name) ? cached[name] : nil
end
get_log(n=0) click to toggle source
# File lib/sfpagent/agent.rb, line 575
# Return the content of the agent's log file.
#
# @param n number of trailing lines to return; when n <= 0 the whole file
#          is returned.
# @return the requested log text, or '' when the log file does not exist.
def self.get_log(n=0)
        return '' if not File.exist?(LogFile)
        return File.read(LogFile) if n <= 0

        # Pass the arguments as an array so that no shell is involved: a log path
        # containing spaces or shell metacharacters is handed to tail verbatim.
        IO.popen(['tail', '-n', n.to_s, LogFile]) { |io| io.read }
end
get_module_hash(name) click to toggle source
# File lib/sfpagent/agent.rb, line 482
# Compute a fingerprint of an installed module: the MD5 of the sorted MD5
# digests of every regular file below the module's directory, produced with
# the system's md5sum (or md5) tool.
#
# @param name module name (sub-directory of @@config[:modules_dir])
# @return the hex digest, or nil when no modules directory is configured,
#         the module directory does not exist, or neither tool is available.
def self.get_module_hash(name)
        return nil if @@config[:modules_dir].to_s == ''

        module_dir = "#{@@config[:modules_dir]}/#{name}"
        return nil unless File.directory?(module_dir)

        # the directory goes through a shell: escape it so that spaces or
        # metacharacters in the path/name cannot alter the command
        escaped_dir = Shellwords.escape(module_dir)
        if `which md5sum`.strip.length > 0
                return `find #{escaped_dir} -type f -exec md5sum {} + | awk '{print $1}' | sort | md5sum | awk '{print $1}'`.strip
        elsif `which md5`.strip.length > 0
                return `find #{escaped_dir} -type f -exec md5 {} + | awk '{print $4}' | sort | md5`.strip
        end
        nil
end
get_modules() click to toggle source
# File lib/sfpagent/agent.rb, line 496
# Return the registry of loaded modules (@@modules, filled by load_modules),
# or an empty hash when no registry has been created yet.
def self.get_modules
        return {} unless defined?(@@modules)
        @@modules
end
get_sfp(module_name) click to toggle source
# File lib/sfpagent/agent.rb, line 473
# Parse "<modules_dir>/<module_name>/<module_name>.sfp" and return its root
# as a JSON string, after running a ParentEliminator visitor over it.
#
# @param module_name name of the module whose SFP file is read
def self.get_sfp(module_name)
        modules_dir = @@config[:modules_dir]
        root = parse("#{modules_dir}/#{module_name}/#{module_name}.sfp").root
        root.accept(Sfp::Visitor::ParentEliminator.new)
        JSON.generate(root)
end
get_state(as_sfp=true) click to toggle source

Return the current state of the model.

# File lib/sfpagent/agent.rb, line 339
# Return the current state reported by the runtime, while holding
# @@runtime_lock.
#
# @param as_sfp forwarded unchanged to @@runtime.get_state
# @return the state, nil when no runtime exists, or false when the runtime
#         raised an exception (which is logged).
def self.get_state(as_sfp=true)
        @@runtime_lock.synchronize do
                return nil if !defined?(@@runtime) or @@runtime.nil?
                begin
                        return @@runtime.get_state(as_sfp)
                rescue Exception => err
                        Sfp::Agent.logger.error "Get state [Failed] #{err}\n#{err.backtrace.join("\n")}"
                end
        end
        false
end
install_module(name, data, reload=true) click to toggle source
# File lib/sfpagent/agent.rb, line 537
# Install (or replace) a module from a tar archive.
#
# The archive is written to "<modules_dir>/<name>/data.tgz", extracted there,
# and the content of every extracted top-level directory is moved one level
# up so that the module's files sit directly in the module directory.
#
# @param name   module name; becomes the directory name below modules_dir
# @param data   raw bytes of the archive
# @param reload when true, load_modules is called after installation
# @return true on success, false when no modules directory is configured,
#         +data+ is nil, a path escapes the modules directory, or the
#         archive cannot be extracted.
def self.install_module(name, data, reload=true)
        modules_dir = @@config[:modules_dir].to_s
        return false if modules_dir == '' or data.nil?

        if !File.directory?(modules_dir)
                File.delete(modules_dir) if File.exist?(modules_dir)
                Dir.mkdir(modules_dir, 0700)
        end

        # Keep the raw path for Ruby file operations and escape it only where it
        # is handed to a shell; mixing the two breaks names with special characters.
        module_dir = "#{modules_dir}/#{name}"
        return false if not subpath(module_dir, modules_dir)
        escaped_module_dir = Shellwords.escape(module_dir)

        # delete old files
        system("rm -rf #{escaped_module_dir}") if File.exist?(module_dir)

        # save the archive
        Dir.mkdir(module_dir, 0700)
        datafile = "#{module_dir}/data.tgz"
        File.open(datafile, 'wb', 0600) { |f| f.syswrite data }

        # extract the archive; '&&' keeps tar from running in the wrong
        # directory when the cd fails
        if not system("cd #{escaped_module_dir} && tar xvf data.tgz")
                Sfp::Agent.logger.error "Installing module #{name} [Failed] cannot extract the archive"
                return false
        end

        # flatten every extracted top-level directory into the module directory
        Dir.entries(module_dir).each { |entry|
                next if entry == '.' or entry == '..'
                target = "#{module_dir}/#{entry}"
                return false if not subpath(target, modules_dir)
                if File.directory?(target)
                        escaped_target = Shellwords.escape(target)
                        # remove the directory by its full path instead of 'cd .. && rm -rf name',
                        # which would act relative to the process cwd if the first cd failed
                        system("cd #{escaped_target} && mv * .. && mv .* .. 2>/dev/null ; rm -rf #{escaped_target}")
                end
        }
        File.delete(datafile) if File.exist?(datafile)

        load_modules(@@config) if reload

        Sfp::Agent.logger.info "Installing module #{name} [OK]"

        true
end
install_modules(modules) click to toggle source
# File lib/sfpagent/agent.rb, line 529
# Install several modules, then reload all modules once.
#
# @param modules enumerable of (name, archive data) pairs
# @return false as soon as one installation fails (no reload happens then),
#         true otherwise.
def self.install_modules(modules)
        modules.each do |name, data|
                installed = install_module(name, data, false)
                return false unless installed
        end

        load_modules(@@config)

        true
end
is_windows() click to toggle source
# File lib/sfpagent/agent.rb, line 203
# Tell whether the host OS name contains mswin, mingw or cygwin.
#
# @return the match position (truthy) when it does, nil otherwise.
def self.is_windows
        host_os = RbConfig::CONFIG['host_os']
        host_os =~ /mswin|mingw|cygwin/
end
load_modules(p={}) click to toggle source

 Load all modules in given agent’s module directory.

options:
    :modules_dir => directory that contains all modules
# File lib/sfpagent/agent.rb, line 432
# Load all modules found in the given modules directory and rebuild the
# @@modules registry.
#
# Each sub-directory <name> is a Ruby module when it contains "<name>.rb"
# (the file is loaded), a shell module when it contains a file "main", and
# is reported as invalid otherwise.
#
# @param p options hash; only p[:modules_dir] (the directory that contains
#          all modules) is read. A missing or empty value loads nothing.
def self.load_modules(p={})
        # to_s: with the default p={} the directory is nil, and
        # File.directory?(nil) would raise TypeError
        dir = p[:modules_dir].to_s

        @@modules = {}
        counter = 0
        if dir != '' and File.directory?(dir)
                Sfp::Agent.logger.info "Modules directory: #{dir}"
                Dir.entries(dir).each do |name|
                        module_dir = "#{dir}/#{name}"
                        next if name == '.' or name == '..' or not File.directory?(module_dir)
                        module_file = "#{module_dir}/#{name}.rb"
                        if File.exist?(module_file)
                                begin
                                        ### use 'load' than 'require' to rewrite previous definitions
                                        load module_file
                                        # register first, so a failure while hashing is not
                                        # counted or reported as a successful load
                                        @@modules[name] = {
                                                :type => :ruby,
                                                :home => module_dir,
                                                :hash => get_module_hash(name)
                                        }
                                        counter += 1
                                        Sfp::Agent.logger.info "Loading module #{module_dir} [OK]"
                                rescue Exception => e
                                        # 'load' may raise ScriptError (SyntaxError/LoadError), which is
                                        # not a StandardError, hence the broad rescue
                                        Sfp::Agent.logger.warn "Loading module #{dir}/#{name} [Failed]\n#{e}"
                                end
                        elsif File.exist?("#{module_dir}/main")
                                Sfp::Agent.logger.info "Loading module #{module_dir} [OK]"
                                @@modules[name] = {
                                        :type => :shell,
                                        :home => module_dir,
                                        :hash => get_module_hash(name)
                                }
                                counter += 1
                        else
                                Sfp::Agent.logger.warn "Module #{module_dir} is invalid."
                        end
                end
        end
        Sfp::Agent.logger.info "Successfully loading #{counter} modules."
end
logger() click to toggle source
# File lib/sfpagent/agent.rb, line 51
# Return the logger object held in the class variable @@logger.
def self.logger
        return @@logger
end
pid() click to toggle source
# File lib/sfpagent/agent.rb, line 176
# Return the PID recorded in PIDFile when that process can be signalled.
#
# @return the PID, or nil when the file is missing or unreadable, does not
#         contain a positive integer, or the process cannot be signalled.
def self.pid
        begin
                pid = File.read(PIDFile).to_i
                # to_i yields 0 for garbage, and signalling PID 0 targets the caller's
                # own process group (always succeeds), so 0 must never be reported
                return pid if pid > 0 and Process.kill(0, pid)
        rescue
                # missing file or no such process: fall through to nil
        end
        nil
end
resolve(path, as_sfp=true) click to toggle source
# File lib/sfpagent/agent.rb, line 371
# Resolve a reference to its current value.
#
# When the second path segment names a node of the local runtime, the owning
# module's state is refreshed and the attribute is read from it. Otherwise,
# when that segment names a known agent, the value is fetched from that
# agent's "/state..." HTTP endpoint.
#
# @param path   reference to resolve
# @param as_sfp not used by this method
# @return the resolved value, Sfp::Unknown when the remote agent answers
#         '<sfp::unknown>', or Sfp::Undefined when the value cannot be
#         resolved or an exception was raised (which is logged).
def self.resolve(path, as_sfp=true)
        return Sfp::Undefined.new if !defined?(@@runtime) or @@runtime.nil? or @@runtime.root.nil?
        begin
                path = path.simplify
                node = path.split('.', 3)[1]

                if @@runtime.root.has_key?(node)
                        # local resolve
                        parent, attribute = path.pop_ref
                        owner = @@runtime.root.at?(parent)
                        return Sfp::Undefined.new unless owner.is_a?(Hash)

                        owner[:_self].update_state
                        local_state = owner[:_self].state
                        return (local_state.has_key?(attribute) ? local_state[attribute] : Sfp::Undefined.new)
                end

                agent = get_agents[node]
                if agent.is_a?(Hash)
                        # remote resolve: drop the first character and turn the dotted
                        # reference into a URL path
                        path = path[1, path.length-1].gsub(/\./, '/')
                        code, data = NetHelper.get_data(agent['sfpAddress'], agent['sfpPort'], "/state#{path}")
                        if code.to_i == 200
                                remote_state = JSON[data]['state']
                                return Sfp::Unknown.new if remote_state == '<sfp::unknown>'
                                return remote_state if !remote_state.is_a?(String) or remote_state[0,15] != '<sfp::undefined'
                        end
                end
        rescue Exception => err
                Sfp::Agent.logger.error "Resolve #{path} [Failed] #{err}\n#{err.backtrace.join("\n")}"
        end
        Sfp::Undefined.new
end
resolve_model(path) click to toggle source
# File lib/sfpagent/agent.rb, line 351
# Resolve a reference against the runtime's model, falling back to the
# cached model of the referenced name when the runtime answers Sfp::Unknown.
#
# @param path reference to resolve
# @return the resolved value, or Sfp::Undefined when no runtime exists or an
#         exception was raised (which is logged).
def self.resolve_model(path)
        return Sfp::Undefined.new if !defined?(@@runtime) or @@runtime.nil? or @@runtime.root.nil?
        begin
                path = path.simplify
                value = @@runtime.model.at?(path)
                return value unless value.is_a?(Sfp::Unknown)

                # unknown locally: look into the cache kept for the second path segment
                _, name, rest = path.split('.', 3)
                cached = get_cache_model(name)
                if !cached.nil? and cached.has_key?('model')
                        cached_model = cached['model']
                        value = (rest.to_s.length <= 0 ? cached_model : cached_model.at?("$.#{rest}"))
                        value.accept(ParentEliminator) if value.is_a?(Hash)
                end
                return value
        rescue Exception => err
                Sfp::Agent.logger.error "Resolve model #{path} [Failed] #{err}\n#{err.backtrace.join("\n")}"
        end
        Sfp::Undefined.new
end
runtime() click to toggle source
# File lib/sfpagent/agent.rb, line 59
# Return the runtime object held in @@runtime (created by update_model).
def self.runtime
        return @@runtime
end
set_agents(data) click to toggle source

parameter:

:data => To delete an agent: { "agent_name" => nil }
         To add/modify an agent: { "agent_name" => { "sfpAddress" => "10.0.0.1", "sfpPort" => 1314 } }
# File lib/sfpagent/agent.rb, line 598
# Merge the given agents into the agents database file and, when the
# database changed, push the change to every known agent and rewrite the
# managed section of /etc/hosts.
#
# @param data hash of agent name => description:
#             to delete an agent:     { "agent_name" => nil }
#             to add/modify an agent: { "agent_name" => { "sfpAddress" => "10.0.0.1", "sfpPort" => 1314 } }
# @return false when a description has an invalid address or port (nothing
#         is written then), true otherwise.
def self.set_agents(data)
        data.each { |name,agent|
                return false if agent.is_a?(Hash) and (not agent['sfpAddress'].is_a?(String) or
                                agent['sfpAddress'].strip == '' or agent['sfpPort'].to_i <= 0)
        }

        updated = false
        agents = nil
        File.open(AgentsDataFile, File::RDWR|File::CREAT, 0644) { |f|
                f.flock(File::LOCK_EX)
                json = f.read
                agents = (json == '' ? {} : JSON[json])
                current_hash = agents.hash

                agents.merge!(data)
                # a nil description means "delete this agent"
                agents.keys.each { |k| agents.delete(k) if agents[k].nil? }

                if current_hash != agents.hash
                        updated = true
                        f.rewind
                        f.write(JSON.generate(agents))
                        f.flush
                        f.truncate(f.pos)
                end
        }

        if updated
                @@agents_database = agents
                Thread.new {
                        # if updated then broadcast to other agents
                        http_data = {'agents' => JSON.generate(data)}
                        agents.each { |name,agent|
                                begin
                                        NetHelper.put_data(agent['sfpAddress'], agent['sfpPort'], '/agents', http_data, 5, 20)
                                rescue StandardError
                                        # best effort: an unreachable peer must not stop the broadcast
                                        # to the remaining agents
                                end
                        }
                }

                # update /etc/hosts so other applications can resolve the names correctly
                begin
                        # \A...\z (not ^...$) so that a multi-line address cannot smuggle
                        # extra lines into /etc/hosts
                        ipv4 = /\A(([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\.){3}([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\z/
                        File.open('/etc/hosts', File::RDWR, 0644) do |f|
                                f.flock(File::LOCK_EX)
                                output = ''
                                ### remove previous records
                                f.read.each_line do |line|
                                        break if line =~ /# Edited by Nuri/
                                        output << line
                                end
                                ### add current records
                                output << "# Edited by Nuri (don't change or add anything below this line)\n"
                                agents.each do |name,agent|
                                        next if not agent.is_a?(Hash)
                                        next if not (agent['sfpAddress'] =~ ipv4)
                                        # a host name with whitespace would corrupt the hosts file
                                        next if name.to_s !~ /\A\S+\z/
                                        output << "#{agent['sfpAddress']}\t#{name}\n"
                                end
                                ### save records
                                f.rewind
                                f.write(output)
                                f.flush
                                f.truncate(f.pos)
                        end
                rescue StandardError
                        # e.g. not permitted to write /etc/hosts; report through the agent's
                        # logger (Sfp::Agent.warn would hit the private Kernel#warn and raise)
                        Sfp::Agent.logger.warn 'cannot update /etc/hosts'
                end
        end

        true
end
set_bsig(bsig) click to toggle source

Setting a new BSig model: set @@bsig variable, and save in cached file

# File lib/sfpagent/agent.rb, line 288
# Save a BSig model to BSigFile while holding an exclusive lock on the file.
#
# Every entry of bsig['operators'] receives an 'id' equal to its index
# before the model is serialized. A nil model empties the file.
#
# @param bsig the BSig model, or nil
# @return true on success, false when an exception was raised (which is
#         logged).
def self.set_bsig(bsig)
        begin
                File.open(BSigFile, File::RDWR | File::CREAT, 0600) do |file|
                        file.flock(File::LOCK_EX)
                        Sfp::Agent.logger.info "Setting the BSig model [Wait]"
                        file.rewind
                        serialized = ''
                        unless bsig.nil?
                                operators = bsig['operators']
                                operators.each_index { |index| operators[index]['id'] = index }
                                serialized = JSON.generate(bsig)
                        end
                        file.write(serialized)
                        file.flush
                        file.truncate(file.pos)
                end
                Sfp::Agent.logger.info "Setting the BSig model [OK]"
                return true
        rescue Exception => err
                Sfp::Agent.logger.error "Setting the BSig model [Failed] #{err}\n#{err.backtrace.join("\n")}"
        end
        false
end
set_cache_model(p={}) click to toggle source
# File lib/sfpagent/agent.rb, line 207
# Update the cached models stored as JSON in CacheModelFile while holding
# an exclusive lock on the file.
#
# @param p options hash:
#          :name and :model given => store :model under :name
#          only :name given       => delete the entry for :name
#          no :name               => delete every cached model
# @return true
def self.set_cache_model(p={})
        File.open(CacheModelFile, File::RDWR | File::CREAT, 0600) do |file|
                file.flock(File::LOCK_EX)
                json = file.read
                # anything shorter than "{}" cannot be a JSON document
                cache = (json.length >= 2 ? JSON[json] : {})

                name = p[:name]
                if !name
                        cache = {}
                        Sfp::Agent.logger.info "Deleting all cache model..."
                elsif p[:model]
                        cache[name] = p[:model]
                        Sfp::Agent.logger.info "Saving cache model for #{name}..."
                else
                        cache.delete(name)
                        Sfp::Agent.logger.info "Deleting cache model for #{name}..."
                end

                file.rewind
                file.write(JSON.generate(cache))
                file.flush
                file.truncate(file.pos)
        end

        true
end
set_model(model) click to toggle source

Save given model to cached file, and then reload the model.

# File lib/sfpagent/agent.rb, line 237
# Save the given model to ModelFile and reload it, unless its MD5 digest
# equals the digest of the model currently in use.
#
# @param model the model to serialize as JSON
# @return true when the model was saved or was already current, false when
#         an exception was raised (which is logged).
def self.set_model(model)
        begin
                # generate MD5 hash for the new model
                data = JSON.generate(model)
                new_model_hash = Digest::MD5.hexdigest(data)

                # save the new model if it's not same with the existing one
                if new_model_hash != @@current_model_hash
                        Sfp::Agent.logger.info "Setting new model [Wait]"
                        File.open(ModelFile, File::RDWR|File::CREAT, 0600) { |f|
                                f.flock(File::LOCK_EX)
                                f.rewind
                                f.write(data)
                                f.flush
                                f.truncate(f.pos)
                        }
                        update_model
                        Sfp::Agent.logger.info "Setting the model [OK]"
                end
                return true
        rescue Exception => e
                Sfp::Agent.logger.error "Setting the model [Failed] #{e}\n#{e.backtrace.join("\n")}"
        end
        false
end
start(opts={}) click to toggle source

Start the agent.

options:

:daemon   => true if running as a daemon, false if as a console application
:port     => port the web server will listen on
:ssl      => set true to enable HTTPS
:certfile => certificate file path for HTTPS
:keyfile  => key file path for HTTPS
# File lib/sfpagent/agent.rb, line 72
# Start the agent: load the modules, reload the cached model, create the
# web server, install signal handlers, write the PID file and start the
# BSig engine, the maintenance object and the web server.
#
# @param opts options hash:
#        :daemon      => true if running as a daemon, false if as a console application
#        :port        => port the web server listens on (DefaultPort when absent)
#        :ssl         => set true to enable HTTPS
#        :certfile    => certificate file path for HTTPS
#        :keyfile     => key file path for HTTPS
#        :modules_dir => modules directory ("#{Home}/modules" when blank)
#        :mock        => when set, neither daemonizes nor starts the web server
# Any exception raised during start-up is logged and re-raised.
def self.start(opts={})
        Sfp::Agent.logger.info "Starting agent..."
        puts "Starting agent..."

        @@config = opts

        Process.daemon if opts[:daemon] and not opts[:mock]

        begin
                # check modules directory, and create it if it does not exist
                opts[:modules_dir] = File.expand_path(opts[:modules_dir].to_s.strip != '' ? opts[:modules_dir].to_s : "#{Home}/modules")
                Dir.mkdir(opts[:modules_dir], 0700) if not File.exist?(opts[:modules_dir])

                # load modules from cached directory
                load_modules(opts)

                # reload model
                update_model({:rebuild => true})

                # create web server
                server_type = WEBrick::SimpleServer
                port = (opts[:port] ? opts[:port] : DefaultPort)
                config = { :Host => '0.0.0.0',
                           :Port => port,
                           :ServerType => server_type,
                           :pid => '/tmp/webrick.pid',
                           :Logger => Sfp::Agent.logger }
                if opts[:ssl]
                        config[:SSLEnable] = true
                        config[:SSLVerifyClient] = OpenSSL::SSL::VERIFY_NONE
                        # File.read closes the file; File.open(...).read leaked the handle
                        config[:SSLCertificate] = OpenSSL::X509::Certificate.new(File.read(opts[:certfile]))
                        config[:SSLPrivateKey] = OpenSSL::PKey::RSA.new(File.read(opts[:keyfile]))
                        config[:SSLCertName] = [["CN", WEBrick::Utils::getservername]]
                end
                server = WEBrick::HTTPServer.new(config)
                server.mount("/", Sfp::Agent::Handler, Sfp::Agent.logger)

                # create maintenance object
                maintenance = Maintenance.new(opts)

                if not is_windows
                        # trap signal; SIGKILL can never be trapped, so the catchable
                        # termination signal TERM is handled instead
                        ['INT', 'TERM', 'HUP'].each do |signal|
                                trap(signal) {
                                        maintenance.stop

                                        Sfp::Agent.logger.info "Shutting down web server and BSig engine..."
                                        bsig_engine.stop
                                        # wait until the engine reports that it has stopped
                                        loop do
                                                break if bsig_engine.status == :stopped
                                                sleep 1
                                        end
                                        server.shutdown
                                }
                        end
                end

                File.open(PIDFile, 'w', 0644) { |f| f.write($$.to_s) }

                bsig_engine.start

                maintenance.start

                server.start if not opts[:mock]

        rescue Exception => e
                Sfp::Agent.logger.error "Starting the agent [Failed] #{e}\n#{e.backtrace.join("\n")}"

                raise e
        end
end
status() click to toggle source

Print whether the agent is running, including its PID when it is.

# File lib/sfpagent/agent.rb, line 187
# Print whether the agent is running, including its PID when it is.
def self.status
        report = (pid.nil? ? "Agent is not running." : "Agent is running with PID #{pid}")
        puts report
end
stop(opts={}) click to toggle source

Stop the agent’s daemon.

# File lib/sfpagent/agent.rb, line 146
# Stop the agent's daemon: send HUP to the PID recorded in PIDFile, wait,
# and send SIGKILL when the process is still alive afterwards.
#
# @param opts options hash; with :mock set, the wait/kill phase is skipped.
# When the PID file is missing or invalid, or the process cannot be
# signalled, "Agent is not running." is printed and the PID file is removed.
def self.stop(opts={})
        begin
                pid = File.read(PIDFile).to_i
                # to_i yields 0 for garbage; signalling PID 0 would hit the caller's own
                # process group, so treat it like a missing PID file
                raise ArgumentError, "invalid PID in #{PIDFile}" if pid <= 0

                puts "Stopping agent with PID #{pid}..."
                Process.kill 'HUP', pid

                if not opts[:mock]
                        begin
                                # give the agent time to shut down
                                sleep (Sfp::BSig::SleepTime + 0.25)

                                # raises when the process is gone, which jumps to the rescue below
                                Process.kill 0, pid
                                Sfp::Agent.logger.info "Agent is still running."
                                puts "Agent is still running."

                                Sfp::Agent.logger.info "Killing agent."
                                puts "Killing agent."
                                Process.kill 9, pid
                                # a killed agent cannot clean up after itself
                                File.delete(PIDFile) if File.exist?(PIDFile)
                        rescue
                                Sfp::Agent.logger.info "Agent has stopped."
                                puts "Agent has stopped."
                                File.delete(PIDFile) if File.exist?(PIDFile)
                        end
                end

        rescue
                puts "Agent is not running."
                File.delete(PIDFile) if File.exist?(PIDFile)
        end
end
subpath(path1, path2) click to toggle source

return true if path1 is subpath of path2, otherwise false

# File lib/sfpagent/agent.rb, line 687
# Tell whether path1 is path2 itself or lies below it.
#
# Both paths are expanded first, so relative segments such as ".." are
# resolved before the comparison. The comparison is made on whole path
# components: "/a/bc" is NOT a subpath of "/a/b".
#
# @param path1 candidate path
# @param path2 containing directory
# @return true or false
def self.subpath(path1, path2)
        child = File.expand_path(path1)
        parent = File.expand_path(path2)
        return true if child == parent

        # compare against "parent/" so that a sibling sharing the same textual
        # prefix is rejected; the root directory already ends with the separator
        prefix = (parent.end_with?(File::SEPARATOR) ? parent : parent + File::SEPARATOR)
        child.start_with?(prefix)
end
uninstall_all_modules(p={}) click to toggle source
# File lib/sfpagent/agent.rb, line 504
# Delete everything inside the modules directory and reload the (now
# empty) set of modules.
#
# @param p not used by this method
# @return true when the deletion succeeded or no modules directory is
#         configured, false when the deletion failed.
def self.uninstall_all_modules(p={})
        # to_s: a nil directory must count as "not configured"; interpolating nil
        # below would otherwise produce "rm -rf /*"
        modules_dir = @@config[:modules_dir].to_s
        return true if modules_dir == ''

        # escape the directory, but leave the glob for the shell to expand
        if system("rm -rf #{Shellwords.escape(modules_dir)}/*")
                load_modules(@@config)
                Sfp::Agent.logger.info "Deleting all modules [OK]"
                return true
        end
        Sfp::Agent.logger.info "Deleting all modules [Failed]"
        false
end
uninstall_module(name) click to toggle source
# File lib/sfpagent/agent.rb, line 515
# Delete one installed module and reload the remaining modules.
#
# @param name module name (sub-directory of the modules directory)
# @return true when the module directory was removed or did not exist,
#         false when no modules directory is configured, the resulting path
#         is not strictly inside the modules directory, or the removal failed.
def self.uninstall_module(name)
        modules_dir = @@config[:modules_dir].to_s
        return false if modules_dir == ''

        # keep the raw path for Ruby checks; escape only for the shell command
        module_dir = File.expand_path("#{modules_dir}/#{name}")
        return false if not subpath(module_dir, modules_dir)
        # an empty or "." name must never remove the modules directory itself
        return false if module_dir == File.expand_path(modules_dir)

        if File.directory?(module_dir)
                result = !!system("rm -rf #{Shellwords.escape(module_dir)}")
        else
                result = true
        end
        load_modules(@@config)
        Sfp::Agent.logger.info "Deleting module #{name} " + (result ? "[OK]" : "[Failed]")
        result
end
update_model(p={}) click to toggle source

Reload the model from cached file.

# File lib/sfpagent/agent.rb, line 265
# Reload the model from ModelFile while holding @@runtime_lock.
#
# The MD5 digest of the file content is remembered in @@current_model_hash.
# A new Sfp::Runtime is built when none exists or p[:rebuild] is set;
# otherwise the model is handed to the existing runtime.
#
# @param p options hash; only p[:rebuild] is read
def self.update_model(p={})
        return Sfp::Agent.logger.info("There is no model in cache.") unless File.exist?(ModelFile)

        begin
                @@runtime_lock.synchronize do
                        data = File.read(ModelFile)
                        @@current_model_hash = Digest::MD5.hexdigest(data)
                        model = JSON[data]
                        if !defined?(@@runtime) or @@runtime.nil? or p[:rebuild]
                                @@runtime = Sfp::Runtime.new(model)
                        else
                                @@runtime.set_model(model)
                        end
                end
                Sfp::Agent.logger.info "Reloading the model in cache [OK]"
        rescue Exception => err
                Sfp::Agent.logger.error "Reloading the model in cache [Failed] #{err}\n#{err.backtrace.join("\n")}"
        end
end
whoami?() click to toggle source
# File lib/sfpagent/agent.rb, line 332
# Delegate to the runtime's whoami? method.
#
# @return the runtime's answer, or nil when no runtime exists.
def self.whoami?
        if !defined?(@@runtime) or @@runtime.nil?
                nil
        else
                @@runtime.whoami?
        end
end