#!/usr/bin/env python

#
# This is the MS subset of the W3C test suite for XML Schemas.
# This file is generated from the MS W3C test suite description file.
#

import sys
import os
import exceptions
import optparse

import libxml2

# Command-line interface of the test runner.  The parsed result is published
# as the module-level name `options`, which the classes below read.
opa = optparse.OptionParser()

opa.add_option("-b", "--base", action="store", type="string", dest="baseDir",
               default="",
               help="The base directory; i.e. the parent folder of the "
                    "\"nisttest\", \"suntest\" and \"msxsdtest\" directories.")

opa.add_option("-o", "--out", action="store", type="string", dest="logFile",
               default="test.log",
               help="The filepath of the log file to be created")

opa.add_option("--log", action="store_true", dest="enableLog",
               default=False,
               help="Create the log file")

opa.add_option("--no-test-out", action="store_true", dest="disableTestStdOut",
               default=False,
               help="Don't output test results")

opa.add_option("-s", "--silent", action="store_true", dest="silent",
               default=False,
               help="Disables display of all tests")

opa.add_option("-v", "--verbose", action="store_true", dest="verbose",
               default=False,
               help="Displays all tests (only if --silent is not set)")

# -1 means "no limit" (see XSTCTestRunner.run()).
opa.add_option("-x", "--max", type="int", dest="maxTestCount",
               default=-1,
               help="The maximum number of tests to be run")

opa.add_option("-t", "--test", type="string", dest="singleTest",
               default=None,
               help="Runs the specified test only")

opa.add_option("--tsw", "--test-starts-with", type="string", dest="testStartsWith",
               default=None,
               help="Runs the specified test(s), starting with the given string")

opa.add_option("--rieo", "--report-internal-errors-only", action="store_true",
               dest="reportInternalErrOnly", default=False,
               help="Display erroneous tests of type 'internal' only")

opa.add_option("--rueo", "--report-unimplemented-errors-only", action="store_true",
               dest="reportUnimplErrOnly", default=False,
               help="Display erroneous tests of type 'unimplemented' only")

opa.add_option("--rmleo", "--report-mem-leak-errors-only", action="store_true",
               dest="reportMemLeakErrOnly", default=False,
               help="Display erroneous tests of type 'memory leak' only")

opa.add_option("-c", "--combines", type="string", dest="combines",
               default=None,
               help="Combines to be run (all if omitted)")

opa.add_option("--csw", type="string", dest="combineStartsWith",
               default=None,
               help="Runs the combine(s) whose name starts with the given string")

opa.add_option("--rc", "--report-combines", action="store_true",
               dest="reportCombines", default=False,
               help="Display combine reports")

opa.add_option("--rec", "--report-err-combines", action="store_true",
               dest="reportErrCombines", default=False,
               help="Display erroneous combine reports only")

opa.add_option("--debug", action="store_true",
               dest="debugEnabled", default=False,
               help="Displays debug messages")

opa.add_option("--info", action="store_true",
               dest="info", default=False,
               help="Displays info on the suite only. Does not run any test.")

opa.add_option("--sax", action="store_true",
               dest="validationSAX", default=False,
               help="Use SAX2-driven validation.")

opa.add_option("--tn", action="store_true",
               dest="displayTestName", default=False,
               help="Display the test name in every case.")

(options, args) = opa.parse_args()

# "-c" takes a whitespace-separated list of combine names; keep it as a list
# so that membership tests compare whole names.
if options.combines is not None:
    options.combines = options.combines.split()

################################################
# The vars below are not intended to be changed.
#

# Failure messages recorded in a test's log.
msgSchemaNotValidButShould = "The schema should be valid."
msgSchemaValidButShouldNot = "The schema should be invalid."
msgInstanceNotValidButShould = "The instance should be valid."
msgInstanceValidButShouldNot = "The instance should be invalid."

# Known vendors; XSTCTestCase.initTest() derives the combine name from them.
vendorNIST = "NIST"
vendorNIST_2 = "NIST-2"
vendorSUN = "SUN"
vendorMS = "MS"

###################
# Helper functions.
#

# Vendor of the suite being run; one of the vendor* constants above once set.
vendor = None

