/search.css" rel="stylesheet" type="text/css"/> /search.js">
| Classes | Job Modules | Data Objects | Services | Algorithms | Tools | Packages | Directories | Tracs |

In This Package:

tab.py
Go to the documentation of this file.
00001 import os, logging, pickle, datetime, hashlib
00002 from pprint import pformat
00003 log = logging.getLogger(__name__)
00004 
class DebugValidityOrdering(object):
    """
    Context manager for non-standard DBI validity ordering queries.

    .. warning:: **FOR DEBUGGING ONLY**  NB the change impacts only the kls specified

    Allows what happens within a set of tied VERSIONDATE entries to be examined.
    Observation suggests MySQL implicitly orders by the PK (SEQNO) asc
    within tied VERSIONDATE sets.

    The extra ordering string is inserted between "order by VERSIONDATE desc"
    and the terminating ";" with a "," prefix when the extra is non-blank.

    Usage::

         from DybPython.tab import DebugValidityOrdering
         from DybDbi import GDemo

         with DebugValidityOrdering(GDemo,"SEQNO desc"):
             r = GDemo.Rpt()
             n = len(r)
             print n, r.vrec.seqno

    """
    def __init__(self, kls, tmp):
        """
        :param kls: class providing ``GetTableProxy().GetDBProxy()``, eg a DybDbi ``G*`` class
        :param tmp: extra ordering string applied inside the ``with`` block,
                    a false value (``None`` or blank) leaves the ordering untouched
        """
        self.tmp = tmp
        self.proxy = kls.GetTableProxy().GetDBProxy()
        self.prior = None
        # set only while a temporary ordering is in force, so that a blank
        # prior ordering can be told apart from "nothing was changed"
        self.changed = False

    def __enter__(self):
        """
        Apply the temporary extra ordering, remembering the prior one.

        :return: this context manager
        """
        if not self.tmp:
            self.prior = None
            self.changed = False
            log.warning("extra ordering of None, leaving ordering as is")
        else:
            self.prior = self.proxy.GetExtraOrdering()
            self.proxy.SetExtraOrdering(self.tmp)
            self.changed = True
            log.info("changing extra ordering : %s -> %s " % (self.prior, self.tmp))
        return self

    def __exit__(self, type, value, tb):
        """
        Restore the prior extra ordering, exceptions from the block are not suppressed.
        """
        # a blank prior ordering is falsy but must still be restored,
        # hence the explicit flag rather than a truth test on self.prior
        if not self.changed or self.prior is None:
            log.warning("no prior set ")
            return None
        self.proxy.SetExtraOrdering(self.prior)
        self.changed = False
        log.info("reverting extra ordering : %s -> %s " % (self.tmp, self.prior))
00048 
00049 
class Rollback(object):
    """
    INSERTDATE rollback context manager.

    On entry DBI is configured to roll back to the given insertdate (when one
    is provided), on exit the rollback dates are cleared again.
    """
    def __init__(self, insertdate):
        """
        :param insertdate: datetime to roll back to, or ``None`` for no rollback
        """
        self.insertdate = insertdate

    def config(self, insertdate):
        """
        Configure the rollback for all tables.

        :param insertdate: datetime to roll back to
        :return: the TimeStamp used for the rollback, one second after `insertdate`
        """
        from DybDbi import TimeStamp, gDbi
        # the extra second is needed as rollback uses "INSERTDATE < t"
        delta = datetime.timedelta(seconds=1)
        ts = TimeStamp.fromAssumedUTCDatetime(insertdate + delta)
        gDbi.ConfigRollback("* = %s" % ts.AsString("s"))
        return ts

    def __enter__(self):
        """
        :return: this context manager
        """
        if self.insertdate is not None:
            self.config(self.insertdate)
        return self

    def __exit__(self, type, value, tb):
        """
        Clear the rollback dates, exceptions from the block are not suppressed.
        """
        from DybDbi import gDbi
        gDbi.ClearRollbackDates()
00070 
00071 
def t_(_):
    """
    Build a DybDbi ``TimeStamp`` from a datetime, via ``TimeStamp.fromAssumedUTCDatetime``

    :param _: datetime instance
    :return: the corresponding TimeStamp
    """
    from DybDbi import TimeStamp
    from_datetime = TimeStamp.fromAssumedUTCDatetime
    return from_datetime(_)
