source: trunk/packages/sipb-xen-base/files/usr/share/python-support/sipb-xen-base/invirt/common.py @ 781

Last change on this file since 781 was 781, checked in by y_z, 16 years ago
  • added file locking around cache
  • cleanup
File size: 2.0 KB
Line 
1import unittest
2from fcntl import flock, LOCK_EX, LOCK_UN
3from os import remove
4
5class struct(object):
6    'A simple namespace object.'
7    def __init__(self, d = {}, **kwargs):
8        'd is the dictionary to update my __dict__ with.'
9        self.__dict__.update(d)
10        self.__dict__.update(kwargs)
11
12def dicts2struct(x):
13    """
14    Given a tree of lists/dicts, perform a deep traversal to transform all the
15    dicts to structs.
16    """
17    if type(x) == dict:
18        return struct((k, dicts2struct(v)) for k,v in x.iteritems())
19    elif type(x) == list:
20        return [dicts2struct(v) for v in x]
21    else:
22        return x
23
24#
25# Hacks to work around lack of Python 2.5's `with` statement.
26#
27
28def with_closing(rsrc):
29    "Utility to emulate Python 2.5's `with closing(rsrc)` context manager."
30    def wrapper(func):
31        try: return func(rsrc)
32        finally: rsrc.close()
33    return wrapper
34
35def with_lock_file(path):
36    """
37    Context manager for lock files.  Example:
38
39    @with_lock_file('/tmp/mylock')
40    def input():
41        print 'locked'
42        return raw_input()
43    # decorator is executed immediately
44    print input # prints the input text
45    """
46    def wrapper(func):
47        @with_closing(file(path, 'w'))
48        def g(f):
49            flock(f, LOCK_EX)
50            try: return func()
51            finally: flock(f, LOCK_UN)
52        remove(path)
53        return g
54    return wrapper
55
56#
57# Tests.
58#
59
60class common_tests(unittest.TestCase):
61    def test_dicts2structs(self):
62        dicts = {
63                'atom': 0,
64                'dict': { 'atom': 'atom', 'list': [1,2,3] },
65                'list': [ 'atom', {'key': 'value'} ]
66                }
67        structs = dicts2struct(dicts)
68        self.assertEqual(structs.atom,        dicts['atom'])
69        self.assertEqual(structs.dict.atom,   dicts['dict']['atom'])
70        self.assertEqual(structs.dict.list,   dicts['dict']['list'])
71        self.assertEqual(structs.list[0],     dicts['list'][0])
72        self.assertEqual(structs.list[1].key, dicts['list'][1]['key'])
73
74if __name__ == '__main__':
75    unittest.main()
Note: See TracBrowser for help on using the repository browser.