/search.css" rel="stylesheet" type="text/css"/> /search.js">
#!/usr/bin/env python
"""
Validity history of a DybDbi table chunk.

Collects the validity records of one (SITEMASK, SUBSITE) chunk of a table
into a `History` dict keyed on TIMESTART seconds, detects gaps in the chain
of validity ranges, and provides helpers that write gap-filling or
payload-shunted copies of existing entries.
"""
import os
import logging

try:
    from DybDbi.vrb import Vrb
except ImportError:
    # Keep the pure-python parts of this module importable without the
    # database stack; `History.__init__` retries the import so the genuine
    # ImportError still surfaces as soon as a Vrb is actually needed.
    Vrb = None

log = logging.getLogger(__name__)


class Gap(dict):
    """Break in the validity chain: keys seqbef, seqaft, timestart, timeend."""
    pass


class Rec(dict):
    """One validity record together with its payload summary."""

    def __repr__(self):
        return "seqno %(seqno)4s %(ftimestart)s %(ftimeend)s %(digest)s npay %(npay)4s %(msg)s " % self


class History(dict):
    """
    Validity history dict keyed on timestart seconds

    After gap filling SEQNO ordering will not match time ordering, hence moved to
    keying on timestart seconds : which should also be unique within an SS block
    (other tasks, simmasks would break that but would need to change blocking
    in that case anyhow)
    """

    def __init__(self, kls, sitemask, subsite, wtask=0, task=-1, paydict_=lambda rpt: {},
                 seqnocut=3867, target_tablename=None, dupefirst=True):
        """
        :param kls: DybDbi table class, must provide ``Rpt()`` and ``Wrt()``
        :param sitemask: SITEMASK selecting the chunk
        :param subsite: SUBSITE selecting the chunk
        :param wtask: writing task
        :param task: reading task
        :param paydict_: function of rpt instance that returns a dict summarizing the payload
        :param seqnocut: presentational only, SEQNO higher than this are highlighted
        :param target_tablename: table written to by `shunt`
        :param dupefirst: duplicate the first entry to retain same SEQNO count, otherwise loses the first entry

        Collects `Rec` instances into this dict keyed by TIMESTART seconds

        .. warning:: ensure the chunk does not have duplicate TIMESTARTs via sitemask, subsite, task choices

        """
        dict.__init__(self)
        sqlcontext = "SITEMASK=%s and SUBSITE=%s " % (sitemask, subsite)

        vrb_cls = Vrb
        if vrb_cls is None:
            # module level import failed, retry so the real error is raised here
            from DybDbi.vrb import Vrb as vrb_cls
        vrb = vrb_cls(kls, sqlcontext=sqlcontext, subsite=subsite, task=task)  ## ugly subsite repetition

        self.sitemask = sitemask
        self.subsite = subsite
        self.task = task
        self.wtask = wtask
        self.sqlcontext = sqlcontext
        self.seqnocut = seqnocut
        self.target_tablename = target_tablename
        self.dupefirst = dupefirst

        self.kls = kls
        self.tn = kls.__name__[1:]   # class name minus its first character
        self.vrb = vrb
        self.gaps = []
        self._collect(paydict_)
        self.seqno2ik = {}
        self._index()
        self._findgaps()

    def __repr__(self):
        return "%s %s nvrec %s gaps %s " % (self.__class__.__name__, self.sqlcontext, len(self.vrb), len(self.gaps))

    def _collect(self, paydict_):
        """
        Collect Rec instances into this dict keyed on timestart seconds

        :param paydict_: function of rpt instance that returns a dict
        """
        for seqno, vrec in self.vrb:
            cr = vrec.contextrange
            rpt = self.kls.Rpt().Clone()
            rpt.ctx(validityrec=vrec)
            dpay = paydict_(rpt)   # tuck away table specifics in the passed in paydict_ function

            rec = Rec(seqno=seqno, vrec=vrec, timestart=cr.timestart, timeend=cr.timeend,
                      npay=len(dpay), payload=dpay, digest=rpt.digest)
            rec['ftimestart'] = rec['timestart'].AsString('s')
            rec['ftimeend'] = rec['timeend'].AsString('s')
            rec['msg'] = "***" if seqno > self.seqnocut else "   "

            key = cr.timestart.sec
            if key in self:
                # later record wins, as before, but no longer silently
                log.warning("duplicate TIMESTART %s : seqno %s replaces seqno %s",
                            key, seqno, self[key]['seqno'])
            self[key] = rec

    def _index(self):
        """
        Construct mapping from SEQNO to timestart ordered index

        Sets ``self.kets`` (sorted timestart keys) and rebuilds ``self.seqno2ik``.
        """
        kets = sorted(self.keys())
        self.kets = kets
        self.seqno2ik = dict((self[ck]['seqno'], ik) for ik, ck in enumerate(kets))

    digests = property(lambda self: [self[t]['digest'] for t in sorted(self)], doc="payload digests in timestart seconds order")
    seqnos = property(lambda self: [self[t]['seqno'] for t in sorted(self)], doc="seqnos in timestart seconds order")

    def _findgaps(self):
        """
        Comparison of Rec with their priors, collecting any breaks
        in chain::

              [ts-te][ts-te]

        Rebuilds ``self.gaps`` from scratch, so repeated calls do not
        accumulate duplicates.
        """
        self.gaps = []
        for ik in range(1, len(self.kets)):
            ck = self.kets[ik]
            pk = self.kets[ik - 1]
            cv = self[ck]
            pv = self[pk]

            assert ck == cv['timestart'].sec
            assert pk == pv['timestart'].sec
            assert cv['npay'] == pv['npay'], (cv, pv)
            # SEQNO ordering is not asserted : no longer the case after gap filling
            assert cv['timestart'].sec >= pv['timeend'].sec, (cv, pv)

            if cv['timestart'].sec != pv['timeend'].sec:
                gap = Gap(seqbef=pv['seqno'], seqaft=cv['seqno'], timestart=pv['timeend'], timeend=cv['timestart'])
                self.gaps.append(gap)

    def dump(self):
        """
        Dump validities, highlighting those with SEQNO greater than cut
        """
        for i, t in enumerate(sorted(self)):
            log.info("%4s %s %r", i, t, self[t])

    def paydump(self, paykey='L2C2R8'):
        """
        Dump validities together with the payload value stored under `paykey`

        :param paykey: key into each record's payload dict, None is reported when absent
        """
        for i, t in enumerate(sorted(self)):
            rec = self[t]    # was the script global `hist`, a NameError in library use
            pav = rec['payload'].get(paykey, None)
            log.info("%4s %s %r %s", i, t, rec, pav)

    def fillgaps(self):
        """
        Filling gaps via duplication of prior entry with timerange shifted
        """
        wseqs = []
        for gap in self.gaps:
            wseq = self.writecopy(gap['seqbef'], gap['timestart'], gap['timeend'], task=self.wtask)
            wseqs.append(wseq)
        log.info("wrote seqnos %r ", wseqs)

    def shunt(self, seqno):
        """
        :param seqno: if 0 all entries are shunted, otherwise only the specified SEQNO

        Shunting payload forward, '''pay it forward'''

        Originally the SEQNO were not aligned due to the
        treatment via chunks of (SITEMASK, SUBSITE).
        Effectively the deck was unshuffled into sorted manner.

        To rectify this the source validity table (SEQNO, SITEMASK, SUBSITE)
        sequence is used to retrace the steps of the scraper and address multiple
        instances of this history class for each (SITEMASK,SUBSITE) chunk.
        This allowed the deck to be shuffled according to the source shuffle.

        #. 1st rec untouched (written as a copy of itself when `dupefirst`, otherwise skipped)
        #. 2nd rec gets payload from 1st
        #. 3rd rec gets payload from 2nd
        #. etc
        #. nth rec gets payload from (n-1)th
        #. last gets payload from penultimate

        """
        assert len(self.gaps) == 0, "must fill gaps before shunting "
        assert self.target_tablename, "must define target table when shunting "

        if seqno == 0:
            log.info("shunt into %s %s ", len(self.kets), self.target_tablename)
            iks = range(0, len(self.kets))
        else:
            iks = [self.seqno2ik[seqno]]

        for ik in iks:
            ck = self.kets[ik]
            cv = self[ck]
            if seqno > 0:
                assert cv['seqno'] == seqno

            if ik == 0:
                # The first entry has no prior. Without this guard a False
                # `dupefirst` fell through to kets[-1], wrongly pairing the
                # first record with the last one.
                if self.dupefirst:
                    self.writecopy(cv['seqno'], cv['timestart'], cv['timeend'],
                                   task=self.wtask, target_tablename=self.target_tablename)
                continue

            pk = self.kets[ik - 1]
            pv = self[pk]

            assert ck == cv['timestart'].sec
            assert pk == pv['timestart'].sec
            assert cv['npay'] == pv['npay'], (cv, pv)
            assert cv['timestart'].sec >= pv['timeend'].sec, (cv, pv)
            assert cv['timestart'].sec == pv['timeend'].sec, "gaps need fixin 1st "

            # prior payload, current time range
            self.writecopy(pv['seqno'], cv['timestart'], cv['timeend'],
                           task=self.wtask, target_tablename=self.target_tablename)

    def writecopy(self, seqno, timestart=None, timeend=None, task=0, target_tablename=None):
        """
        Copy validity rec and payload contents corresponding to seqno
        into a new entry, optionally changing timestart and timeend

        :param seqno: SEQNO of the entry to copy, looked up via ``self.vrb``
        :param timestart: TimeStamp instance, defaults to that of the copied entry
        :param timeend: TimeStamp instance, defaults to that of the copied entry
        :param task: task of the written entry
        :param target_tablename: table to write into, defaults to ``self.tn``
        :return: value returned by closing the writer
        """
        from DybDbi import ContextRange, SimFlag, TimeStamp

        vrec = self.vrb(seqno)
        assert vrec.subsite == self.subsite
        assert vrec.contextrange.sitemask == self.sitemask

        if not target_tablename:
            target_tablename = self.tn
        if timestart is None:
            timestart = vrec.contextrange.timestart
        if timeend is None:
            timeend = vrec.contextrange.timeend

        wrt = self.kls.Wrt().Clone()
        cr = ContextRange(self.sitemask, SimFlag.kData, timestart, timeend)
        overlay = TimeStamp(0, 0)
        wrt.ctx(contextrange=cr, subsite=vrec.subsite, task=task, versiondate=overlay, tablename=target_tablename)
        rpt = self.kls.Rpt().Clone()
        rpt.ctx(validityrec=vrec)
        log.debug("copying %s rows of seqno %s into %s", len(rpt), seqno, target_tablename)
        for row in rpt:
            wrt.Write(row)
        wseqno = wrt.Close()
        return wseqno


if __name__ == '__main__':
    os.environ.setdefault('DBCONF', 'tmp_offline_db')
    logging.basicConfig(level=logging.INFO)
    from DybDbi import gDbi, GDcsAdWpHv
    gDbi.level = "WARNING"

    def paydict(rpt):
        """Summarize payload rows as a locationid -> voltage dict."""
        return dict((r.locationid, r.voltage) for r in rpt)

    # gaps are already found during construction
    hist = History(GDcsAdWpHv, sitemask=1, subsite=1, paydict_=paydict)
    print(hist)
    if hist.gaps:
        print("\n".join(map(repr, hist.gaps)))

    hist.dump()