/search.css" rel="stylesheet" type="text/css"/> /search.js">
| Classes | Job Modules | Data Objects | Services | Algorithms | Tools | Packages | Directories | Tracs |

In This Package:

async.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 '''
00003 Provide tools for running code asynchronously.
00004 '''
00005 
def async_caller(obj, inbox, outbox):
    '''
    Serve method-call requests for obj until told to exit.

    Each message read from inbox is a 4-tuple
    (methname, methnumber, args, kwds).  The named method is looked up
    on obj and called as meth(*args, **kwds); the pair
    (methnumber, return value) is then put on outbox.  A message whose
    method name is 'async_caller_exit' ends the loop without a reply.

    obj    -- object whose methods are called
    inbox  -- queue-like object supplying request messages via get()
    outbox -- queue-like object receiving results via put()

    Raises AttributeError if obj has no attribute named methname; any
    exception raised by the called method also propagates.
    '''
    while True:
        methname, methnumber, args, kwds = inbox.get(block=True)
        if methname == 'async_caller_exit':
            return
        # Resolve by attribute name rather than eval() so the text of a
        # message is never executed as Python source.
        meth = getattr(obj, methname)
        outbox.put((methnumber, meth(*args, **kwds)))
00023 
class AsyncMethod(object):
    '''
    A callable that stands in for a method.

    Calling the instance stores the given callback and queues the tuple
    (name, number, args, kwds) on the outbox.  Each instance may be
    called only once.
    '''

    def __init__(self, name, number, outbox):
        '''
        name   -- method name placed in the queued message
        number -- identifier placed in the queued message
        outbox -- queue-like object; the message is sent with put()
        '''
        self.name = name
        self.number = number
        self.outbox = outbox
        # Remains None until the instance is called.
        self.callback = None

    def __call__(self, callback, *args, **kwds):
        '''
        Store callback and queue (name, number, args, kwds) on the outbox.

        Raises AttributeError if this instance has already been called.
        '''
        # Compare against None explicitly: a callable object may itself
        # be falsy (for example one that defines __len__).
        if self.callback is not None:
            msg = 'This instance of the %s method has already been called'
            raise AttributeError(msg % self.name)
        self.callback = callback
        self.outbox.put((self.name, self.number, args, kwds))
00046 
00047 
class AsyncInterface(object):
    '''
    Asynchronous interface to some other object (called "real" below).

    Any method calls on this object, other than the ones explicitly
    defined, must have as their first argument a callable object
    ("callback").  The method name and remaining arguments are queued
    to a worker (a separate process or a thread) where they are
    interpreted and result in calling the method on the real object.
    The result is queued back to this interface object, and the
    callback is called with the result as its argument the next time
    drain_queue() or shutdown() runs.
    '''

    def __init__(self, obj, proctype=None):
        '''
        obj      -- the real object; must be truthy
        proctype -- optional suffix of the do_* method used to start the
                    worker ('multiprocessing' or 'threaded').  When not
                    given, multiprocessing is used if it can be imported
                    and threading otherwise.

        Raises ValueError if obj is falsy and AttributeError if there is
        no do_<proctype> method.
        '''
        if not obj:
            raise ValueError('Must give a valid object')
        self.__dict__['_obj'] = obj
        self.__dict__['_count'] = 0
        self.__dict__['_callbacks'] = {}

        if proctype:
            # Look the starter up on the class so the name is neither
            # eval()'d nor routed through __getattr__ to the real object.
            starter = getattr(type(self), 'do_%s' % proctype, None)
            if starter is None:
                raise AttributeError('No such proctype: "%s"' % proctype)
            starter(self)
            return

        try:
            import multiprocessing  # availability probe only
        except ImportError:
            self.do_threaded()
        else:
            self.do_multiprocessing()

    def do_multiprocessing(self):
        'Start the worker loop in a separate process.'
        from multiprocessing import Queue, Process
        self.__dict__['_proctype'] = 'multiprocessing'
        self.__dict__['_outbox'] = outbox = Queue()
        self.__dict__['_inbox'] = inbox = Queue()
        # This object's outbox is the worker's inbox and vice versa.
        self.__dict__['_proc'] = proc = Process(target=async_caller,
                                                args=(self._obj, outbox, inbox))
        proc.start()

    def do_threaded(self):
        'Start the worker loop in a thread of this process.'
        try:
            from Queue import Queue      # Python 2 module name
        except ImportError:
            from queue import Queue      # Python 3 module name
        from threading import Thread
        self.__dict__['_proctype'] = 'threading'
        self.__dict__['_outbox'] = outbox = Queue()
        self.__dict__['_inbox'] = inbox = Queue()
        # This object's outbox is the worker's inbox and vice versa.
        self.__dict__['_proc'] = proc = Thread(target=async_caller,
                                               args=(self._obj, outbox, inbox))
        proc.start()

    def __getattr__(self, name):
        '''
        Return a fresh AsyncMethod standing in for method "name" of the
        real object and register it to receive its result later.

        Raises AttributeError if there is no real object or it has no
        such attribute.
        '''
        state = self.__dict__
        # Read _obj from __dict__ directly: on a partially constructed
        # instance "self._obj" would re-enter __getattr__ without end.
        obj = state.get('_obj')
        if not obj:
            raise AttributeError('No object')
        if not hasattr(obj, name):
            msg = 'No such attribute: "%s" in "%s"'
            raise AttributeError(msg % (name, obj))
        state['_count'] += 1
        number = state['_count']
        meth = AsyncMethod(name, number, self._outbox)
        state['_callbacks'][number] = meth
        return meth

    def __del__(self):
        self.shutdown()

    def shutdown(self):
        'Gently shutdown the real object and drain the queue'
        state = self.__dict__
        proc = state.get('_proc')
        if proc is None:
            # No worker was ever started (e.g. __init__ raised).
            return
        if not state.get('_exit_sent'):
            # Send the exit request only once; shutdown() may be called
            # explicitly and then again from __del__.
            state['_exit_sent'] = True
            state['_outbox'].put(('async_caller_exit', None, None, None))
        # Thread.is_alive is spelled isAlive on very old Pythons.
        alive = getattr(proc, 'is_alive', None) or proc.isAlive
        # Keep delivering results until the worker has exited so that no
        # pending callback is lost, and so the worker is never joined
        # while its results are still sitting unread in the queue.
        while alive():
            self.drain_queue()
            proc.join(0.01)
        self.drain_queue()

    def abort(self):
        'Immediately shutdown'
        # _proctype is set to 'multiprocessing' by do_multiprocessing();
        # only a Process has terminate().
        if self._proctype == 'multiprocessing':
            self._proc.terminate()
        import sys
        sys.exit(0)

    def drain_queue(self, depth=None):
        'Drain queue of at most depth pending results.  All if depth=None'
        inbox = self._inbox
        callbacks = self._callbacks
        while depth is None or depth > 0:
            if depth is not None:
                depth -= 1
            if inbox.empty():
                return
            cbnum, res = inbox.get()
            # Unregister before calling so the entry is released even if
            # the callback raises.
            meth = callbacks.pop(cbnum)
            meth.callback(res)
00150 
| Classes | Job Modules | Data Objects | Services | Algorithms | Tools | Packages | Directories | Tracs |

Generated on Fri May 16 2014 09:56:15 for EventLooper by doxygen 1.7.4