source: trunk/packages/xen-3.1/xen-3.1/tools/python/xen/xend/XendTask.py @ 34

Last change on this file since 34 was 34, checked in by hartmans, 17 years ago

Add xen and xen-common

File size: 7.6 KB
Line 
1#===========================================================================
2# This library is free software; you can redistribute it and/or
3# modify it under the terms of version 2.1 of the GNU Lesser General Public
4# License as published by the Free Software Foundation.
5#
6# This library is distributed in the hope that it will be useful,
7# but WITHOUT ANY WARRANTY; without even the implied warranty of
8# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
9# Lesser General Public License for more details.
10#
11# You should have received a copy of the GNU Lesser General Public
12# License along with this library; if not, write to the Free Software
13# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
14#============================================================================
15# Copyright (C) 2007 XenSource Ltd
16#============================================================================
17
18from xen.xend.XendAPIConstants import XEN_API_TASK_STATUS_TYPE
19from xen.xend.XendLogging import log
20import thread
21import threading
22
23class XendTask(threading.Thread):
24    """Represents a Asynchronous Task used by Xen API.
25
26    Basically proxies the callable object in a thread and returns the
27    results via self.{type,result,error_info}.
28
29    @cvar task_progress: Thread local storage for progress tracking.
30                         It is a dict indexed by thread_id. Note that the
31                         thread_id may be reused when the previous
32                         thread with the thread_id ends.
33                         
34    @cvar task_progress_lock: lock on thread access to task_progress
35   
36    """
37
38    # progress stack:
39    # thread_id : [(start_task, end_task),
40    #              (start_sub_task, end_sub_task)..]
41    # example : (0, 100), (50, 100) (50, 100) ...
42    #           That would mean that the task is 75% complete.
43    #           as it is 50% of the last 50% of the task.
44   
45    task_progress = {}
46    task_progress_lock = threading.Lock()
47
48    def __init__(self, uuid, func, args, func_name, return_type, label, desc,
49                 session):
50        """
51        @param uuid: UUID of the task
52        @type uuid: string
53        @param func: Method to call (from XendAPI)
54        @type func: callable object
55        @param args: arguments to pass to function
56        @type args: list or tuple
57        @param label: name label of the task.
58        @type label: string
59        @param desc: name description of the task.
60        @type desc: string
61        @param func_name: function name, eg ('VM.start')
62        @type desc: string
63        """
64       
65        threading.Thread.__init__(self)
66        self.status_lock = threading.Lock()
67        self.status = XEN_API_TASK_STATUS_TYPE[0]
68
69        self.progress = 0
70        self.type = return_type
71        self.uuid = uuid
72       
73        self.result = None
74        self.error_info = []
75       
76        self.name_label = label or func.__name__
77        self.name_description = desc
78        self.thread_id = 0
79
80        self.func_name = func_name
81        self.func = func
82        self.args = args
83
84        self.session = session
85
86    def set_status(self, new_status):
87        self.status_lock.acquire()
88        try:
89            self.status = new_status
90        finally:
91            self.status_lock.release()
92
93    def get_status(self):
94        self.status_lock.acquire()
95        try:
96            return self.status
97        finally:
98            self.status_lock.release()       
99
100    def run(self):
101        """Runs the method and stores the result for later access.
102
103        Is invoked by threading.Thread.start().
104        """
105
106        self.thread_id = thread.get_ident()
107        self.task_progress_lock.acquire()
108        try:
109            self.task_progress[self.thread_id] = {}
110            self.progress = 0           
111        finally:
112            self.task_progress_lock.release()
113
114        try:
115            result = self.func(*self.args)
116            if result['Status'] == 'Success':
117                self.result = result['Value']
118                self.set_status(XEN_API_TASK_STATUS_TYPE[1])
119            else:
120                self.error_info = result['ErrorDescription']
121                self.set_status(XEN_API_TASK_STATUS_TYPE[2])               
122        except Exception, e:
123            log.exception('Error running Async Task')
124            self.error_info = ['INTERNAL ERROR', str(e)]
125            self.set_status(XEN_API_TASK_STATUS_TYPE[2])
126
127        self.task_progress_lock.acquire()
128        try:
129            del self.task_progress[self.thread_id]
130            self.progress = 100
131        finally:
132            self.task_progress_lock.release()
133   
134    def get_record(self):
135        """Returns a Xen API compatible record."""
136        return {
137            'uuid': self.uuid,           
138            'name_label': self.name_label,
139            'name_description': self.name_description,
140            'status': self.status,
141            'progress': self.get_progress(),
142            'type': self.type,
143            'result': self.result,
144            'error_info': self.error_info,
145            'allowed_operations': {},
146            'session': self.session,
147        }
148
149    def get_progress(self):
150        """ Checks the thread local progress storage. """
151        if self.status != XEN_API_TASK_STATUS_TYPE[0]:
152            return 100
153       
154        self.task_progress_lock.acquire()
155        try:
156            # Pop each progress range in the stack and map it on to
157            # the next progress range until we find out cumulative
158            # progress based on the (start, end) range of each level
159            start = 0
160            prog_stack = self.task_progress.get(self.thread_id, [])[:]
161            if len(prog_stack) > 0:
162                start, stop = prog_stack.pop()
163                while prog_stack:
164                    new_start, new_stop = prog_stack.pop()
165                    start = new_start + ((new_stop - new_start)/100.0 * start)
166
167            # only update progress if it increases, this will prevent
168            # progress from going backwards when tasks are popped off
169            # the stack
170            if start > self.progress:
171                self.progress = int(start)
172        finally:
173            self.task_progress_lock.release()
174
175        return self.progress
176
177
178    def log_progress(cls, progress_min, progress_max,
179                     func, *args, **kwds):
180        """ Callable function wrapper that logs the progress of the
181        function to thread local storage for task progress calculation.
182
183        This is a class method so other parts of Xend will update
184        the task progress by calling:
185
186        XendTask.push_progress(progress_min, progress_max,
187                               func, *args, **kwds)
188
189        The results of the progress is stored in thread local storage
190        and the result of the func(*args, **kwds) is returned back
191        to the caller.
192
193        """
194        thread_id = thread.get_ident()
195        retval = None
196
197        # Log the start of the method
198        cls.task_progress_lock.acquire()
199        try:
200            if type(cls.task_progress.get(thread_id)) != list:
201                cls.task_progress[thread_id] = []
202                   
203            cls.task_progress[thread_id].append((progress_min,
204                                                 progress_max))
205        finally:
206            cls.task_progress_lock.release()
207
208        # Execute the method
209        retval = func(*args, **kwds)
210
211        # Log the end of the method by popping the progress range
212        # off the stack.
213        cls.task_progress_lock.acquire()
214        try:
215            cls.task_progress[thread_id].pop()
216        finally:
217            cls.task_progress_lock.release()
218
219        return retval
220
221    log_progress = classmethod(log_progress)
222
223   
224
Note: See TracBrowser for help on using the repository browser.