1 | #=========================================================================== |
---|
2 | # This library is free software; you can redistribute it and/or |
---|
3 | # modify it under the terms of version 2.1 of the GNU Lesser General Public |
---|
4 | # License as published by the Free Software Foundation. |
---|
5 | # |
---|
6 | # This library is distributed in the hope that it will be useful, |
---|
7 | # but WITHOUT ANY WARRANTY; without even the implied warranty of |
---|
8 | # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU |
---|
9 | # Lesser General Public License for more details. |
---|
10 | # |
---|
11 | # You should have received a copy of the GNU Lesser General Public |
---|
12 | # License along with this library; if not, write to the Free Software |
---|
13 | # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA |
---|
14 | #============================================================================ |
---|
15 | # Copyright (C) 2007 XenSource Ltd |
---|
16 | #============================================================================ |
---|
17 | |
---|
18 | from xen.xend.XendAPIConstants import XEN_API_TASK_STATUS_TYPE |
---|
19 | from xen.xend.XendLogging import log |
---|
20 | import thread |
---|
21 | import threading |
---|
22 | |
---|
23 | class XendTask(threading.Thread): |
---|
24 | """Represents a Asynchronous Task used by Xen API. |
---|
25 | |
---|
26 | Basically proxies the callable object in a thread and returns the |
---|
27 | results via self.{type,result,error_info}. |
---|
28 | |
---|
29 | @cvar task_progress: Thread local storage for progress tracking. |
---|
30 | It is a dict indexed by thread_id. Note that the |
---|
31 | thread_id may be reused when the previous |
---|
32 | thread with the thread_id ends. |
---|
33 | |
---|
34 | @cvar task_progress_lock: lock on thread access to task_progress |
---|
35 | |
---|
36 | """ |
---|
37 | |
---|
38 | # progress stack: |
---|
39 | # thread_id : [(start_task, end_task), |
---|
40 | # (start_sub_task, end_sub_task)..] |
---|
41 | # example : (0, 100), (50, 100) (50, 100) ... |
---|
42 | # That would mean that the task is 75% complete. |
---|
43 | # as it is 50% of the last 50% of the task. |
---|
44 | |
---|
45 | task_progress = {} |
---|
46 | task_progress_lock = threading.Lock() |
---|
47 | |
---|
48 | def __init__(self, uuid, func, args, func_name, return_type, label, desc, |
---|
49 | session): |
---|
50 | """ |
---|
51 | @param uuid: UUID of the task |
---|
52 | @type uuid: string |
---|
53 | @param func: Method to call (from XendAPI) |
---|
54 | @type func: callable object |
---|
55 | @param args: arguments to pass to function |
---|
56 | @type args: list or tuple |
---|
57 | @param label: name label of the task. |
---|
58 | @type label: string |
---|
59 | @param desc: name description of the task. |
---|
60 | @type desc: string |
---|
61 | @param func_name: function name, eg ('VM.start') |
---|
62 | @type desc: string |
---|
63 | """ |
---|
64 | |
---|
65 | threading.Thread.__init__(self) |
---|
66 | self.status_lock = threading.Lock() |
---|
67 | self.status = XEN_API_TASK_STATUS_TYPE[0] |
---|
68 | |
---|
69 | self.progress = 0 |
---|
70 | self.type = return_type |
---|
71 | self.uuid = uuid |
---|
72 | |
---|
73 | self.result = None |
---|
74 | self.error_info = [] |
---|
75 | |
---|
76 | self.name_label = label or func.__name__ |
---|
77 | self.name_description = desc |
---|
78 | self.thread_id = 0 |
---|
79 | |
---|
80 | self.func_name = func_name |
---|
81 | self.func = func |
---|
82 | self.args = args |
---|
83 | |
---|
84 | self.session = session |
---|
85 | |
---|
86 | def set_status(self, new_status): |
---|
87 | self.status_lock.acquire() |
---|
88 | try: |
---|
89 | self.status = new_status |
---|
90 | finally: |
---|
91 | self.status_lock.release() |
---|
92 | |
---|
93 | def get_status(self): |
---|
94 | self.status_lock.acquire() |
---|
95 | try: |
---|
96 | return self.status |
---|
97 | finally: |
---|
98 | self.status_lock.release() |
---|
99 | |
---|
100 | def run(self): |
---|
101 | """Runs the method and stores the result for later access. |
---|
102 | |
---|
103 | Is invoked by threading.Thread.start(). |
---|
104 | """ |
---|
105 | |
---|
106 | self.thread_id = thread.get_ident() |
---|
107 | self.task_progress_lock.acquire() |
---|
108 | try: |
---|
109 | self.task_progress[self.thread_id] = {} |
---|
110 | self.progress = 0 |
---|
111 | finally: |
---|
112 | self.task_progress_lock.release() |
---|
113 | |
---|
114 | try: |
---|
115 | result = self.func(*self.args) |
---|
116 | if result['Status'] == 'Success': |
---|
117 | self.result = result['Value'] |
---|
118 | self.set_status(XEN_API_TASK_STATUS_TYPE[1]) |
---|
119 | else: |
---|
120 | self.error_info = result['ErrorDescription'] |
---|
121 | self.set_status(XEN_API_TASK_STATUS_TYPE[2]) |
---|
122 | except Exception, e: |
---|
123 | log.exception('Error running Async Task') |
---|
124 | self.error_info = ['INTERNAL ERROR', str(e)] |
---|
125 | self.set_status(XEN_API_TASK_STATUS_TYPE[2]) |
---|
126 | |
---|
127 | self.task_progress_lock.acquire() |
---|
128 | try: |
---|
129 | del self.task_progress[self.thread_id] |
---|
130 | self.progress = 100 |
---|
131 | finally: |
---|
132 | self.task_progress_lock.release() |
---|
133 | |
---|
134 | def get_record(self): |
---|
135 | """Returns a Xen API compatible record.""" |
---|
136 | return { |
---|
137 | 'uuid': self.uuid, |
---|
138 | 'name_label': self.name_label, |
---|
139 | 'name_description': self.name_description, |
---|
140 | 'status': self.status, |
---|
141 | 'progress': self.get_progress(), |
---|
142 | 'type': self.type, |
---|
143 | 'result': self.result, |
---|
144 | 'error_info': self.error_info, |
---|
145 | 'allowed_operations': {}, |
---|
146 | 'session': self.session, |
---|
147 | } |
---|
148 | |
---|
149 | def get_progress(self): |
---|
150 | """ Checks the thread local progress storage. """ |
---|
151 | if self.status != XEN_API_TASK_STATUS_TYPE[0]: |
---|
152 | return 100 |
---|
153 | |
---|
154 | self.task_progress_lock.acquire() |
---|
155 | try: |
---|
156 | # Pop each progress range in the stack and map it on to |
---|
157 | # the next progress range until we find out cumulative |
---|
158 | # progress based on the (start, end) range of each level |
---|
159 | start = 0 |
---|
160 | prog_stack = self.task_progress.get(self.thread_id, [])[:] |
---|
161 | if len(prog_stack) > 0: |
---|
162 | start, stop = prog_stack.pop() |
---|
163 | while prog_stack: |
---|
164 | new_start, new_stop = prog_stack.pop() |
---|
165 | start = new_start + ((new_stop - new_start)/100.0 * start) |
---|
166 | |
---|
167 | # only update progress if it increases, this will prevent |
---|
168 | # progress from going backwards when tasks are popped off |
---|
169 | # the stack |
---|
170 | if start > self.progress: |
---|
171 | self.progress = int(start) |
---|
172 | finally: |
---|
173 | self.task_progress_lock.release() |
---|
174 | |
---|
175 | return self.progress |
---|
176 | |
---|
177 | |
---|
178 | def log_progress(cls, progress_min, progress_max, |
---|
179 | func, *args, **kwds): |
---|
180 | """ Callable function wrapper that logs the progress of the |
---|
181 | function to thread local storage for task progress calculation. |
---|
182 | |
---|
183 | This is a class method so other parts of Xend will update |
---|
184 | the task progress by calling: |
---|
185 | |
---|
186 | XendTask.push_progress(progress_min, progress_max, |
---|
187 | func, *args, **kwds) |
---|
188 | |
---|
189 | The results of the progress is stored in thread local storage |
---|
190 | and the result of the func(*args, **kwds) is returned back |
---|
191 | to the caller. |
---|
192 | |
---|
193 | """ |
---|
194 | thread_id = thread.get_ident() |
---|
195 | retval = None |
---|
196 | |
---|
197 | # Log the start of the method |
---|
198 | cls.task_progress_lock.acquire() |
---|
199 | try: |
---|
200 | if type(cls.task_progress.get(thread_id)) != list: |
---|
201 | cls.task_progress[thread_id] = [] |
---|
202 | |
---|
203 | cls.task_progress[thread_id].append((progress_min, |
---|
204 | progress_max)) |
---|
205 | finally: |
---|
206 | cls.task_progress_lock.release() |
---|
207 | |
---|
208 | # Execute the method |
---|
209 | retval = func(*args, **kwds) |
---|
210 | |
---|
211 | # Log the end of the method by popping the progress range |
---|
212 | # off the stack. |
---|
213 | cls.task_progress_lock.acquire() |
---|
214 | try: |
---|
215 | cls.task_progress[thread_id].pop() |
---|
216 | finally: |
---|
217 | cls.task_progress_lock.release() |
---|
218 | |
---|
219 | return retval |
---|
220 | |
---|
221 | log_progress = classmethod(log_progress) |
---|
222 | |
---|
223 | |
---|
224 | |
---|