def handleError(test, msg):
    """Error callback registered with libxml2 while a test is running.

    Adds the library message to the test's library log (unless --silent is
    set) and flags the test as failed when the message mentions an
    unimplemented feature or an internal error.

    test -- the test case that was registered as callback context
    msg  -- the message text passed to the callback
    """
    if not options.silent:
        test.addLibLog("'%s'   LIB: %s" % (test.name, msg))
    # "Unimplemented" takes precedence when both markers are present.
    if "Unimplemented" in msg:
        test.failUnimplemented()
    elif "Internal" in msg:
        test.failInternal()

def fixFileNames(fileName):
    """Relocate a test file path below the "./Tests" directory.

    The path is split on "/".  Unless its second component already is
    "Tests", the first component is dropped and the remaining ones are
    re-rooted under "./Tests".

    fileName -- the path as given in the suite description; may be None

    Returns "" for None or an empty string.  A name without any "/" is
    returned unchanged, because there is no leading component to replace.
    """
    if (fileName is None) or (fileName == ""):
        return ""
    parts = fileName.split("/")
    if len(parts) < 2:
        # Nothing to relocate; previously this raised an IndexError.
        return fileName
    if parts[1] != "Tests":
        fileName = os.path.join(".", "Tests", *parts[1:])
    return fileName

class XSTCTestGroup:
    """A named group of tests together with the state of its main schema.

    Attributes:
      name           -- the group name
      descr          -- the group description
      schemaFileName -- the schema path, passed through fixFileNames()
      mainSchema     -- True until the first schema result was recorded
      schemaParsed   -- result of the first recorded schema parse
      schemaTried    -- True once a schema result was recorded
    """

    def __init__(self, name, schemaFileName, descr):
        self.name = name
        self.descr = descr
        self.mainSchema = True
        self.schemaFileName = fixFileNames(schemaFileName)
        self.schemaParsed = False
        self.schemaTried = False

    def setSchema(self, schemaFileName, parsed):
        """Record the parse result of the group's main schema.

        Only the first call has an effect; later calls are ignored.

        schemaFileName -- accepted for the callers' sake; not used here
        parsed         -- whether the schema could be parsed
        """
        if not self.mainSchema:
            return
        self.mainSchema = False
        self.schemaParsed = parsed
        self.schemaTried = True

