/search.css" rel="stylesheet" type="text/css"/> /search.js">
00001 import os, logging, pickle, datetime, hashlib 00002 from pprint import pformat 00003 log = logging.getLogger(__name__) 00004 00005 class DebugValidityOrdering(object): 00006 """ 00007 00008 .. warn:: **FOR DEBUGGING ONLY** NB the change impacts the kls specified only 00009 00010 Context manger for non standard DBI Validity ordering queries, allowing what happens 00011 within a tied VERSIONDATE occurs to be examined. 00012 Observation suggests MySQL implicity orders by the PK(SEQNO) asc 00013 within tied VERSIONDATE sets. 00014 00015 The extra ordering string is impinged between "order by VERSIONDATE desc" 00016 and the terminating ";" with a "," prefix when the extra is non-blank. 00017 00018 Usage:: 00019 00020 from DybPython.tab import DebugValidityOrdering 00021 from DybDbi import GDemo 00022 00023 with DebugValidityOrdering(GDemo,"SEQNO desc"): 00024 r = GDemo.Rpt() 00025 n = len(r) 00026 print n, r.vrec.seqno 00027 00028 """ 00029 def __init__(self, kls , tmp ): 00030 self.tmp = tmp 00031 self.proxy = kls.GetTableProxy().GetDBProxy() 00032 self.prior = None 00033 def __enter__( self ): 00034 if not self.tmp: 00035 self.prior = None 00036 log.warn("extra ordering of None, leaving ordering as is") 00037 else: 00038 self.prior = self.proxy.GetExtraOrdering() 00039 self.proxy.SetExtraOrdering( self.tmp ) 00040 log.info("changing extra ordering : %s -> %s " % ( self.prior, self.tmp ) ) 00041 00042 def __exit__(self, type, value, tb ): 00043 if not self.prior: 00044 log.warn("no prior set ") 00045 return None 00046 self.proxy.SetExtraOrdering(self.prior) 00047 log.info("reverting extra ordering : %s -> %s " % ( self.prior, self.extra) ) 00048 00049 00050 class Rollback(object): 00051 """ 00052 INSERTDATE rollback context manager 00053 """ 00054 def __init__(self, insertdate ): 00055 self.insertdate = insertdate 00056 00057 def config(self, insertdate): 00058 from DybDbi import TimeStamp, gDbi 00059 delta = datetime.timedelta( seconds=1 ) ## is this really needed ? YES as rollback uses "INSERTDATE < t" 00060 ts = TimeStamp.fromAssumedUTCDatetime( insertdate + delta ) 00061 gDbi.ConfigRollback("* = %s" % ts.AsString("s") ) 00062 return ts 00063 00064 def __enter__(self): 00065 if self.insertdate != None: 00066 self.config(self.insertdate) 00067 def __exit__(self, type, value, tb ): 00068 from DybDbi import gDbi 00069 gDbi.ClearRollbackDates() 00070 00071 00072 def t_(_): 00073 """ 00074 Convert a datetime into a TimeStamp 00075 """ 00076 from DybDbi import TimeStamp 00077 return TimeStamp.fromAssumedUTCDatetime(_) 00078 00079 def rst_title( msg , cha="~" ): 00080 return "%s\n%s\n\n" % ( msg, cha * len(msg) ) 00081 00082 class ctxdict(dict): 00083 """ 00084 Specialization of `dict` to make hashable allowing to be used in `sets` and other dicts 00085 00086 .. warning:: do **not** mutate these dicts if you wish to maintain sanity 00087 00088 """ 00089 def __hash__(self): 00090 return hash(tuple(sorted(self.items()))) 00091 def __str__(self): 00092 return "_".join(map(lambda k:"%s%s" % (k,self[k]) , sorted(self) )) 00093 00094 00095 class Tab(object): 00096 """ 00097 Marriage of raw DB access via MySQL-python and DybDbi access 00098 """ 00099 def __init__(self, name , db ): 00100 """ 00101 :param name: name of DBI payload table 00102 :param db: :py:class:`DybPython.DB` instance 00103 """ 00104 self.name = name 00105 self.db = db 00106 def q(self, sql): 00107 return self.db( sql % {'tab':self.name} ) 00108 def column(self, n, sql): 00109 return [d[n] for d in self.db( sql % {'tab':self.name} )] 00110 def vdistinct(self, n ): 00111 return self.column(n, "select distinct("+n+") from %(tab)sVld") 00112 00113 def fields(self, skips=[]): 00114 """ 00115 Return list of fields in the this table, excluding the skipped fields. 00116 """ 00117 return [_['Field'] for _ in self.db("describe %s" % self.name ) if _['Field'] not in skips ] 00118 00119 def _qminmaxavg(self, skips=('SEQNO',), group_by='SEQNO' ): 00120 """ 00121 Return SQL query string that provides min, max and avg values of each field within `group_by` groups 00122 """ 00123 fields = self.fields(skips=skips) 00124 mma = lambda f:"min(%(f)s) as mi%(f)s, max(%(f)s) as mx%(f)s, avg(%(f)s) as av%(f)s" % dict(f=f) 00125 sel = ",".join(map(mma,fields)) 00126 return "select %(group_by)s,%(sel)s from %(tab)s group by %(group_by)s" % dict(tab=self.name, group_by=group_by, sel=sel) 00127 00128 def qminmaxavg( self): 00129 return self.db( self._qminmaxavg() ) 00130 00131 def mma(self): 00132 """ 00133 Return a string rst table showing min/max/avg values grouped by SEQNO. 00134 When the docs virtual python is used, a valid rst table is returned otherwise 00135 a pale imitation table is provided. 00136 00137 Note that min/max/avg are consolidated into a single cell to avoid overlarge tables 00138 """ 00139 flds = self.fields(skips=('SEQNO',)) 00140 from DybPython.vlut import TabularData 00141 td = TabularData() 00142 for e in self.qminmaxavg(): 00143 d = dict(SEQNO=e['SEQNO']) 00144 for f in flds: 00145 d[f] = "%s %s %s" % ( e['mi'+f], e['mx'+f], e['av'+f ] ) 00146 td.append(d) 00147 return td.as_rst( ['SEQNO'] + flds ) 00148 00149 00150 def dropzero(self): 00151 """ 00152 DROPs the DBI payload and validity tables and sets the LASTUSEDSEQNO in LOCALSEQNO table to zero. 00153 Only works when the DBCONF section name and actual database name starts with 'tmp_' 00154 """ 00155 tab = self.name 00156 dbname = self.db.dbc['database'] 00157 if dbname.startswith('tmp_') and self.db.sect.startswith('tmp_'): 00158 log.info("dropping DBI table pair for %(tab)s and zeroing LOCALSEQNO.LASTUSEDSEQNO " % locals() ) 00159 self.db("drop table if exists %(tab)s, %(tab)sVld" % locals() ) 00160 self.db("update LOCALSEQNO set LASTUSEDSEQNO=0 where TABLENAME='%(tab)s'" % locals() ) 00161 else: 00162 raise Exception("dropzero only permitted when DBCONF section name and actual database name starts with tmp_ %(tab)s" % locals() ) 00163 00164 def vld(self, *seqnos): 00165 seqs = "(%s)" % seqnos[0] if len(seqnos)==1 else str(tuple(seqnos)) 00166 return self.q("select * from %(tab)sVld where SEQNO in " + seqs ) 00167 00168 def vpayloadless(self): 00169 """ 00170 Return validity entries with no corresponding payload 00171 00172 There are legitimate reasons to go payloadless, however it is also a likely outcome of 00173 some error conditions when writing to the DB. 00174 """ 00175 tab = self.name 00176 return self.q("select * from %(tab)sVld where SEQNO not in (select distinct(SEQNO) from %(tab)s) order by SEQNO " % locals() ) 00177 00178 def vorder(self): 00179 return 'SEQNO TIMESTART TIMEEND SITEMASK SIMMASK SUBSITE TASK AGGREGATENO VERSIONDATE INSERTDATE'.split() 00180 00181 def vctxsql(self, ctx): 00182 """ 00183 Provide context SQL query excluding time requirement for interactive mysql querying. Usage:: 00184 00185 tab = db.tab('Demo') 00186 actx = tab.actual_ctxs() 00187 sql = tab.vctxsql(actx[0]) 00188 > 'select * from DemoVld where SiteMask & 127 and SimMask & 1 and Task = 0 and SubSite = 0 order by VERSIONDATE desc' 00189 00190 """ 00191 d = dict(tab=self.name) 00192 d.update(ctx) 00193 return "select * from %(tab)sVld where SiteMask & %(site)s and SimMask & %(simflag)s and Task = %(task)s and SubSite = %(subsite)s order by VERSIONDATE desc" % d 00194 00195 def vctx(self, ctx): 00196 return self.db( self.vctxsql( ctx)) 00197 00198 def dbictx_directive(self, ctx ): 00199 """ 00200 Provides rst source like:: 00201 00202 select * from Demo ... 00203 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 00204 00205 .. dbictx:: tmp_offline_db Demo 00206 :site: 00207 :simflag: 00208 :task: 00209 :subsite: 00210 00211 Which directs Sphinx to perform a faux-DBI context query when the documentation is built and present the result 00212 as a table within the docs. 00213 00214 """ 00215 vl += rst_title(str(self.vctxsql(ctx))) 00216 vl += '.. dbictx:: %(dbconf)s %(tn)s\n :site: %(site)s\n :simflag: %(simflag)s\n :task: %(task)s\n :subsite: %(subsite)s\n' % dict(ctx, dbconf=self.db.sect ,tn=self.name) 00217 return vl 00218 00219 def vlast(self, n , orderlimit="order by SEQNO desc limit 1" ): 00220 """ 00221 :param n: name of row in validity table, eg `INSERTDATE` 00222 :param orderlimit: SQL ordering and limit clause 00223 """ 00224 vals = self.column(n, "select `"+n+"` from %(tab)sVld " + orderlimit ) 00225 return vals[0] if len(vals) == 1 else None 00226 00227 def vtimes(self): 00228 return sorted(list(set( self.vdistinct("TIMESTART") + self.vdistinct("TIMEEND") ))) 00229 00230 def vseqnos(self, *seqnos, **kwa): 00231 """ 00232 Returns string presentation of a list of SEQNO in a DBI validity table 00233 00234 :param seqnos: one or more SEQNO 00235 00236 Usage:: 00237 00238 In [6]: p tab.vseqnos(10,1) 00239 +-------+---------------------+---------------------+----------+---------+---------+------+-------------+---------------------+---------------------+ 00240 | SEQNO | TIMESTART | TIMEEND | SITEMASK | SIMMASK | SUBSITE | TASK | AGGREGATENO | VERSIONDATE | INSERTDATE | 00241 +-------+---------------------+---------------------+----------+---------+---------+------+-------------+---------------------+---------------------+ 00242 | 1 | 2010-01-01 01:00:00 | 2038-01-19 03:14:07 | 127 | 1 | 0 | 0 | -1 | 2010-01-01 01:00:00 | 2011-08-22 12:14:21 | 00243 | 10 | 2010-01-01 04:00:00 | 2038-01-19 03:14:07 | 127 | 1 | 0 | 0 | -1 | 2010-01-01 04:02:00 | 2011-08-22 12:14:30 | 00244 +-------+---------------------+---------------------+----------+---------+---------+------+-------------+---------------------+---------------------+ 00245 00246 """ 00247 raw = kwa.pop('raw',False) 00248 if raw: 00249 seqtuple = "(%s)" % seqnos[0] if len(seqnos)==1 else str(tuple(seqnos)) 00250 return self.db.mysql("select * from %(tab)sVld where SEQNO in %(seqtuple)s" % {'tab':self.name,'seqtuple':seqtuple} , opts="-t" ) 00251 else: 00252 return self.vld(*seqnos) 00253 00254 00255 00256 def actual_ctxs(self): 00257 """ 00258 Provide list of distinct context dicts actually present in the Vld table, 00259 ordered by the string representation. 00260 00261 Masks and specials kAnyTask = -1, kAnySubSite = -1 are not yet handled differently 00262 00263 :return: list of dicts such as ``dict(site=127,simflag=1,subsite=1,task=0, aggno=-1)`` covering all contexts in Vld table 00264 00265 """ 00266 actual = [] 00267 for e in self.q("select * from %(tab)sVld"): 00268 ctx = ctxdict(task=int(e['TASK']),simflag=int(e['SIMMASK']),site=int(e['SITEMASK']),subsite=int(e['SUBSITE']),aggno=int(e['AGGREGATENO'])) 00269 actual.append(ctx) 00270 return sorted(list(set(actual)),cmp=lambda a,b:cmp(str(a),str(b))) 00271 00272 def looped_ctxs(self): 00273 """ 00274 Looped context dicts based on distinct values of the components. 00275 00276 :return: list of dicts such as ``dict(site=127,simflag=1,subsite=1,task=0, aggno=-1)`` covering all relevant contexts suggested by Vld table 00277 00278 :: 00279 +-------+---------------------+---------------------+----------+---------+---------+------+-------------+---------------------+---------------------+ 00280 | SEQNO | TIMESTART | TIMEEND | SITEMASK | SIMMASK | SUBSITE | TASK | AGGREGATENO | VERSIONDATE | INSERTDATE | 00281 +-------+---------------------+---------------------+----------+---------+---------+------+-------------+---------------------+---------------------+ 00282 00283 """ 00284 sites = self.vdistinct("SITEMASK") 00285 simflags = self.vdistinct("SIMMASK") 00286 subsites = self.vdistinct("SUBSITE") 00287 tasks = self.vdistinct("TASK") 00288 aggnos = self.vdistinct("AGGREGATENO") 00289 00290 l = [] 00291 for task in map(int,tasks): 00292 for simflag in map(int,simflags): 00293 for site in map(int,sites): 00294 for subsite in map(int,subsites): 00295 for aggno in map(int,aggnos): 00296 l.append(ctxdict(task=task,simflag=simflag,site=site,subsite=subsite,aggno=aggno)) 00297 return l 00298 00299 00300 exists = property(lambda self:self.db.has_table(self.name), doc="True if DBI payload table exists") 00301 00302 path = property(lambda self:"%s.txt" % self.name) 00303 def save(self, smth, dir=None ): 00304 """ 00305 :param smth: some string related to the table to be written 00306 """ 00307 path = os.path.join(dir, self.path) if dir else self.path 00308 f = open( path , "w") 00309 f.write( smth ) 00310 f.close() 00311 00312 def _kls(self): 00313 import DybDbi 00314 return getattr( DybDbi , 'G' + self.name ) 00315 kls = property( _kls ) 00316 00317 00318 def tscan(self, times, ctx, opts): 00319 """ 00320 :param times: list of datetimes at which to perform DBI queries 00321 :param ctx: DybDbi ctx kwargs, excluding `timestamp` 00322 :param opts: dict of extra options 00323 00324 :return lod: list-of-dicts representing winning SEQNO vs T 00325 00326 Also for each time an extra FindSeqNoWithVersionDate query is done 00327 in order to explicity show where the ambiguous rot sets in on the VLUT. 00328 The excludingSeqno is used to avoid finding itself 00329 00330 BUT FindSeqNoWithVersionDate will return 0 if it finds more than 1 ... 00331 maybe need to allow to pass a string 00332 00333 00334 When using the default timeGate the view of collisions seen by FindSeqNoWithVersionDate 00335 should correspond to the standard DBI view 00336 00337 00338 """ 00339 kls = self.kls 00340 tsc = [] 00341 00342 dbno = 0 00343 uniqueVersionDateSiteMask = 0 00344 uniqueVersionDateSimMask = 0 ## 0 means use the vrec ones 00345 useTimeGate = 0 ## 0 means NO, 1 use default, >15 use value as non-standard gate 00346 00347 ordering = opts.get("ordering",None) 00348 00349 for t in times: 00350 ts = t_(t) 00351 log.debug("tscan t %s ts %s ordering %s " % ( t, ts.AsString("s"), ordering ) ) 00352 kls.Cache().Purge() 00353 00354 with DebugValidityOrdering(kls,ordering): 00355 00356 r = kls.Rpt().Clone() 00357 r.ctx( timestamp=ts, **ctx ) 00358 n = len(r) 00359 00360 nvr = r.key.numvrecs 00361 seq = r.key.seqnolist 00362 dig = r.digest ## untested with aggregates 00363 00364 log.debug("payload count %s nvr %s seq %s dig %s " % (n, nvr,seq,dig ) ) 00365 00366 d = dict(t=t,n=n,dig=dig) 00367 00368 ## aggregate Table queries yield r.vrec.seqno all over the shop depending on extra ordering, but it appears not to matter 00369 ## ... to allow VLUT comparisons to work for aggregate tables exposed the `key.seqnolist` 00370 ## 00371 if nvr > 1: ## aggregate query 00372 seq = r.key.seqnolist 00373 d.update( rvs=seq ) 00374 ## unclear how to do collision test in aggregate case 00375 else: 00376 #assert r.vrec.seqno == r.key.seqnolist, (r.vrec.seqno , r.key.seqnolist ) 00377 rvs = r.vrec.seqno 00378 rvs_versiondate = r.vrec.versiondate 00379 excludingSeqno = rvs 00380 count = True 00381 ncollision = proxy.FindSeqNoWithVersionDate( rvs_versiondate , r.vrec , dbno ,uniqueVersionDateSiteMask, uniqueVersionDateSimMask, excludingSeqno, useTimeGate, count ) 00382 if ncollision==1: 00383 count = False 00384 collision = proxy.FindSeqNoWithVersionDate( rvs_versiondate , r.vrec , dbno ,uniqueVersionDateSiteMask, uniqueVersionDateSimMask, excludingSeqno, useTimeGate, count ) 00385 else: 00386 collision = -1 00387 pass 00388 d.update( rvs=rvs,collision=collision,ncollision=ncollision) 00389 pass 00390 tsc.append( d ) 00391 00392 return tsc 00393 00394 def vscan(self, digest=False, timestart=False): 00395 """ 00396 Technical check ``(SEQNO vs SEQNO)`` 00397 Uses genDbi class for the table to perform DBI query for all SEQNO 00398 :return lod: list-of-dicts 00399 00400 Formerly was doing ``rvs = r.vrec.seqno if n>0 else 0`` but 00401 ``n=0`` will be OK once orphans are allowed again 00402 00403 """ 00404 kls = self.kls 00405 r = kls.Rpt() 00406 vsc = [] 00407 for e in self.db("select * from %(tab)sVld order by SEQNO asc" % {'tab':self.name} ): 00408 ts = t_( e['TIMESTART'] ) 00409 r.ctx( timestamp=ts, site=e['SITEMASK'], simflag=e['SIMMASK'], subsite=e['SUBSITE'], task=e['TASK'] ) 00410 n = len(r) 00411 rvs = r.vrec.seqno 00412 d = dict(seqno=e['SEQNO'], rvs=rvs, n=n ) 00413 if digest: 00414 d['digest'] = r.digest 00415 if timestart: 00416 d['timestart'] = e['TIMESTART'] 00417 vsc.append( d ) 00418 return vsc 00419 00420 00421 def fullscan(self, ctx, opts ): 00422 """ 00423 Performs SEQNO vs T scans for full history of a table, 00424 via rollbacks to all distinct INSERTDATES (+1s) 00425 00426 NB raw DB selects are blind to the rollback 00427 00428 :param ctx: DybDbi ctx kwargs, excluding `timestamp` 00429 :param opts: dict with any extra variations 00430 00431 :return dld: INSERTDATE keyed dict-of-list-of-dicts 00432 """ 00433 times = self.vtimes() ## raw DB select is blind to the DBI rollback 00434 times = times[:-1] ## kludgy exclusion of EOT 00435 fscan = {} 00436 00437 insertdate_aligned = opts['_aligned'] 00438 insertdates_ = sorted(self.vdistinct("INSERTDATE")) 00439 if insertdate_aligned: 00440 insertdates = zip(insertdates_,insertdates_) 00441 else: 00442 insertdates = zip(["latest"],[insertdates_[-1]]) 00443 00444 for insertlabel,insertdate in insertdates: 00445 with Rollback(insertdate): 00446 tsc = self.tscan(times, ctx, opts ) 00447 log.debug("fullscan %s" % pformat(tsc)) 00448 fscan[insertlabel] = tsc 00449 return fscan 00450 00451 00452 def test_ctx(): 00453 from db import DB 00454 db = DB("tmp_offline_db") 00455 for tab in db.showpaytables: 00456 ta = db.tab(tab) 00457 00458 lastinsert = ta.vlast('INSERTDATE') 00459 lctxs = ta.looped_ctxs() 00460 actxs = ta.actual_ctxs() 00461 00462 log.info("tab %-20s lastinsert %-20s : %-4s(%-4s) looped(actual) ctx " % (tab, lastinsert, len(lctxs),len(actxs)) ) 00463 for ctx in lctxs: 00464 log.debug("lctx %s " % ctx) 00465 for ctx in actxs: 00466 log.debug("actx %s " % ctx) 00467 00468 00469 def test_scan(): 00470 from db import DB 00471 db = DB("tmp_offline_db") 00472 for tab in db.showpaytables: 00473 ta = db.tab(tab) 00474 actxs = ta.actual_ctxs() 00475 for ctx in actxs: 00476 ta.fullscan(ctx) 00477 00478 def test_rollback(): 00479 from db import DB 00480 ta = db.tab('CableMap') 00481 opts = {} 00482 actxs = ta.actual_ctxs() 00483 ctx = actxs[-1] 00484 print ctx 00485 insertdates = ta.vdistinct("INSERTDATE") 00486 times = ta.vtimes() 00487 00488 with Rollback(insertdates[-1]): 00489 tsc = ta.tscan( times, ctx, opts ) 00490 00491 def test_payloadless(): 00492 from db import DB 00493 db = DB("tmp_offline_db") 00494 #ta = db.tab('CableMap') 00495 ta = db.tab('PhysAd') 00496 for e in ta.vpayloadless(): 00497 print e 00498 00499 00500 if __name__ == '__main__': 00501 logging.basicConfig(level=logging.DEBUG) 00502 00503 os.environ.setdefault('DBCONF','tmp_offline_db') 00504 00505 from db import DB 00506 db = DB() 00507 ta = db.tab('CalibPmtSpec') 00508 print ta.mma() 00509 00510