00078 
def rst_title(msg, cha="~"):
    """
    Format `msg` as an rst title.

    :param msg: title text
    :param cha: character used for the underline
    :return: the title line, an underline of the same length and a blank line
    """
    underline = cha * len(msg)
    return "%s\n%s\n\n" % (msg, underline)
00081 
class ctxdict(dict):
    """
    Hashable `dict` specialization, allowing instances to be placed in `sets`
    and used as keys of other dicts.

    .. warning:: do **not** mutate these dicts if you wish to maintain sanity

    """
    def __hash__(self):
        # hash of the items in sorted order, so insertion order is irrelevant
        ordered_items = sorted(self.items())
        return hash(tuple(ordered_items))

    def __str__(self):
        # key and value pairs in sorted key order, joined with underscores
        parts = ["%s%s" % (key, self[key]) for key in sorted(self)]
        return "_".join(parts)
00093 
00094 
class Tab(object):
    """
    Marriage of raw DB access via MySQL-python and DybDbi access,
    for a single DBI payload table and its ``Vld`` validity table.
    """
    def __init__(self, name, db):
        """
        :param name: name of DBI payload table
        :param db:  :py:class:`DybPython.DB` instance
        """
        self.name = name
        self.db = db

    def q(self, sql):
        """
        Perform a raw query.

        :param sql: SQL string in which ``%(tab)s`` is replaced by the table name
        :return: whatever the `db` call returns for the query
        """
        return self.db(sql % {'tab': self.name})

    def column(self, n, sql):
        """
        :param n: key of the value to pick from each result entry
        :param sql: SQL string, ``%(tab)s`` is replaced by the table name
        :return: list of the `n` values of the query result
        """
        return [d[n] for d in self.q(sql)]

    def vdistinct(self, n):
        """
        :param n: name of a validity table column
        :return: list of the distinct values of the column
        """
        return self.column(n, "select distinct(" + n + ") from %(tab)sVld")

    def fields(self, skips=()):
        """
        Return list of fields in this table, excluding the skipped fields.

        :param skips: collection of field names to exclude
        """
        return [_['Field'] for _ in self.db("describe %s" % self.name) if _['Field'] not in skips]

    def _qminmaxavg(self, skips=('SEQNO',), group_by='SEQNO'):
        """
        Return SQL query string that provides min, max and avg values of each field within `group_by` groups
        """
        fields = self.fields(skips=skips)
        mma = lambda f: "min(%(f)s) as mi%(f)s, max(%(f)s) as mx%(f)s, avg(%(f)s) as av%(f)s" % dict(f=f)
        sel = ",".join(map(mma, fields))
        return "select %(group_by)s,%(sel)s from %(tab)s group by %(group_by)s" % dict(tab=self.name, group_by=group_by, sel=sel)

    def qminmaxavg(self):
        """
        Perform the query of :py:meth:`_qminmaxavg` with its default arguments
        """
        return self.db(self._qminmaxavg())

    def mma(self):
        """
        Return a string rst table showing min/max/avg values grouped by SEQNO.
        When the docs virtual python is used, a valid rst table is returned otherwise
        a pale imitation table is provided.

        Note that min/max/avg are consolidated into a single cell to avoid overlarge tables
        """
        flds = self.fields(skips=('SEQNO',))
        from DybPython.vlut import TabularData
        td = TabularData()
        for e in self.qminmaxavg():
            d = dict(SEQNO=e['SEQNO'])
            for f in flds:
                d[f] = "%s %s %s" % (e['mi' + f], e['mx' + f], e['av' + f])
            td.append(d)
        return td.as_rst(['SEQNO'] + flds)

    def dropzero(self):
        """
        DROPs the DBI payload and validity tables and sets the LASTUSEDSEQNO in LOCALSEQNO table to zero.
        Only works when the DBCONF section name and actual database name starts with 'tmp_'

        :raises Exception: when either name does not start with 'tmp_'
        """
        names = dict(tab=self.name)
        dbname = self.db.dbc['database']
        if dbname.startswith('tmp_') and self.db.sect.startswith('tmp_'):
            log.info("dropping DBI table pair for %(tab)s and zeroing LOCALSEQNO.LASTUSEDSEQNO " % names)
            self.db("drop table if exists %(tab)s, %(tab)sVld" % names)
            self.db("update LOCALSEQNO set LASTUSEDSEQNO=0 where TABLENAME='%(tab)s'" % names)
        else:
            raise Exception("dropzero only permitted when DBCONF section name and actual database name starts with tmp_ %(tab)s" % names)

    @staticmethod
    def _seqno_tuple(seqnos):
        """
        Format SEQNO values as an SQL ``in`` list such as ``(10, 1)``.

        Every value is coerced with ``int`` so that only integers can reach
        the SQL and Python 2 longs are not rendered with an ``L`` suffix.

        :raises ValueError: for values that cannot be converted to an integer
        """
        return "(%s)" % ", ".join("%d" % int(s) for s in seqnos)

    def vld(self, *seqnos):
        """
        :param seqnos: one or more SEQNO
        :return: result of querying the validity entries with those SEQNO
        """
        return self.q("select * from %(tab)sVld where SEQNO in " + self._seqno_tuple(seqnos))

    def vpayloadless(self):
        """
        Return validity entries with no corresponding payload

        There are legitimate reasons to go payloadless, however it is also a likely outcome of
        some error conditions when writing to the DB.
        """
        return self.q("select * from  %(tab)sVld where SEQNO not in (select distinct(SEQNO) from %(tab)s) order by SEQNO ")

    def vorder(self):
        """
        :return: list of validity table column names
        """
        return 'SEQNO TIMESTART TIMEEND SITEMASK SIMMASK SUBSITE TASK AGGREGATENO VERSIONDATE INSERTDATE'.split()

    def vctxsql(self, ctx):
        """
        Provide context SQL query excluding time requirement for interactive mysql querying.  Usage::

             tab = db.tab('Demo')
             actx = tab.actual_ctxs()
             sql = tab.vctxsql(actx[0])
                 > 'select * from DemoVld where SiteMask & 127 and SimMask & 1 and  Task = 0 and SubSite = 0 order by VERSIONDATE desc'

        :param ctx: dict with keys `site`, `simflag`, `task` and `subsite`
        """
        d = dict(tab=self.name)
        d.update(ctx)
        return "select * from %(tab)sVld where SiteMask & %(site)s and SimMask & %(simflag)s and  Task = %(task)s and SubSite = %(subsite)s order by VERSIONDATE desc" % d

    def vctx(self, ctx):
        """
        Perform the context query of :py:meth:`vctxsql`
        """
        return self.db(self.vctxsql(ctx))

    def dbictx_directive(self, ctx):
        """
        Provides rst source like::

          select * from Demo ...
          ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

          .. dbictx:: tmp_offline_db Demo
               :site:
               :simflag:
               :task:
               :subsite:

        Which directs Sphinx to perform a faux-DBI context query when the documentation is built and present the result
        as a table within the docs.

        :param ctx: dict with keys `site`, `simflag`, `task` and `subsite`
        """
        # start the text with the title, formerly "+=" was applied to an unbound local
        vl = rst_title(str(self.vctxsql(ctx)))
        vl += '.. dbictx:: %(dbconf)s %(tn)s\n   :site: %(site)s\n   :simflag: %(simflag)s\n   :task: %(task)s\n   :subsite: %(subsite)s\n' % dict(ctx, dbconf=self.db.sect, tn=self.name)
        return vl

    def vlast(self, n, orderlimit="order by SEQNO desc limit 1"):
        """
        :param n: name of row in validity table, eg `INSERTDATE`
        :param orderlimit: SQL ordering and limit clause
        :return: the value when the query yields precisely one entry, otherwise None
        """
        vals = self.column(n, "select `" + n + "` from %(tab)sVld " + orderlimit)
        return vals[0] if len(vals) == 1 else None

    def vtimes(self):
        """
        :return: sorted list of the distinct TIMESTART and TIMEEND values
        """
        return sorted(set(self.vdistinct("TIMESTART") + self.vdistinct("TIMEEND")))

    def vseqnos(self, *seqnos, **kwa):
        """
        Returns string presentation of a list of SEQNO in a DBI validity table

        :param seqnos:  one or more SEQNO
        :param raw: keyword argument, when true the query goes via ``db.mysql`` with
                    option ``-t``, otherwise the result of :py:meth:`vld` is returned

        Usage::

                In [6]: p tab.vseqnos(10,1)
                +-------+---------------------+---------------------+----------+---------+---------+------+-------------+---------------------+---------------------+
                | SEQNO | TIMESTART           | TIMEEND             | SITEMASK | SIMMASK | SUBSITE | TASK | AGGREGATENO | VERSIONDATE         | INSERTDATE          |
                +-------+---------------------+---------------------+----------+---------+---------+------+-------------+---------------------+---------------------+
                |     1 | 2010-01-01 01:00:00 | 2038-01-19 03:14:07 |      127 |       1 |       0 |    0 |          -1 | 2010-01-01 01:00:00 | 2011-08-22 12:14:21 |
                |    10 | 2010-01-01 04:00:00 | 2038-01-19 03:14:07 |      127 |       1 |       0 |    0 |          -1 | 2010-01-01 04:02:00 | 2011-08-22 12:14:30 |
                +-------+---------------------+---------------------+----------+---------+---------+------+-------------+---------------------+---------------------+

        """
        raw = kwa.pop('raw', False)
        if not raw:
            return self.vld(*seqnos)
        seqtuple = self._seqno_tuple(seqnos)
        return self.db.mysql("select * from %(tab)sVld where SEQNO in %(seqtuple)s" % {'tab': self.name, 'seqtuple': seqtuple}, opts="-t")

    def actual_ctxs(self):
        """
        Provide list of distinct context dicts actually present in the Vld table,
        ordered by the string representation.

        Masks and specials kAnyTask = -1, kAnySubSite = -1 are not yet handled differently

        :return: list of dicts such as ``dict(site=127,simflag=1,subsite=1,task=0, aggno=-1)`` covering all contexts in  Vld table

        """
        actual = set()
        for e in self.q("select * from %(tab)sVld"):
            actual.add(ctxdict(task=int(e['TASK']), simflag=int(e['SIMMASK']), site=int(e['SITEMASK']), subsite=int(e['SUBSITE']), aggno=int(e['AGGREGATENO'])))
        # key=str gives the same order as the former cmp of the string representations
        return sorted(actual, key=str)

    def looped_ctxs(self):
        """
        Looped context dicts based on distinct values of the components.

        :return: list of dicts such as ``dict(site=127,simflag=1,subsite=1,task=0, aggno=-1)`` covering all relevant contexts suggested by Vld table

        ::
                +-------+---------------------+---------------------+----------+---------+---------+------+-------------+---------------------+---------------------+
                | SEQNO | TIMESTART           | TIMEEND             | SITEMASK | SIMMASK | SUBSITE | TASK | AGGREGATENO | VERSIONDATE         | INSERTDATE          |
                +-------+---------------------+---------------------+----------+---------+---------+------+-------------+---------------------+---------------------+

        """
        # queries are made in the same order as before, conversion to int done once
        sites = [int(v) for v in self.vdistinct("SITEMASK")]
        simflags = [int(v) for v in self.vdistinct("SIMMASK")]
        subsites = [int(v) for v in self.vdistinct("SUBSITE")]
        tasks = [int(v) for v in self.vdistinct("TASK")]
        aggnos = [int(v) for v in self.vdistinct("AGGREGATENO")]

        l = []
        for task in tasks:
            for simflag in simflags:
                for site in sites:
                    for subsite in subsites:
                        for aggno in aggnos:
                            l.append(ctxdict(task=task, simflag=simflag, site=site, subsite=subsite, aggno=aggno))
        return l

    exists = property(lambda self: self.db.has_table(self.name), doc="True if DBI payload table exists")

    path = property(lambda self: "%s.txt" % self.name, doc="Name of the text file written by `save`")

    def save(self, smth, dir=None):
        """
        Write a string to the file named after the table.

        :param smth: some string related to the table to be written
        :param dir: directory in which to write, defaults to the current directory
        """
        path = os.path.join(dir, self.path) if dir else self.path
        # context manager closes the file even when the write fails
        with open(path, "w") as f:
            f.write(smth)

    def _kls(self):
        import DybDbi
        return getattr(DybDbi, 'G' + self.name)
    kls = property(_kls, doc="DybDbi class named 'G' followed by the table name")

    def tscan(self, times, ctx, opts):
        """
        :param times: list of datetimes at which to perform DBI queries
        :param ctx: DybDbi ctx kwargs, excluding `timestamp`
        :param opts: dict of extra options, the optional `ordering` entry is
                     passed to :py:class:`DebugValidityOrdering`

        :return lod: list-of-dicts representing winning SEQNO vs T

        Also for each time an extra FindSeqNoWithVersionDate query is done
        in order to explicitly show where the ambiguous rot sets in on the VLUT.
        The excludingSeqno is used to avoid finding itself

        BUT FindSeqNoWithVersionDate will return 0 if it finds more than 1 ...
        maybe need to allow to pass a string

        When using the default timeGate the view of collisions seen by FindSeqNoWithVersionDate
        should correspond to the standard DBI view

        """
        kls = self.kls
        # NOTE(review): `proxy` was formerly used without being defined (NameError).
        # It is presumed to be the DB proxy, obtained in the same way as by
        # DebugValidityOrdering -- confirm that FindSeqNoWithVersionDate lives there.
        proxy = kls.GetTableProxy().GetDBProxy()
        tsc = []

        dbno = 0
        uniqueVersionDateSiteMask = 0
        uniqueVersionDateSimMask = 0   ## 0 means use the vrec ones
        useTimeGate = 0                ## 0 means NO, 1 use default, >15 use value as non-standard gate

        ordering = opts.get("ordering", None)

        for t in times:
            ts = t_(t)
            log.debug("tscan t %s ts %s ordering %s " % (t, ts.AsString("s"), ordering))
            kls.Cache().Purge()

            with DebugValidityOrdering(kls, ordering):

                r = kls.Rpt().Clone()
                r.ctx(timestamp=ts, **ctx)
                n = len(r)

                nvr = r.key.numvrecs
                seq = r.key.seqnolist
                dig = r.digest             ## untested with aggregates

                log.debug("payload count %s  nvr %s seq %s dig %s  " % (n, nvr, seq, dig))

                d = dict(t=t, n=n, dig=dig)

                ##  aggregate Table queries yield r.vrec.seqno all over the shop depending on extra ordering, but it appears not to matter
                ## ... to allow VLUT comparisons to work for aggregate tables exposed the `key.seqnolist`
                ##
                if nvr > 1:     ## aggregate query
                    ## unclear how to do collision test in aggregate case
                    d.update(rvs=seq)
                else:
                    rvs = r.vrec.seqno
                    rvs_versiondate = r.vrec.versiondate
                    excludingSeqno = rvs
                    # first call with count=True, second with count=False only when precisely one collision is reported
                    ncollision = proxy.FindSeqNoWithVersionDate(rvs_versiondate, r.vrec, dbno, uniqueVersionDateSiteMask, uniqueVersionDateSimMask, excludingSeqno, useTimeGate, True)
                    if ncollision == 1:
                        collision = proxy.FindSeqNoWithVersionDate(rvs_versiondate, r.vrec, dbno, uniqueVersionDateSiteMask, uniqueVersionDateSimMask, excludingSeqno, useTimeGate, False)
                    else:
                        collision = -1
                    d.update(rvs=rvs, collision=collision, ncollision=ncollision)
                tsc.append(d)

        return tsc

    def vscan(self, digest=False, timestart=False):
        """
        Technical check ``(SEQNO vs SEQNO)``
        Uses genDbi class for the table to perform DBI query for all SEQNO

        :param digest: when true include the `digest` of each query result
        :param timestart: when true include the TIMESTART of each validity entry
        :return lod: list-of-dicts

        Formerly was doing ``rvs = r.vrec.seqno if n>0 else 0`` but
        ``n=0`` will be OK once orphans are allowed again

        """
        kls = self.kls
        r = kls.Rpt()
        vsc = []
        for e in self.q("select * from %(tab)sVld order by SEQNO asc"):
            ts = t_(e['TIMESTART'])
            r.ctx(timestamp=ts, site=e['SITEMASK'], simflag=e['SIMMASK'], subsite=e['SUBSITE'], task=e['TASK'])
            n = len(r)
            rvs = r.vrec.seqno
            d = dict(seqno=e['SEQNO'], rvs=rvs, n=n)
            if digest:
                d['digest'] = r.digest
            if timestart:
                d['timestart'] = e['TIMESTART']
            vsc.append(d)
        return vsc

    def fullscan(self, ctx, opts):
        """
        Performs SEQNO vs T scans for full history of a table,
        via rollbacks to all distinct INSERTDATES (+1s)

        NB raw DB selects are blind to the rollback

        :param ctx: DybDbi ctx kwargs, excluding `timestamp`
        :param opts: dict with any extra variations, the required `_aligned` entry
                     selects scanning at every distinct INSERTDATE when true,
                     and only at the last one (labelled "latest") when false

        :return dld: INSERTDATE keyed dict-of-list-of-dicts
        """
        times = self.vtimes()     ## raw DB select is blind to the DBI rollback
        times = times[:-1]        ## kludgy exclusion of EOT
        fscan = {}

        insertdate_aligned = opts['_aligned']
        insertdates_ = sorted(self.vdistinct("INSERTDATE"))
        if insertdate_aligned:
            insertdates = [(d, d) for d in insertdates_]
        elif insertdates_:
            insertdates = [("latest", insertdates_[-1])]
        else:
            # empty validity table, nothing to scan
            insertdates = []

        for insertlabel, insertdate in insertdates:
            with Rollback(insertdate):
                tsc = self.tscan(times, ctx, opts)
                log.debug("fullscan %s" % pformat(tsc))
                fscan[insertlabel] = tsc
        return fscan
