source: trunk/packages/sipb-xen-base/files/usr/share/python-support/sipb-xen-base/invirt/common.py @ 793

Last change on this file since 793 was 793, checked in by y_z, 16 years ago

added shared/exclusive locking; added shared locking of initial JSON cache read

File size: 2.3 KB
Line 
1import unittest
2from fcntl import flock, LOCK_EX, LOCK_UN
3
4class struct(object):
5    'A simple namespace object.'
6    def __init__(self, d = {}, **kwargs):
7        'd is the dictionary to update my __dict__ with.'
8        self.__dict__.update(d)
9        self.__dict__.update(kwargs)
10
11def dicts2struct(x):
12    """
13    Given a tree of lists/dicts, perform a deep traversal to transform all the
14    dicts to structs.
15    """
16    if type(x) == dict:
17        return struct((k, dicts2struct(v)) for k,v in x.iteritems())
18    elif type(x) == list:
19        return [dicts2struct(v) for v in x]
20    else:
21        return x
22
23#
24# Hacks to work around lack of Python 2.5's `with` statement.
25#
26
27def with_closing(rsrc):
28    """
29    Utility to emulate Python 2.5's `with closing(rsrc)` context manager.
30
31    E.g.,
32    @with_closing(file('/tmp/foo'))
33    def contents(f):
34        return f.read()
35    # now 'contents' is the contents of /tmp/foo
36    """
37    def wrapper(func):
38        try: return func(rsrc)
39        finally: rsrc.close()
40    return wrapper
41
42def with_lock_file(path, exclusive = True):
43    """
44    Context manager for lock files.  Example:
45
46    @with_lock_file('/tmp/mylock')
47    def input():
48        print 'locked'
49        return raw_input()
50    # prints 'locked'
51    print input # prints what raw_input() returned
52    """
53    def wrapper(func):
54        @with_closing(file(path, 'w'))
55        def g(f):
56            if exclusive: locktype = LOCK_EX
57            else:         locktype = LOCK_SH
58            flock(f, locktype)
59            try: return func()
60            finally: flock(f, LOCK_UN)
61        return g
62    return wrapper
63
64#
65# Tests.
66#
67
68class common_tests(unittest.TestCase):
69    def test_dicts2structs(self):
70        dicts = {
71                'atom': 0,
72                'dict': { 'atom': 'atom', 'list': [1,2,3] },
73                'list': [ 'atom', {'key': 'value'} ]
74                }
75        structs = dicts2struct(dicts)
76        self.assertEqual(structs.atom,        dicts['atom'])
77        self.assertEqual(structs.dict.atom,   dicts['dict']['atom'])
78        self.assertEqual(structs.dict.list,   dicts['dict']['list'])
79        self.assertEqual(structs.list[0],     dicts['list'][0])
80        self.assertEqual(structs.list[1].key, dicts['list'][1]['key'])
81
82if __name__ == '__main__':
83    unittest.main()
Note: See TracBrowser for help on using the repository browser.