class XSTCTestCase:
    """Base class of a single test of the suite.

    Keeps the test's description, its result flags and its log.  run()
    calls self.validate(), which this class does not define itself.
    """

    # <!-- groupName, Name, Accepted, File, Val, Descr
    def __init__(self, isSchema, groupName, name, accepted, file, val, descr):
        """Store the test description and start the test's log.

        isSchema  -- true for schema tests (counted separately by the runner)
        groupName -- name of the XSTCTestGroup the test belongs to
        name      -- the test name
        accepted  -- stored as given; the runner counts tests where it is false
        file      -- path of the tested document, passed through fixFileNames()
        val       -- the expected validity, formatted as an integer in the log
        descr     -- description of the test, may be None
        """
        global options
        self.testRunner = None
        self.isSchema = isSchema
        self.groupName = groupName
        self.name = name
        self.accepted = accepted
        self.fileName = fixFileNames(file)
        self.val = val
        self.descr = descr
        self.combineName = None

        self.log = []
        self.libLog = []
        self.initialMemUsed = 0
        self.memLeak = 0
        # Result flags; `failed` is set by every fail*() method.
        self.failed = False
        self.excepted = False
        self.bad = False
        self.unimplemented = False
        self.internalErr = False
        self.noSchemaErr = False
        #
        # Init the log.
        #
        if not options.silent:
            if self.descr is not None:
                self.log.append("'%s'   descr: %s\n" % (self.name, self.descr))
            self.log.append("'%s'   exp validity: %d\n" % (self.name, self.val))

    def initTest(self, runner):
        """Bind the test to its runner and group and compute the combine name.

        Raises an Exception when the global `vendor` is none of the known
        vendors.
        """
        global vendorNIST, vendorSUN, vendorMS, vendorNIST_2, options, vendor
        #
        # Get the test-group.
        #
        self.runner = runner
        self.group = runner.getGroup(self.groupName)
        if vendor == vendorMS or vendor == vendorSUN:
            #
            # Use the last given directory for the combine name.
            #
            dirs = self.fileName.split("/")
            self.combineName = dirs[-2]
        elif vendor == vendorNIST:
            #
            # NIST files are named in the following form:
            # "NISTSchema-short-pattern-1.xsd"
            #
            tokens = self.name.split("-")
            self.combineName = tokens[1]
        elif vendor == vendorNIST_2:
            #
            # Group-names have the form: "atomic-normalizedString-length-1"
            #
            tokens = self.groupName.split("-")
            self.combineName = "%s-%s" % (tokens[0], tokens[1])
        else:
            self.combineName = "unkown"
            raise Exception("Could not compute the combine name of a test.")
        if (not options.silent) and (self.group.descr is not None):
            self.log.append("'%s'   group-descr: %s\n" % (self.name, self.group.descr))

    def addLibLog(self, msg):
        """This one is intended to be used by the error handler
        function"""
        global options
        if not options.silent:
            self.libLog.append(msg)

    def fail(self, msg):
        """Mark the test as failed and log the reason."""
        global options
        self.failed = True
        if not options.silent:
            self.log.append("'%s' ( FAILED: %s\n" % (self.name, msg))

    def failNoSchema(self):
        """Mark the test as failed because its schema is not available."""
        global options
        self.failed = True
        self.noSchemaErr = True
        if not options.silent:
            self.log.append("'%s' X NO-SCHEMA\n" % (self.name))

    def failInternal(self):
        """Mark the test as failed due to an internal library error."""
        global options
        self.failed = True
        self.internalErr = True
        if not options.silent:
            self.log.append("'%s' * INTERNAL\n" % self.name)

    def failUnimplemented(self):
        """Mark the test as failed due to an unimplemented feature."""
        global options
        self.failed = True
        self.unimplemented = True
        if not options.silent:
            self.log.append("'%s' ? UNIMPLEMENTED\n" % self.name)

    def failCritical(self, msg):
        """Mark the test as failed and as "bad", logging the reason."""
        global options
        self.failed = True
        self.bad = True
        if not options.silent:
            self.log.append("'%s' ! BAD: %s\n" % (self.name, msg))

    def failExcept(self, e):
        """Mark the test as failed because exception `e` was raised."""
        global options
        self.failed = True
        self.excepted = True
        if not options.silent:
            self.log.append("'%s' # EXCEPTION: %s\n" % (self.name, str(e)))

    def setUp(self):
        """Prepare libxml2 for this test and take the memory baseline."""
        #
        # Set up Libxml2.
        #
        self.initialMemUsed = libxml2.debugMemory(1)
        libxml2.initParser()
        libxml2.lineNumbersDefault(1)
        # The test itself is the context handed to handleError().
        libxml2.registerErrorHandler(handleError, self)

    def tearDown(self):
        """Clean up libxml2 and compute the memory delta of the test."""
        libxml2.schemaCleanupTypes()
        libxml2.cleanupParser()
        self.memLeak = libxml2.debugMemory(1) - self.initialMemUsed

    def isIOError(self, file, docType):
        """Tell whether libxml2's last error is an I/O error.

        When it is, the test is marked as critically failed.

        file    -- the resource that was accessed (used in the message only)
        docType -- kind of the resource, e.g. "schema" (message only)

        Returns True for an I/O error, False otherwise.
        """
        err = None
        try:
            err = libxml2.lastError()
        except Exception:
            # Best effort: failing to query the error counts as "no I/O error".
            pass
        if err is None:
            return False
        if err.domain() == libxml2.XML_FROM_IO:
            self.failCritical("failed to access the %s resource '%s'\n" % (docType, file))
            return True
        return False

    def debugMsg(self, msg):
        """Write a debug message to stdout when --debug is set."""
        global options
        if options.debugEnabled:
            sys.stdout.write("'%s'   DEBUG: %s\n" % (self.name, msg))

    def finalize(self):
        """Adds additional info to the log."""
        global options
        #
        # Add libxml2 messages.
        #
        if not options.silent:
            self.log.extend(self.libLog)
            #
            # Add memory leaks.
            #
            if self.memLeak != 0:
                self.log.append("%s + memory leak: %d bytes\n" % (self.name, self.memLeak))

    def run(self):
        """Runs a test."""
        global options

        ##filePath = os.path.join(options.baseDir, self.fileName)
        # filePath = "%s/%s/%s/%s" % (options.baseDir, self.test_Folder, self.schema_Folder, self.schema_File)
        if options.displayTestName:
            sys.stdout.write("'%s'\n" % self.name)
        try:
            self.validate()
        except (Exception, libxml2.parserError, libxml2.treeError) as e:
            # An exception fails the test; it does not abort the whole run.
            self.failExcept(e)

