/search.css" rel="stylesheet" type="text/css"/> /search.js">
| Classes | Job Modules | Data Objects | Services | Algorithms | Tools | Packages | Directories | Tracs |

In This Package:

averager.py
Go to the documentation of this file.
00001 import logging, time, os, signal
00002 log = logging.getLogger(__name__)
00003 from datetime import timedelta
00004 from pprint import pformat
00005 from decimal import Decimal
00006 from propagator import Propagator
00007 from sourcevector import SourceVector
00008 from territory import Territory
00009 
class Averager(Propagator):
    """
    Base class holding common averager features, such as the scrape logic
    which assumes:

    #. source instances correspond to fixed time measurement *snapshots*
    #. target instances represent source measurements over time ranges
    #. 2 source instances are required to form one target instance, the target validity is derived from the datetimes of two source instances

    Initialisation in `Propagator` superclass
    """
    def changed(self, sv):
        """
        Override in subclasses to return whether a significant
        change in source instances is observed.  This, together
        with age checks, is used to decide whether the propagate method is called.

        The base implementation always reports a change.

        :param sv:  source vector containing two source instances to interrogate for changes
        :return: True
        """
        return True

    def propagate(self, sv):
        """
        Override this method in subclasses to yield one or more
        write ready target dicts derived from the `sv[-1]` source instance or `sv[-1].aggd` aggregate dict

        The base implementation yields a single empty dict.

        :param sv:  source vector containing two source instances to propagate to one target write
        """
        yield {}

    def _territory(self, sv):
        """
        Return the Territory of time bins attached to ``sv``, building it on first use.

        A Territory is only built when ``self.aggregate_group_by_territory`` is set
        and ``sv`` does not already carry a truthy ``territory`` attribute
        (assumes single shot running).

        :param sv: source vector
        :return: the attached territory, or None when none exists and none can be built
        """
        if not getattr(sv, 'territory', None) and self.aggregate_group_by_territory:
            floor = sv.tcursor     ## tcursor is initialized based on target last validity endtime
            sv.territory = Territory(sv.source, by=self.aggregate_group_by_territory,
                                     suby=self.aggregate_group_by,
                                     att=self.aggregate_group_by_att, floor=floor)
            sv.territory.dump()
            if len(sv.territory) == 0:
                log.warning("no bins : insufficient time range for period\n%s\n", sv.territory)
            else:
                log.info("looping over %s territory bins performing grouped aggregate queries in each bin  ",
                         len(sv.territory))
        return getattr(sv, 'territory', None)

    def _write_bin(self, sv, w):
        """
        Write every target dict yielded by `propagate` for the bin currently in ``sv.tcursor``.

        A fresh writer is created per target dict because each aggregate dict
        corresponds to its own time range.

        :param sv: source vector whose tcursor is set to the current bin
        :param w: number of writes made before this bin (used only in log messages)
        :return: list of dicts (seqno, timestart, timeend, localstart, localend), one per write
        :raises AssertionError: when a writer ``Close`` does not return a seqno
        """
        writes = []
        for td in self.propagate(sv):    ## iterate over "yield" supplier of target dicts
            localstart = td.get("_%s_min" % self.aggregate_group_by_att, None)  # typically date_time
            localend = td.get("_%s_max" % self.aggregate_group_by_att, None)
            log.debug("averager within propagate loop writer for range %s %s ", localstart, localend)
            wrt = self.target.writer(sv, localstart=localstart, localend=localend)

            tdi = self.target.instance(**td)
            log.debug("tdi %r ", tdi)
            wrt.Write(tdi)

            seqno = wrt.Close()   ## hits target DB here
            if not seqno:
                log.critical("writing failed %s ", sv)
                # explicit raise rather than `assert 0`, which `python -O` strips
                raise AssertionError("writing failed %s " % (sv,))
            log.debug("write %s succeeded %s seqno %s  ", w + len(writes), sv, seqno)
            crange = wrt.ctx.contextrange
            writes.append(dict(seqno=seqno, timestart=crange.timestart, timeend=crange.timeend,
                               localstart=localstart, localend=localend))
        return writes

    def __call__(self):
        """
        Note the differences between averager and scraper:

        #. single pass rather than infinite loop is typically used

        #. `group_by` aggregation is used causing potentially many aggregated dicts to be propagated

        #. in an averager the writer must be instantiated within the propagate loop as
           each aggregate dict corresponds to a different time range.

        #. time bins of typically 1 day are externally applied, via setting tcursor to a 2 element list

        Runs ``self.maxiter`` passes (forever when ``maxiter`` is 0), calling
        ``handle_signal`` after each pass.  Source vectors that have no territory,
        and for which none can be built, are skipped with a warning.

        :raises AssertionError: when a target write does not return a seqno
        """
        i, w = 0, 0
        while i < self.maxiter or self.maxiter == 0:
            i += 1
            log.debug("i %s ", i)
            for sv in self:
                log.info("sv.source.xtn %r", sv.source.xtn)
                territory = self._territory(sv)
                if territory is None:
                    log.warning("no territory for %s : aggregate_group_by_territory not set, skipping", sv)
                    continue

                ## this assumes independence of sources, maybe need to align territorial time ranges
                ## so clients have option of mangling them together
                for ibin, tbin in enumerate(territory):
                    sv.tcursor = tbin
                    log.debug("averager ibin %s bin %s ", ibin, [t.ctime() for t in tbin])
                    if not sv():
                        continue
                    writes = self._write_bin(sv, w)
                    w += len(writes)
                    if writes:
                        seqnos = [wr['seqno'] for wr in writes]
                        starts = [wr['localstart'] for wr in writes]
                        ends = [wr['localend'] for wr in writes]
                        log.debug("propagated %s with seqnos %s:%s time range %s %s  ",
                                  len(writes), min(seqnos), max(seqnos), min(starts), max(ends))
                    else:
                        log.debug("no writes for bin %s %s ", ibin, [t.ctime() for t in tbin])
            self.handle_signal()
00116 
if __name__ == "__main__":
    # This module only defines the Averager base class; running it directly does nothing.
    pass
00119 
00120 
00121 
| Classes | Job Modules | Data Objects | Services | Algorithms | Tools | Packages | Directories | Tracs |

Generated on Fri May 16 2014 09:50:03 for Scraper by doxygen 1.7.4