#============================================================================
# This library is free software; you can redistribute it and/or
# modify it under the terms of version 2.1 of the GNU Lesser General Public
# License as published by the Free Software Foundation.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
#============================================================================
# Copyright (C) 2007 XenSource Ltd
#============================================================================

import sys
import thread
import threading

from xen.xend.XendAPIConstants import XEN_API_TASK_STATUS_TYPE
from xen.xend.XendLogging import log

class XendTask(threading.Thread):
    """Represents an asynchronous task used by the Xen API.

    Proxies a callable in its own thread and exposes the outcome via
    self.{type,result,error_info}.

    @cvar task_progress: Progress tracking storage shared by all tasks.
                         It is a dict indexed by thread_id whose values
                         are stacks (lists) of (start, end) progress
                         ranges.  Note that the thread_id may be reused
                         when the previous thread with the thread_id ends.

    @cvar task_progress_lock: lock on thread access to task_progress

    """

    # progress stack:
    # thread_id : [(start_task, end_task),
    #              (start_sub_task, end_sub_task)..]
    # example : (0, 100), (50, 100) (50, 100) ...
    #           That would mean that the task is 75% complete.
    #           as it is 50% of the last 50% of the task.

    task_progress = {}
    task_progress_lock = threading.Lock()

    def __init__(self, uuid, func, args, func_name, return_type, label, desc,
                 session):
        """
        @param uuid: UUID of the task
        @type uuid: string
        @param func: Method to call (from XendAPI)
        @type func: callable object
        @param args: arguments to pass to function
        @type args: list or tuple
        @param func_name: function name, eg ('VM.start')
        @type func_name: string
        @param return_type: stored as self.type and reported as 'type'
                            in the task record.
        @param label: name label of the task.  When empty, the name of
                      func (or func_name if func has no __name__) is used.
        @type label: string
        @param desc: name description of the task.
        @type desc: string
        @param session: stored and reported as 'session' in the task
                        record.
        """

        threading.Thread.__init__(self)
        self.status_lock = threading.Lock()
        # Index 0 of XEN_API_TASK_STATUS_TYPE is the initial status; run()
        # moves it to index 1 on success or index 2 on failure.
        self.status = XEN_API_TASK_STATUS_TYPE[0]

        self.progress = 0
        self.type = return_type
        self.uuid = uuid

        self.result = None
        self.error_info = []

        # Callable objects are not guaranteed to carry __name__, so fall
        # back to the API function name rather than raising.
        self.name_label = label or getattr(func, '__name__', func_name)
        self.name_description = desc
        # Real identifier is only known once run() executes in the thread.
        self.thread_id = 0

        self.func_name = func_name
        self.func = func
        self.args = args

        self.session = session

    def set_status(self, new_status):
        """Set the task status while holding status_lock."""
        self.status_lock.acquire()
        try:
            self.status = new_status
        finally:
            self.status_lock.release()

    def get_status(self):
        """Return the task status, read while holding status_lock."""
        self.status_lock.acquire()
        try:
            return self.status
        finally:
            self.status_lock.release()

    def run(self):
        """Runs the method and stores the result for later access.

        Is invoked by threading.Thread.start().

        The callable is expected to return a dict with a 'Status' key and
        either a 'Value' key (when 'Status' is 'Success') or an
        'ErrorDescription' key.  Any exception raised by the callable is
        logged and recorded in self.error_info.
        """

        self.thread_id = thread.get_ident()
        self.task_progress_lock.acquire()
        try:
            # Must be a list: get_progress() takes a slice copy of this
            # value, which would raise TypeError on a dict.
            self.task_progress[self.thread_id] = []
            self.progress = 0
        finally:
            self.task_progress_lock.release()

        try:
            try:
                result = self.func(*self.args)
                if result['Status'] == 'Success':
                    self.result = result['Value']
                    self.set_status(XEN_API_TASK_STATUS_TYPE[1])
                else:
                    self.error_info = result['ErrorDescription']
                    self.set_status(XEN_API_TASK_STATUS_TYPE[2])
            except Exception:
                # sys.exc_info() instead of "except X, e" / "except X as e"
                # so that this parses on every Python version.
                e = sys.exc_info()[1]
                log.exception('Error running Async Task')
                self.error_info = ['INTERNAL ERROR', str(e)]
                self.set_status(XEN_API_TASK_STATUS_TYPE[2])
        finally:
            # Always drop this thread's progress stack: thread ids get
            # reused, so a stale entry would leak into a later task.
            self.task_progress_lock.acquire()
            try:
                self.task_progress.pop(self.thread_id, None)
                self.progress = 100
            finally:
                self.task_progress_lock.release()

    def get_record(self):
        """Returns a Xen API compatible record."""
        return {
            'uuid': self.uuid,
            'name_label': self.name_label,
            'name_description': self.name_description,
            'status': self.get_status(),
            'progress': self.get_progress(),
            'type': self.type,
            'result': self.result,
            'error_info': self.error_info,
            'allowed_operations': {},
            'session': self.session,
        }

    def get_progress(self):
        """Returns the task progress as an integer percentage.

        Returns 100 as soon as the status has left its initial value.
        Otherwise the progress stack recorded for this task's thread is
        folded into a single value; the stored progress never decreases.
        """
        if self.get_status() != XEN_API_TASK_STATUS_TYPE[0]:
            return 100

        self.task_progress_lock.acquire()
        try:
            # Pop each progress range in the stack and map it on to
            # the next progress range until we find out cumulative
            # progress based on the (start, end) range of each level
            start = 0
            prog_stack = self.task_progress.get(self.thread_id)
            if isinstance(prog_stack, list):
                # Work on a copy; the live stack belongs to the task thread.
                prog_stack = prog_stack[:]
            else:
                prog_stack = []

            if prog_stack:
                start = prog_stack.pop()[0]
                while prog_stack:
                    new_start, new_stop = prog_stack.pop()
                    start = new_start + ((new_stop - new_start)/100.0 * start)

            # only update progress if it increases, this will prevent
            # progress from going backwards when tasks are popped off
            # the stack
            if start > self.progress:
                self.progress = int(start)
        finally:
            self.task_progress_lock.release()

        return self.progress

    def log_progress(cls, progress_min, progress_max,
                     func, *args, **kwds):
        """ Callable function wrapper that logs the progress of the
        function to the shared task progress storage.

        This is a class method so other parts of Xend will update
        the task progress by calling:

        XendTask.log_progress(progress_min, progress_max,
                              func, *args, **kwds)

        The (progress_min, progress_max) range is pushed on the calling
        thread's progress stack for the duration of the call and popped
        again afterwards, even when func raises.  The result of
        func(*args, **kwds) is returned back to the caller and any
        exception it raises propagates unchanged.

        """
        thread_id = thread.get_ident()

        # Log the start of the method
        cls.task_progress_lock.acquire()
        try:
            if not isinstance(cls.task_progress.get(thread_id), list):
                cls.task_progress[thread_id] = []

            cls.task_progress[thread_id].append((progress_min,
                                                 progress_max))
        finally:
            cls.task_progress_lock.release()

        try:
            # Execute the method
            return func(*args, **kwds)
        finally:
            # Log the end of the method by popping the progress range
            # off the stack.
            cls.task_progress_lock.acquire()
            try:
                stack = cls.task_progress.get(thread_id)
                if stack:
                    stack.pop()
            finally:
                cls.task_progress_lock.release()

    log_progress = classmethod(log_progress)
---|
| 222 | |
---|
| 223 | |
---|
| 224 | |
---|