/search.css" rel="stylesheet" type="text/css"/> /search.js">
| Classes | Job Modules | Data Objects | Services | Algorithms | Tools | Packages | Directories | Tracs |

In This Package:

history.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 """
00003 
00004 """
00005 import os, logging
00006 from DybDbi.vrb import Vrb 
00007 log = logging.getLogger(__name__)
00008 
class Gap(dict):
    """
    Plain dict subclass used to describe a break in the validity chain.

    Instances are created with the keys ``seqbef``, ``seqaft``, ``timestart``
    and ``timeend`` by ``History._findgaps``.
    """
00010 
class Rec(dict):
    """
    Dict holding one validity record.

    The representation expects the keys ``seqno``, ``ftimestart``,
    ``ftimeend``, ``digest``, ``npay`` and ``msg`` to be present.
    """

    def __repr__(self):
        """One-line summary of the record, formatted from its own keys."""
        template = "seqno %(seqno)4s %(ftimestart)s %(ftimeend)s  %(digest)s npay %(npay)4s %(msg)s "
        summary = template % self
        return summary
00014  
class History(dict):
    """
    Validity history dict keyed on timestart seconds

    After gap filling SEQNO ordering will not match time ordering, hence moved to
    keying on timestart seconds : which should also be unique within an SS block
    (other tasks, simmasks would break that but would need to change blocking
    in that case anyhow)

    Attributes filled by the constructor:

    ``kets``
        sorted list of the timestart-seconds keys
    ``seqno2ik``
        mapping from SEQNO to the index of its record within ``kets``
    ``gaps``
        list of `Gap` instances, one per break found in the validity chain
    """

    def __init__(self, kls, sitemask, subsite, wtask=0, task=-1, paydict_=lambda rpt:{}, seqnocut=3867, target_tablename=None, dupefirst=True):
        """
        :param kls: DybDbi row class, it has to provide ``Rpt()`` and ``Wrt()``;
                    the class name with its first character dropped is used as
                    the default table name when writing
        :param sitemask: SITEMASK selecting the chunk
        :param subsite: SUBSITE selecting the chunk
        :param wtask: writing task
        :param task: reading task
        :param paydict_: function of an rpt instance that returns a dict of payload values
        :param seqnocut: presentational only, SEQNO higher than this are highlighted
        :param target_tablename: table written to by `shunt`
        :param dupefirst: duplicate the first entry to retain same SEQNO count,
                          otherwise loses the first entry

        Collects `Rec` instances into this dict keyed by TIMESTART seconds

        .. warning:: ensure the chunk does not have duplicate TIMESTARTs via sitemask, subsite, task choices

        """
        sqlcontext = "SITEMASK=%(sitemask)s and SUBSITE=%(subsite)s " % dict(sitemask=sitemask, subsite=subsite)
        vrb = Vrb(kls, sqlcontext=sqlcontext, subsite=subsite, task=task)       ## ugly subsite repetition

        self.sitemask = sitemask
        self.subsite = subsite
        self.task = task
        self.wtask = wtask
        self.sqlcontext = sqlcontext
        self.seqnocut = seqnocut
        self.target_tablename = target_tablename
        self.dupefirst = dupefirst

        self.kls = kls
        self.tn = kls.__name__[1:]    # class name without its first character, default table name in writecopy
        self.vrb = vrb
        self.gaps = []
        self.seqno2ik = {}

        # order matters: the index needs the collected records, gap finding needs the index
        self._collect(paydict_)
        self._index()
        self._findgaps()

    def __repr__(self):
        return "%s %s nvrec %s gaps %s " % (self.__class__.__name__, self.sqlcontext, len(self.vrb), len(self.gaps))

    def _collect(self, paydict_):
        """
        Collect Rec instances into this dict keyed on timestart seconds

        :param paydict_: function of rpt instance that returns a dict
        """
        for seqno, vrec in self.vrb:
            cr = vrec.contextrange
            rpt = self.kls.Rpt().Clone()
            rpt.ctx(validityrec=vrec)
            dpay = paydict_(rpt)   # tuck away table specifics in the passed in paydict_ function

            rec = Rec(seqno=seqno, vrec=vrec, timestart=cr.timestart, timeend=cr.timeend,
                      npay=len(dpay), payload=dpay, digest=rpt.digest)
            rec['ftimestart'] = rec['timestart'].AsString('s')
            rec['ftimeend'] = rec['timeend'].AsString('s')
            rec['msg'] = "***" if seqno > self.seqnocut else "   "

            # a repeated timestart silently replaces the earlier record, see the constructor warning
            self[cr.timestart.sec] = rec

    def _index(self):
        """
        Construct mapping from SEQNO to timestart ordered index

        Rebuilds ``kets`` and ``seqno2ik`` from scratch, so it can be called again
        after the dict content changed.
        """
        kets = sorted(self.keys())
        self.kets = kets
        self.seqno2ik = dict((self[key]['seqno'], ik) for ik, key in enumerate(kets))

    digests = property(lambda self:[self[t]['digest'] for t in sorted(self)], doc="payload digests in timestart seconds order")
    seqnos  = property(lambda self:[self[t]['seqno'] for t in sorted(self)] , doc="seqnos in timestart seconds order")

    def _findgaps(self):
        """
        Comparison of Rec with their priors, collecting any breaks
        in chain::

            [ts-te][ts-te]

        ``gaps`` is rebuilt on every call, so repeating the call does not
        duplicate entries.

        :raises AssertionError: when keys and records disagree, the payload count
                                changes between neighbours or validities overlap
        """
        self.gaps = []
        kets = self.kets
        for pk, ck in zip(kets, kets[1:]):
            pv = self[pk]
            cv = self[ck]

            assert ck == cv['timestart'].sec
            assert pk == pv['timestart'].sec
            assert cv['npay'] == pv['npay'], (cv, pv)
            # SEQNO ordering is not checked : it no longer matches time ordering after gap filling
            assert cv['timestart'].sec >= pv['timeend'].sec, (cv, pv)

            if cv['timestart'].sec != pv['timeend'].sec:
                gap = Gap(seqbef=pv['seqno'], seqaft=cv['seqno'], timestart=pv['timeend'], timeend=cv['timestart'])
                self.gaps.append(gap)

    def dump(self):
        """
        Dump validities, highlighting those with SEQNO greater than cut
        """
        for i, t in enumerate(sorted(self)):
            log.info("%4s %s %r", i, t, self[t])

    def paydump(self, paykey='L2C2R8'):
        """
        Dump validities together with the payload value stored under `paykey`

        :param paykey: key into the payload dict of each record, None is logged
                       for records without that key
        """
        for i, t in enumerate(sorted(self)):
            rec = self[t]
            pav = rec['payload'].get(paykey, None)
            log.info("%4s %s %r %s", i, t, rec, pav)

    def fillgaps(self):
        """
        Filling gaps via duplication of prior entry with timerange shifted

        The in-memory records and ``gaps`` are not updated by the writes.

        :return: list of the SEQNO returned by `writecopy`, one per gap
        """
        wseqs = [self.writecopy(gap['seqbef'], gap['timestart'], gap['timeend'], task=self.wtask)
                 for gap in self.gaps]
        log.info("wrote seqnos %r ", wseqs)
        return wseqs

    def shunt(self, seqno):
        """
        :param seqno: if 0 all entries are shunted, otherwise only the specified SEQNO

        Shunting payload forward, '''pay it forward'''

        Originally the SEQNO were not aligned due to the
        treatment via chunks of (SITEMASK, SUBSITE).
        Effectively the deck was unshuffled into sorted manner.

        To rectify this the source validity table (SEQNO, SITEMASK, SUBSITE)
        sequence is used to retrace the steps of the scraper and address multiple
        instances of this history class for each (SITEMASK,SUBSITE) chunk.
        This allowed the deck to be shuffled according to the source shuffle.

        #. 1st rec untouched when `dupefirst` is set, otherwise it is skipped
        #. 2nd rec gets payload from 1st
        #. 3rd rec gets payload from 2nd
        #. etc
        #. nth rec gets payload from (n-1)th
        #. last gets payload from penultimate

        :raises KeyError: when a non-zero `seqno` is not part of this history
        :raises AssertionError: when gaps remain, no target table is defined
                                or neighbouring records are inconsistent
        """
        assert len(self.gaps) == 0, "must fill gaps before shunting "
        assert self.target_tablename , "must define target table when shunting "

        if seqno == 0:
            log.info("shunt into %s %s " % (len(self.kets), self.target_tablename))
            iks = range(0, len(self.kets))
        else:
            iks = [self.seqno2ik[seqno]]

        for ik in iks:
            ck = self.kets[ik]
            cv = self[ck]
            if seqno > 0:
                assert cv['seqno'] == seqno

            if ik == 0:
                # The first record has no prior to take a payload from. Without this
                # guard kets[ik-1] wraps around to the last record.
                if self.dupefirst:
                    self.writecopy(cv['seqno'], cv['timestart'], cv['timeend'], task=self.wtask, target_tablename=self.target_tablename)
                continue

            pk = self.kets[ik-1]
            pv = self[pk]

            assert ck == cv['timestart'].sec
            assert pk == pv['timestart'].sec
            assert cv['npay'] == pv['npay'], (cv, pv)
            assert cv['timestart'].sec >= pv['timeend'].sec, (cv, pv)
            assert cv['timestart'].sec == pv['timeend'].sec , "gaps need fixin 1st "

            # payload of the prior record, validity range of the current one
            self.writecopy(pv['seqno'], cv['timestart'], cv['timeend'], task=self.wtask, target_tablename=self.target_tablename)

    def writecopy(self, seqno, timestart=None, timeend=None, task=0, target_tablename=None):
        """
        Copy validity rec and payload contents corresponding to seqno
        into a new entry, optionally changing timestart and timeend

        :param seqno: SEQNO of the source entry, looked up via the Vrb instance
        :param timestart: TimeStamp instance, defaults to the source timestart
        :param timeend: TimeStamp instance, defaults to the source timeend
        :param task: task of the written entry
        :param target_tablename: table to write into, defaults to the kls name
                                 with its first character dropped
        :return: the value returned by closing the writer
        :raises AssertionError: when the source entry does not belong to the
                                sitemask/subsite of this history
        """
        from DybDbi import ContextRange, SimFlag, TimeStamp

        vrec = self.vrb(seqno)
        assert vrec.subsite == self.subsite
        assert vrec.contextrange.sitemask == self.sitemask
        # simmask of the source is not checked : entries are always written with SimFlag.kData

        if not target_tablename:
            target_tablename = self.tn
        if timestart is None:
            timestart = vrec.contextrange.timestart
        if timeend is None:
            timeend = vrec.contextrange.timeend

        wrt = self.kls.Wrt().Clone()
        cr = ContextRange(self.sitemask, SimFlag.kData, timestart, timeend)
        overlay = TimeStamp(0,0)    # NOTE(review): presumably requests overlay versioning - confirm against DybDbi docs
        wrt.ctx(contextrange=cr, subsite=vrec.subsite, task=task, versiondate=overlay, tablename=target_tablename)

        rpt = self.kls.Rpt().Clone()
        rpt.ctx(validityrec=vrec)
        for row in rpt:
            wrt.Write(row)
        return wrt.Close()
00246 
00247 
00248 
if __name__ == '__main__':
    # Example usage: dump the validity history of one (SITEMASK, SUBSITE) chunk
    os.environ.setdefault('DBCONF','tmp_offline_db')
    logging.basicConfig(level=logging.INFO)
    from DybDbi import gDbi, GDcsAdWpHv
    gDbi.level = "WARNING"

    # payload dict maps locationid to voltage for every row of the result;
    # use ``lambda rpt:{}`` instead to skip collecting the payload
    paydict = lambda rpt:dict([r.locationid,r.voltage] for r in rpt)
    hist = History( GDcsAdWpHv, sitemask=1, subsite=1 , paydict_=paydict )

    # the constructor already searched for gaps, there is no public findgaps method to call
    print(hist)
    if len(hist.gaps) > 0:
        print("\n".join(map(repr,hist.gaps)))

    hist.dump()
00267 
| Classes | Job Modules | Data Objects | Services | Algorithms | Tools | Packages | Directories | Tracs |

Generated on Fri May 16 2014 09:50:03 for Scraper by doxygen 1.7.4