source: trunk/packages/xen-3.1/xen-3.1/tools/xm-test/lib/XmTestLib/Test.py @ 34

Last change on this file since 34 was 34, checked in by hartmans, 17 years ago

Add xen and xen-common

  • Property svn:mime-type set to text/script
File size: 5.0 KB
Line 
1#!/usr/bin/python
2
3"""
4 Copyright (C) International Business Machines Corp., 2005
5 Author: Dan Smith <danms@us.ibm.com>
6
7 This program is free software; you can redistribute it and/or modify
8 it under the terms of the GNU General Public License as published by
9 the Free Software Foundation; under version 2 of the License.
10
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 GNU General Public License for more details.
15
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, write to the Free Software
18 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
19
20"""
21
22##
23## These are utility functions for test cases
24##
25
26import sys
27import commands
28import os
29import pwd
30import time
31import pty
32import select
33import signal
34import re
35import glob
36
37TEST_PASS = 0
38TEST_FAIL = 255
39TEST_SKIP = 77
40
41# We currently advise waiting this many seconds for the ramdisk to
42# boot inside a domU
43TEST_DOMU_BOOT_DELAY = 20
44
45if os.environ.get("TEST_VERBOSE"):
46    verbose = True
47else:
48    verbose = False
49
50class TimeoutError(Exception):
51    def __init__(self, msg, outputSoFar):
52        self.msg = msg
53        self.output = outputSoFar
54
55    def __str__(self):
56        return str(self.msg)
57
58def runWithTimeout(cmd, timeout):
59    args = cmd.split()
60
61    pid, fd = pty.fork();
62
63    startTime = time.time()
64
65    if pid == 0:
66        os.execvp(args[0], args)
67
68    output = ""
69
70    while time.time() - startTime < timeout:
71        i, o, e = select.select([fd], [], [], timeout)
72
73        if fd in i:
74            try:
75                str = os.read(fd, 1)
76                output += str
77            except OSError, e:
78                exitPid, status = os.waitpid(pid, os.WNOHANG)
79
80                if exitPid == pid:
81                    if verbose:
82                        print "Child exited with %i" % status
83                    return status, output
84
85    if verbose:
86        print "Command timed out: killing pid %i" % pid
87       
88    os.kill(pid, signal.SIGINT)
89    raise TimeoutError("Command execution time exceeded %i seconds" % timeout,
90                       outputSoFar=output)
91
92def traceCommand(command, timeout=None, logOutput=True):
93    if verbose:
94        print "[dom0] Running `%s'" % command
95
96    if timeout:
97        status, output = runWithTimeout(command, timeout)
98    else:
99        status, output = commands.getstatusoutput(command)
100
101    if logOutput and verbose:
102        print output
103
104    return status, output
105
106def getTestName():
107    script = sys.argv[0]
108    fname = os.path.basename(script)
109    match = re.match("([^\.]+)\.[a-z]+", fname)
110    if match:
111        tname = match.group(1)
112    else:
113        tname = "UNKNOWN"
114
115    return tname
116
117def becomeNonRoot():
118    """Become a non-root user, or FAIL if this is not possible.  This call
119    succeeds if we are already running as a non-root user.
120    """
121   
122    if os.geteuid() == 0:
123        # Try and become "nobody".  This user is commonly in place, but this
124        # could be extended to consider any number of users to be acceptable,
125        # if there are systems where "nobody" is not present.
126        allusers = pwd.getpwall()
127        for u in allusers:
128            if u[0] == "nobody":
129                os.setreuid(u[2], u[2])
130                break
131        if os.geteuid() == 0:
132            FAIL("Could not become a non-root user")
133
134def FAIL(format, *args):
135    print "\nREASON:", (format % args)
136    sys.exit(TEST_FAIL)
137
138def SKIP(format, *args):
139    print "\nREASON:", (format % args)
140    sys.exit(TEST_SKIP)
141
142def saveLog(logText, filename=None):
143    if not filename:
144        filename = "log";
145    logfile = open(filename, 'w');
146    date = commands.getoutput("date");
147    logfile.write("-- BEGIN XmTest Log @" + date + "\n");
148    logfile.write(logText);
149    logfile.write("\n-- END XmTest Log\n");
150    logfile.close();
151
152def waitForBoot():
153    if verbose:
154        print "[dom0] Waiting %i seconds for domU boot..." % TEST_DOMU_BOOT_DELAY
155    time.sleep(TEST_DOMU_BOOT_DELAY)
156
157def timeStamp():
158    name = getTestName()
159
160    t = time.asctime(time.localtime())
161
162    print "*** Test %s started at %s %s" % (name, t,
163                                            time.tzname[time.daylight])
164
165#
166# Try to start a domain and attach a console to it to see if
167# the console system is working
168#
169def isConsoleDead():
170
171    from XmTestLib import XmTestDomain, DomainError, XmConsole, ConsoleError
172
173    domain = XmTestDomain()
174
175    try:
176        console = domain.start()
177        console.runCmd("ls")
178    except DomainError, e:
179        return True
180    except ConsoleError, e:
181        domain.destroy()
182        return True
183
184    domain.destroy()
185
186    return False
187
188#
189# We currently can only load as many concurrent HVM domains as loop
190# devices, need to find how many devices the system has.
191def getMaxHVMDomains():
192    nodes = glob.glob("/dev/loop*")
193    maxd = len(nodes)
194
195    return maxd
196
197
198if __name__ == "__main__":
199
200    timeStamp()
201
202    FAIL("foo")
203
Note: See TracBrowser for help on using the repository browser.