/search.css" rel="stylesheet" type="text/css"/> /search.js">
#!/usr/bin/env python
'''
Provide tools for running code asynchronously.
'''


def async_caller(obj, inbox, outbox):
    '''
    Accept messages, convert to method calls on given object, pass
    return value as message.

    Each message read from ``inbox`` is a 4-tuple
    ``(methname, methnumber, args, kwds)``.  The named method of ``obj``
    is called with ``*args, **kwds`` and ``(methnumber, result)`` is put
    on ``outbox``.  A message whose method name is
    ``'async_caller_exit'`` ends the loop.
    '''
    while True:
        methname, methnumber, args, kwds = inbox.get(block=True)
        if methname == 'async_caller_exit':
            break
        # Plain attribute lookup; unlike eval() this cannot execute an
        # arbitrary expression smuggled in through the method name.
        meth = getattr(obj, methname)
        outbox.put((methnumber, meth(*args, **kwds)))
    return


class AsyncMethod(object):
    '''
    A callable that stands in for a method.

    Calling it records the callback and queues
    ``(name, number, args, kwds)`` on the outbox given at construction.
    '''

    def __init__(self, name, number, outbox):
        self.name = name
        self.number = number
        self.outbox = outbox
        self.callback = None
        return

    def __call__(self, callback, *args, **kwds):
        '''
        Queue the call; ``callback`` is stored for the result.

        Raises AttributeError if this instance was already called.
        '''
        if self.callback:
            msg = 'This instance of the %s method has already been called'
            raise AttributeError(msg % self.name)
        self.callback = callback
        self.outbox.put((self.name, self.number, args, kwds))
        return


class AsyncInterface(object):
    '''
    Asynchronous interface to some other object (called "real" below).

    Any method calls on this object, other than the ones explicitly
    defined, must have as their first argument a callable object
    ("callback").  The method name and remaining arguments will be
    queued to a worker (a separate process or a thread) where they will
    be interpreted and result in calling the method on the real object.
    The result will be queued back to this interface object where the
    callback will be called with the result as its argument when the
    queue is drained.
    '''

    # Seconds to wait for the worker between queue drains in shutdown().
    _JOIN_POLL = 0.05

    def __init__(self, obj, proctype=None):
        '''
        Wrap ``obj`` and start the worker.

        ``proctype`` selects the ``do_<proctype>`` starter
        ("multiprocessing" or "threaded").  If not given,
        multiprocessing is used when importable, else threads.

        Raises ValueError if ``obj`` is false, AttributeError if there
        is no starter for ``proctype``.
        '''
        if not obj:
            raise ValueError('Must give a valid object')
        # State is written through __dict__ so that attribute access
        # never falls into __getattr__ for these names.
        self.__dict__['_obj'] = obj
        self.__dict__['_count'] = 0
        self.__dict__['_callbacks'] = {}
        self.__dict__['_closed'] = False

        if proctype:
            # Look the starter up on the class so that the lookup can
            # not be answered by __getattr__ proxying to the real object.
            starter = getattr(type(self), 'do_%s' % proctype, None)
            if starter is None:
                msg = 'No such attribute: "%s" in "%s"'
                raise AttributeError(msg % ('do_%s' % proctype, obj))
            starter(self)
            return

        try:
            import multiprocessing  # noqa: F401 -- availability probe only
        except ImportError:
            self.do_threaded()
        else:
            self.do_multiprocessing()
        return

    def do_multiprocessing(self):
        'Start the worker as a separate process.'
        from multiprocessing import Queue, Process
        self.__dict__['_proctype'] = 'multiprocessing'
        self.__dict__['_outbox'] = outbox = Queue()
        self.__dict__['_inbox'] = inbox = Queue()
        # Our outbox is the worker's inbox and vice versa.
        self.__dict__['_proc'] = proc = Process(target=async_caller,
                                                args=(self._obj, outbox, inbox))
        proc.start()
        return

    def do_threaded(self):
        'Start the worker as a thread in this process.'
        try:
            from Queue import Queue  # Python 2
        except ImportError:
            from queue import Queue  # Python 3
        from threading import Thread
        self.__dict__['_proctype'] = 'threading'
        self.__dict__['_outbox'] = outbox = Queue()
        self.__dict__['_inbox'] = inbox = Queue()
        # Our outbox is the worker's inbox and vice versa.
        self.__dict__['_proc'] = proc = Thread(target=async_caller,
                                               args=(self._obj, outbox, inbox))
        proc.start()
        return

    def __getattr__(self, name):
        '''
        Return a fresh AsyncMethod standing in for ``name`` on the real
        object.

        Raises AttributeError if there is no real object or it has no
        such attribute.
        '''
        # Read _obj from __dict__: going through self._obj would re-enter
        # __getattr__ forever on an instance whose __init__ did not finish.
        obj = self.__dict__.get('_obj')
        if not obj:
            raise AttributeError('No object')
        if not hasattr(obj, name):
            msg = 'No such attribute: "%s" in "%s"'
            raise AttributeError(msg % (name, obj))
        self._count += 1
        meth = AsyncMethod(name, self._count, self._outbox)
        self._callbacks[self._count] = meth
        return meth

    def __del__(self):
        self.shutdown()
        return

    def shutdown(self):
        'Gently shutdown the real object and drain the queue'
        state = self.__dict__
        # Nothing to do if already shut down, or if the worker was never
        # started (e.g. __init__ raised and we are called from __del__).
        if state.get('_closed') or '_proc' not in state \
                or '_outbox' not in state:
            return
        state['_closed'] = True
        self._outbox.put(('async_caller_exit', None, None, None))
        # Keep draining while waiting: a worker process can not exit
        # until the results it queued have been consumed, so a bare
        # join() could block forever and would also drop late results.
        while self._proc.is_alive():
            self.drain_queue()
            self._proc.join(self._JOIN_POLL)
        self.drain_queue()
        return

    def abort(self):
        'Immediately shutdown'
        # Must match the value stored by do_multiprocessing(); threads
        # have no terminate().
        if self._proctype == 'multiprocessing':
            self._proc.terminate()
        import sys
        sys.exit(0)
        return

    def drain_queue(self, depth=None):
        'Drain queue of at most depth pending results.  All if depth=None'
        while depth is None or depth > 0:
            if depth is not None:
                depth -= 1
            if self._inbox.empty():
                return
            cbnum, res = self._inbox.get()
            # Remove the entry before calling back so that a raising
            # callback does not leave a stale registration behind.
            meth = self._callbacks.pop(cbnum)
            meth.callback(res)
        return