00450 
00451 
def test_ctx():
    """
    For every payload table of the `tmp_offline_db` section, log the last
    INSERTDATE together with the counts of looped and actual contexts,
    then log each context at debug level.
    """
    from db import DB
    db = DB("tmp_offline_db")
    for tab in db.showpaytables:
        ta = db.tab(tab)

        lastinsert = ta.vlast('INSERTDATE')
        looped = ta.looped_ctxs()
        actual = ta.actual_ctxs()

        summary = (tab, lastinsert, len(looped), len(actual))
        log.info("tab %-20s lastinsert %-20s :  %-4s(%-4s) looped(actual) ctx " % summary)
        for label, ctxs in (("lctx", looped), ("actx", actual)):
            for ctx in ctxs:
                log.debug("%s %s " % (label, ctx))
00467     
00468 
def test_scan():
    """
    Perform a full scan of every actual context of every payload table
    of the `tmp_offline_db` section.
    """
    from db import DB
    db = DB("tmp_offline_db")
    # fullscan requires an opts dict with an `_aligned` entry, formerly none was
    # passed; True scans at every distinct INSERTDATE, the documented full history
    opts = dict(_aligned=True)
    for tab in db.showpaytables:
        ta = db.tab(tab)
        for ctx in ta.actual_ctxs():
            ta.fullscan(ctx, opts)
