# (extraction artifact: stray HTML fragment removed from the top of this file)
import logging, time, os, signal
log = logging.getLogger(__name__)
from datetime import timedelta
from pprint import pformat
from decimal import Decimal
from propagator import Propagator
from sourcevector import SourceVector
from territory import Territory

class Averager(Propagator):
    """
    Base class holding common averager features, such as the scrape logic
    which assumes:

    #. source instances correspond to fixed time measurement *snapshots*
    #. target instances represent source measurements over time ranges
    #. 2 source instances are required to form one target instance, the target
       validity is derived from the datetimes of two source instances

    Initialisation in `Propagator` superclass
    """
    def changed(self, sv):
        """
        Override in subclasses to return if a significant
        change in source instances is observed. This together
        with age checks is used to decide if the propagate method is called.

        :param sv: source vector containing two source instances to interrogate for changes
        :return: ``True`` in this base implementation, i.e. always regarded as changed
        """
        return True

    def propagate(self, sv):
        """
        Override this method in subclasses to yield one or more
        write ready target dicts derived from the `sv[-1]` source instance
        or `sv[-1].aggd` aggregate dict

        :param sv: source vector containing two source instances to propagate to one target write
        :return: generator yielding target dicts (a single empty dict in this base implementation)
        """
        yield {}

    def __call__(self):
        """
        Run the averager: bin the source time range into territory bins and,
        for each bin, aggregate and propagate target dicts to the target DB.

        Note the differences between averager and scraper:

        #. single pass rather than infinite loop is typically used

        #. `group_by` aggregation is used causing potentially many aggregated
           dicts to be propagated

        #. in an averager the writer must be instantiated within the propagate
           loop as each aggregate dict corresponds to a different time range.

        #. time bins of typically 1 day are externally applied, via setting
           tcursor to a 2 element list
        """
        i, w = 0, 0          # i: pass counter, w: successful write counter
        while i < self.maxiter or self.maxiter == 0:   # maxiter == 0 means no iteration cap
            i += 1
            log.debug("i %s " % i)
            for sv in self:
                log.info("sv.source.xtn %r", sv.source.xtn)
                floor = sv.tcursor   ## tcursor is initialized based on target last validity endtime

                ## assumes single shot running: territory is built once per sv and cached on it
                if not getattr(sv, 'territory', None) and self.aggregate_group_by_territory:
                    sv.territory = Territory(sv.source, by=self.aggregate_group_by_territory, suby=self.aggregate_group_by, att=self.aggregate_group_by_att, floor=floor)
                    sv.territory.dump()
                # NOTE(review): if neither branch above set sv.territory this raises
                # AttributeError — presumably callers always configure
                # aggregate_group_by_territory; confirm.
                if len(sv.territory) == 0:
                    # warn() is a deprecated alias; use warning()
                    log.warning("no bins : insufficient time range for period\n%s\n" % sv.territory)
                else:
                    log.info("looping over %s territory bins performing grouped aggregate queries in each bin " % len(sv.territory))

                ## this assumes independence of sources, maybe need to align territorial time ranges
                ## so clients have option of mangling them together
                for ibin, bin in enumerate(sv.territory):
                    sv.tcursor = bin    # externally applied 2-element [start, end] time bin
                    log.debug("averager ibin %s bin %s " % (ibin, [_.ctime() for _ in bin]))
                    proceed = sv()      # perform the aggregate query for this bin
                    if not proceed:
                        continue
                    writes = []
                    for td in self.propagate(sv):   ## iterate over "yield" supplier of target dicts
                        localstart = td.get("_%s_min" % self.aggregate_group_by_att, None)   # typically date_time
                        localend = td.get("_%s_max" % self.aggregate_group_by_att, None)
                        log.debug("averager within propagate loop writer for range %s %s " % (localstart, localend))
                        # writer must be created per target dict: each aggregate
                        # dict corresponds to a different time range
                        wrt = self.target.writer(sv, localstart=localstart, localend=localend)
                        tdi = self.target.instance(**td)
                        log.debug("tdi %r " % tdi)
                        wrt.Write(tdi)
                        seqno = wrt.Close()   ## hits target DB here
                        if seqno:
                            log.debug("write %s succeeded %s seqno %s " % (w, sv, seqno))
                            writes.append(dict(seqno=seqno, timestart=wrt.ctx.contextrange.timestart, timeend=wrt.ctx.contextrange.timeend, localstart=localstart, localend=localend))
                            w += 1
                        else:
                            log.fatal("writing failed %s " % sv)
                            assert 0   # hard stop on write failure (NB: stripped under -O)
                    if len(writes) > 0:
                        # list comprehensions (not map) so the sequences survive
                        # being consumed twice by min() and max() below
                        starts = [wd['localstart'] for wd in writes]
                        ends = [wd['localend'] for wd in writes]
                        seqnos = [wd['seqno'] for wd in writes]
                        log.debug("propagated %s with seqnos %s:%s time range %s %s " % (len(writes), min(seqnos), max(seqnos), min(starts), max(ends)))
                    else:
                        log.debug("no writes for bin %s %s " % (ibin, [_.ctime() for _ in bin]))
                # end bin
            # end sv
        # end while
        self.handle_signal()

if __name__ == '__main__':
    pass