def parseSchema(fileName):
    """Parse the schema document `fileName` with libxml2.

    Returns the value of the parser context's schemaParse(), or None when
    parsing raised an exception.  Failing to parse is an expected outcome
    for tests of invalid schemata, so such exceptions are swallowed here.
    """
    schema = None
    ctxt = libxml2.schemaNewParserCtxt(fileName)
    try:
        schema = ctxt.schemaParse()
    except Exception:
        # Deliberate: the caller interprets None as "schema not valid".
        pass
    finally:
        # Drop the parser context before returning.
        del ctxt
    return schema

class XSTCSchemaTest(XSTCTestCase):
    """Test that checks whether a schema document parses as expected."""

    def __init__(self, groupName, name, accepted, file, val, descr):
        XSTCTestCase.__init__(self, 1, groupName, name, accepted, file, val, descr)

    def validate(self):
        """Parse the schema and compare the outcome with the expectation.

        The test fails when the schema cannot be parsed although self.val is
        true, or when it parses although self.val is 0.  In every case the
        parse result is reported to the test group.
        """
        global msgSchemaNotValidButShould, msgSchemaValidButShouldNot
        schema = None
        filePath = self.fileName
        # os.path.join(options.baseDir, self.fileName)
        try:
            #
            # Parse the schema.
            #
            self.debugMsg("loading schema: %s" % filePath)
            schema = parseSchema(filePath)
            self.debugMsg("after loading schema")
            if schema is None:
                self.debugMsg("schema is None")
                self.debugMsg("checking for IO errors...")
                # Report the path of the schema (the builtin `file` was
                # passed here before).
                if self.isIOError(filePath, "schema"):
                    return
            self.debugMsg("checking schema result")
            if (schema is None and self.val) or (schema is not None and self.val == 0):
                self.debugMsg("schema result is BAD")
                if schema is None:
                    self.fail(msgSchemaNotValidButShould)
                else:
                    self.fail(msgSchemaValidButShouldNot)
            else:
                self.debugMsg("schema result is OK")
        finally:
            # Runs on the early return above as well.
            self.group.setSchema(self.fileName, schema is not None)
            del schema

class XSTCInstanceTest(XSTCTestCase):
    """Test that validates an instance document against its group's schema."""

    def __init__(self, groupName, name, accepted, file, val, descr):
        XSTCTestCase.__init__(self, 0, groupName, name, accepted, file, val, descr)

    def validate(self):
        """Validate the instance and compare the outcome with the expectation.

        The test is marked NO-SCHEMA when the group's schema is known not to
        parse, or when parsing it here yields nothing.  Unless --sax is set,
        the instance is parsed into a document first; with --sax the file is
        validated directly.

        Raises an Exception when no instance parser context can be created.
        """
        instance = None
        schema = None
        filePath = self.fileName
        # os.path.join(options.baseDir, self.fileName)

        if not self.group.schemaParsed and self.group.schemaTried:
            self.failNoSchema()
            return

        self.debugMsg("loading instance: %s" % filePath)
        if not options.validationSAX:
            # The parser context is only needed for tree-based validation.
            parserCtxt = libxml2.newParserCtxt()
            if parserCtxt is None:
                # TODO: Is this one necessary, or will an exception
                # be already raised?
                raise Exception("Could not create the instance parser context.")
            try:
                instance = parserCtxt.ctxtReadFile(filePath, None, libxml2.XML_PARSE_NOWARNING)
            except Exception:
                # Suppress exceptions; a missing document is handled below.
                pass
            finally:
                del parserCtxt
            self.debugMsg("after loading instance")
            if instance is None:
                self.debugMsg("instance is None")
                self.failCritical("Failed to parse the instance for unknown reasons.")
                return
        try:
            #
            # Validate the instance.
            #
            self.debugMsg("loading schema: %s" % self.group.schemaFileName)
            schema = parseSchema(self.group.schemaFileName)
            if schema is None:
                # Without a schema there is nothing to validate against;
                # this used to end in an AttributeError on None.
                self.failNoSchema()
                return
            try:
                validationCtxt = schema.schemaNewValidCtxt()
                #validationCtxt = libxml2.schemaNewValidCtxt(None)
                if validationCtxt is None:
                    self.failCritical("Could not create the validation context.")
                    return
                try:
                    self.debugMsg("validating instance")
                    if options.validationSAX:
                        instanceErr = validationCtxt.schemaValidateFile(filePath, 0)
                    else:
                        instanceErr = validationCtxt.schemaValidateDoc(instance)
                    self.debugMsg("after instance validation")
                    self.debugMsg("instance-err: %d" % instanceErr)
                    if (instanceErr != 0 and self.val == 1) or (instanceErr == 0 and self.val == 0):
                        self.debugMsg("instance result is BAD")
                        if instanceErr != 0:
                            self.fail(msgInstanceNotValidButShould)
                        else:
                            self.fail(msgInstanceValidButShouldNot)
                    else:
                        self.debugMsg("instance result is OK")
                finally:
                    del validationCtxt
            finally:
                del schema
        finally:
            if instance is not None:
                instance.freeDoc()