00477 
def test_rollback():
    """
    Scan the last actual context of the `CableMap` table while rolled back
    to the last of its distinct INSERTDATE values.
    """
    from db import DB
    # formerly `db` was used without being created, use the same
    # section as the other tests of this module
    db = DB("tmp_offline_db")
    ta = db.tab('CableMap')
    opts = {}
    actxs = ta.actual_ctxs()
    ctx = actxs[-1]
    print(ctx)
    insertdates = ta.vdistinct("INSERTDATE")
    times = ta.vtimes()

    with Rollback(insertdates[-1]):
        ta.tscan(times, ctx, opts)
00490 
def test_payloadless():
    """
    Print the validity entries of the `PhysAd` table that have no payload.
    """
    from db import DB
    db = DB("tmp_offline_db")
    #ta = db.tab('CableMap')
    ta = db.tab('PhysAd')
    orphans = ta.vpayloadless()
    for entry in orphans:
        print(entry)
00498 
00499 
if __name__ == '__main__':
    # debug level logging for interactive use
    logging.basicConfig(level=logging.DEBUG)

    # only provide a DBCONF section name when the environment has none
    if 'DBCONF' not in os.environ:
        os.environ['DBCONF'] = 'tmp_offline_db'

    from db import DB
    db = DB()
    ta = db.tab('CalibPmtSpec')
    # rst table of min/max/avg values grouped by SEQNO
    print(ta.mma())
00509 
00510 
| Classes | Job Modules | Data Objects | Services | Algorithms | Tools | Packages | Directories | Tracs |

Generated on Fri May 16 2014 09:55:40 for DybPython by doxygen 1.7.4