####################
# Test runner class.
#

class XSTCTestRunner:
    """Runs the registered tests and reports counters per run and per combine.

    The CNT_* class attributes are the keys of the counter dictionaries
    created by createCounters().
    """

    CNT_TOTAL = 0
    CNT_RAN = 1
    CNT_SUCCEEDED = 2
    CNT_FAILED = 3
    CNT_UNIMPLEMENTED = 4
    CNT_INTERNAL = 5
    CNT_BAD = 6
    CNT_EXCEPTED = 7
    CNT_MEMLEAK = 8
    CNT_NOSCHEMA = 9
    CNT_NOTACCEPTED = 10
    CNT_SCHEMA_TEST = 11

    def __init__(self):
        self.logFile = None
        self.counters = self.createCounters()
        self.testList = []
        # Maps a combine name to the list of tests that belong to it.
        self.combinesRan = {}
        # Maps a group name to its group object.
        self.groups = {}
        self.curGroup = None

    def createCounters(self):
        """Return a new counter dictionary with every CNT_* key set to 0."""
        keys = (self.CNT_TOTAL, self.CNT_RAN, self.CNT_SUCCEEDED,
                self.CNT_FAILED, self.CNT_UNIMPLEMENTED, self.CNT_INTERNAL,
                self.CNT_BAD, self.CNT_EXCEPTED, self.CNT_MEMLEAK,
                self.CNT_NOSCHEMA, self.CNT_NOTACCEPTED, self.CNT_SCHEMA_TEST)
        counters = {}
        for key in keys:
            counters[key] = 0
        return counters

    def addTest(self, test):
        """Register `test` and let it initialize itself against this runner."""
        self.testList.append(test)
        test.initTest(self)

    def getGroup(self, groupName):
        """Return the group registered as `groupName` (KeyError if unknown)."""
        return self.groups[groupName]

    def addGroup(self, group):
        """Register `group` under its name."""
        self.groups[group.name] = group

    def updateCounters(self, test, counters):
        """Add the result flags of `test` to `counters` and return it."""
        if test.memLeak != 0:
            counters[self.CNT_MEMLEAK] += 1
        if test.failed:
            counters[self.CNT_FAILED] += 1
        else:
            counters[self.CNT_SUCCEEDED] += 1
        if test.bad:
            counters[self.CNT_BAD] += 1
        if test.unimplemented:
            counters[self.CNT_UNIMPLEMENTED] += 1
        if test.internalErr:
            counters[self.CNT_INTERNAL] += 1
        if test.noSchemaErr:
            counters[self.CNT_NOSCHEMA] += 1
        if test.excepted:
            counters[self.CNT_EXCEPTED] += 1
        if not test.accepted:
            counters[self.CNT_NOTACCEPTED] += 1
        if test.isSchema:
            counters[self.CNT_SCHEMA_TEST] += 1
        return counters

    def displayResults(self, out, all, combName, counters):
        """Write the multi-line report for `counters` to `out`.

        out      -- object with a write() method
        all      -- true for the report of the whole run
        combName -- name of the reported combine (used when `all` is false)
        counters -- a counter dictionary as created by createCounters()
        """
        out.write("\n")
        if all:
            if options.combines is not None:
                out.write("combine(s): %s\n" % str(options.combines))
        elif combName is not None:
            out.write("combine : %s\n" % combName)
        out.write("  total           : %d\n" % counters[self.CNT_TOTAL])
        if all or options.combines is not None:
            out.write("  ran             : %d\n" % counters[self.CNT_RAN])
            out.write("    (schemata)    : %d\n" % counters[self.CNT_SCHEMA_TEST])
        # out.write("    succeeded       : %d\n" % counters[self.CNT_SUCCEEDED])
        out.write("  not accepted    : %d\n" % counters[self.CNT_NOTACCEPTED])
        failed = counters[self.CNT_FAILED] > 0
        if failed:
            out.write("    failed                  : %d\n" % counters[self.CNT_FAILED])
            out.write("     -> internal            : %d\n" % counters[self.CNT_INTERNAL])
            out.write("     -> unimpl.             : %d\n" % counters[self.CNT_UNIMPLEMENTED])
            out.write("     -> skip-invalid-schema : %d\n" % counters[self.CNT_NOSCHEMA])
            out.write("     -> bad                 : %d\n" % counters[self.CNT_BAD])
            out.write("     -> exceptions          : %d\n" % counters[self.CNT_EXCEPTED])
        # Leaks are reported even when no test failed; they used to be
        # hidden in that case.
        if failed or counters[self.CNT_MEMLEAK] > 0:
            out.write("    memory leaks            : %d\n" % counters[self.CNT_MEMLEAK])

    def displayShortResults(self, out, all, combName, counters):
        """Write the one-line summary for `counters` to `out`.

        `all` and `combName` are accepted for symmetry with displayResults()
        and are not used.
        """
        out.write("Ran %d of %d tests (%d schemata):" % (counters[self.CNT_RAN],
                  counters[self.CNT_TOTAL], counters[self.CNT_SCHEMA_TEST]))
        # out.write("    succeeded       : %d\n" % counters[self.CNT_SUCCEEDED])
        if counters[self.CNT_NOTACCEPTED] > 0:
            out.write(" %d not accepted" % (counters[self.CNT_NOTACCEPTED]))
        if counters[self.CNT_FAILED] > 0 or counters[self.CNT_MEMLEAK] > 0:
            if counters[self.CNT_FAILED] > 0:
                out.write(" %d failed" % (counters[self.CNT_FAILED]))
                out.write(" (")
                if counters[self.CNT_INTERNAL] > 0:
                    out.write(" %d internal" % (counters[self.CNT_INTERNAL]))
                if counters[self.CNT_UNIMPLEMENTED] > 0:
                    out.write(" %d unimplemented" % (counters[self.CNT_UNIMPLEMENTED]))
                if counters[self.CNT_NOSCHEMA] > 0:
                    out.write(" %d skip-invalid-schema" % (counters[self.CNT_NOSCHEMA]))
                if counters[self.CNT_BAD] > 0:
                    out.write(" %d bad" % (counters[self.CNT_BAD]))
                if counters[self.CNT_EXCEPTED] > 0:
                    out.write(" %d exception" % (counters[self.CNT_EXCEPTED]))
                out.write(" )")
            if counters[self.CNT_MEMLEAK] > 0:
                out.write(" %d leaks" % (counters[self.CNT_MEMLEAK]))
            out.write("\n")
        else:
            out.write(" all passed\n")

    def reportCombine(self, combName):
        """Report the counters of all tests collected for combine `combName`.

        With --report-err-combines, combines without failures and without
        memory leaks are skipped.
        """
        global options

        counters = self.createCounters()
        #
        # Compute evaluation counters.
        #
        for test in self.combinesRan[combName]:
            counters[self.CNT_TOTAL] += 1
            counters[self.CNT_RAN] += 1
            counters = self.updateCounters(test, counters)
        clean = (counters[self.CNT_FAILED] == 0) and (counters[self.CNT_MEMLEAK] == 0)
        if options.reportErrCombines and clean:
            return
        if options.enableLog:
            self.displayResults(self.logFile, False, combName, counters)
        self.displayResults(sys.stdout, False, combName, counters)

    def displayTestLog(self, test):
        """Write the log of `test` to stdout, followed by a separator."""
        sys.stdout.writelines(test.log)
        sys.stdout.write("~~~~~~~~~~\n")

    def reportTest(self, test):
        """Write the log of `test` to the log file and/or stdout as configured."""
        global options

        error = test.failed or test.memLeak != 0
        #
        # Only erroneous tests will be written to the log,
        # except @verbose is switched on.
        #
        if options.enableLog and (options.verbose or error):
            self.logFile.writelines(test.log)
            self.logFile.write("~~~~~~~~~~\n")
        #
        # if not @silent, only erroneous tests will be
        # written to stdout, except @verbose is switched on.
        #
        if options.silent:
            return
        onlyFilterSet = (options.reportInternalErrOnly or
                         options.reportMemLeakErrOnly or
                         options.reportUnimplErrOnly)
        if onlyFilterSet:
            # A test matching several "only" filters is shown once; it used
            # to be printed once per matching filter.
            show = ((options.reportInternalErrOnly and test.internalErr) or
                    (options.reportMemLeakErrOnly and test.memLeak != 0) or
                    (options.reportUnimplErrOnly and test.unimplemented))
        else:
            show = options.verbose or error
        if show:
            self.displayTestLog(test)

    def addToCombines(self, test):
        """File `test` under its combine name in self.combinesRan."""
        if test.combineName in self.combinesRan:
            self.combinesRan[test.combineName].append(test)
        else:
            self.combinesRan[test.combineName] = [test]

    def run(self):
        """Run the registered tests according to the global options.

        With --info only the combine names are printed and no test is run.
        Otherwise every test passing the active filter is run, until
        options.maxTestCount tests were run (-1: no limit).  The reports are
        written even when a test raises; the log file is closed afterwards.
        """
        global options

        if options.info:
            for test in self.testList:
                self.addToCombines(test)
            sys.stdout.write("Combines: %d\n" % len(self.combinesRan))
            sys.stdout.write("%s\n" % list(self.combinesRan.keys()))
            return

        if options.enableLog:
            self.logFile = open(options.logFile, "w")
        try:
            for test in self.testList:
                self.counters[self.CNT_TOTAL] += 1
                #
                # Filter tests.  Only the first filter that is set applies.
                #
                if options.singleTest is not None and options.singleTest != "":
                    if test.name != options.singleTest:
                        continue
                elif options.combines is not None:
                    if test.combineName not in options.combines:
                        continue
                elif options.testStartsWith is not None:
                    if not test.name.startswith(options.testStartsWith):
                        continue
                elif options.combineStartsWith is not None:
                    if not test.combineName.startswith(options.combineStartsWith):
                        continue

                if options.maxTestCount != -1 and self.counters[self.CNT_RAN] >= options.maxTestCount:
                    break
                self.counters[self.CNT_RAN] += 1
                #
                # Run the thing, dammit.
                #
                try:
                    test.setUp()
                    try:
                        test.run()
                    finally:
                        test.tearDown()
                finally:
                    #
                    # Evaluate.
                    #
                    test.finalize()
                    self.reportTest(test)
                    if options.reportCombines or options.reportErrCombines:
                        self.addToCombines(test)
                    self.counters = self.updateCounters(test, self.counters)
        finally:
            try:
                if options.reportCombines or options.reportErrCombines:
                    #
                    # Build a report for every single combine, ordered by
                    # combine name.
                    #
                    for key in sorted(self.combinesRan):
                        self.reportCombine(key)

                #
                # Display the final report.
                #
                if options.silent:
                    self.displayShortResults(sys.stdout, True, None, self.counters)
                else:
                    sys.stdout.write("===========================\n")
                    self.displayResults(sys.stdout, True, None, self.counters)
            finally:
                # The combine reports above still write to the log file, so
                # it is closed last.
                if self.logFile is not None:
                    self.logFile.close()
                    self.logFile = None