asciicat.py

#!/usr/bin/env python
"""
Direct python access to the rows of a DBI ascii catalog via csv parsing, implemented to
enable pre-insert checks prior to rloadcat of ascii catalogs into the DB

No need to use more complex CSV handling:

#. as DBI Ascii catalogs are guaranteed valid .csv
#. the catalogs have some specifics regarding key names that need special handling

Ambiguity Avoidance
~~~~~~~~~~~~~~~~~~~

Need to write the ascii catalog with new ``INSERTDATE`` values
changed to the UTC time of the update, to avoid the **window of ambiguity**
problem :dybsvn:`ticket:844`

#. first get new ``SEQNO``

"""
import os, re, logging
from datetime import datetime
from csv import DictReader
from StringIO import StringIO
from pprint import pformat
from copy import deepcopy

log = logging.getLogger(__name__)
pathx = lambda _:os.path.abspath(os.path.expanduser(os.path.expandvars(_)))
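## Illustrative sketch (added commentary): pathx expands env vars, "~" and
## relative segments, e.g. pathx("~/dybaux") -> "/home/blyth/dybaux"
## (the value depends on the environment).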


class AsciiRow(dict):
    """
    Used internally by AsciiCSV::

         t = AsciiCSV( None, "~/path/to/Table/Table.csv" )()
         d = dict(SEQNO='1', ROW_COUNTER='10')   ## row content as parsed from a csv line
         r = AsciiRow.Create( d, t )
         print r.pk    ##  (1,10)
         print r.fv
         print r

    """
    def Create(cls, d, t ):
        """
        :param d: dict with row content
        :param t: parent table AsciiCSV instance
        """
        r = cls(d)
        r.t = t
        return r
    Create = classmethod(Create)

    def clone(self):
        return AsciiRow.Create( dict(self).copy() , self.t )

    def get_pk(self):
        if self.t.intpk:
            fv = map(lambda k:int(self[k]),self.t.pkfields)
        else:
            fv = map(lambda k:self[k],self.t.pkfields)
        return fv[0] if len(fv) == 1 else tuple(fv)
    def set_pk( self, pk ):
        if type(pk) == int:pk = tuple([pk])     ## type regularize
        assert len(pk) == len(self.t.pkfields), "pk %r is wrong shape expecting %r " % ( pk, self.t.pkfields )
        for k,v in zip(self.t.pkfields,pk):
            self[k] = str(v)               ## so faked entries match kosher ones hailing from csv files
    pk = property(get_pk, set_pk)
    fv = property(lambda self:map(lambda k:self[k],self.t.fields), doc="list of field values in order held by AsciiCSV instance")
    def __str__(self):
        return ",".join(map(lambda _:"\"%s\""%_[0] if _[1] else _[0], zip(self.fv,self.t.quote)))

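
## Hedged sketch (not part of the original source): exercises AsciiRow pk
## handling via an in-memory AsciiCSV (defined below) parsed from a literal
## string; the table layout and values are invented for illustration.
def _demo_asciirow():
    content = """SEQNO int(11),ROW_COUNTER int(11),VALUE int(11),PRIMARY KEY (SEQNO,ROW_COUNTER)
1,1,42
1,2,43
"""
    t = AsciiCSV( None, None )( content )
    r = t[(1,2)]
    assert r.pk == (1,2)               ## composite integer pk from get_pk
    assert r.fv == ['1','2','43']      ## field values stay as strings
    c = r.clone()
    c.pk = (2,1)                       ## set_pk stringifies, matching csv-parsed rows
    assert c['SEQNO'] == '2' and c['ROW_COUNTER'] == '1'
    assert str(c) == "2,1,43"          ## int typed fields are not quoted
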

class AsciiCSV(dict):
    """
    AsciiCSV is a dict keyed by PK whose values are dicts keyed by fieldname
    with string values. The PKs and field names are determined from the
    header line of the csv file. For example::

        csv = AsciiCSV( None, "~/dybaux/catalog/tmp_offline_db/CalibPmtSpec/CalibPmtSpec.csv")()
              ## standalone use without a catalog

        csv[(50,167)] == {'PMTAFTERPULSE': '0',
                          'PMTDARKRATE': '0',
                          'PMTDESCRIB': 'DayaBayOWS-wall09-spot23-in',
                          'PMTEFFIC': '1',
                           ...
                          'ROW_COUNTER': '167',
                          'SEQNO': '50'}

        csv[(50,167)]['PMTEFFIC'] == '1'

        for pk,v in csv.items():
            print pk, v

    Typical PKs:

    #. (SEQNO,ROW_COUNTER) integer tuple for payload tables
    #. SEQNO integer for validity tables
    #. TABLENAME string for LOCALSEQNO tables

    """
    def __init__(self, cat, path, **kwa ):
        dict.__init__(self)
        self.cat = cat
        self.path = path
        self.kwa = kwa
        self.tabbase = kwa.get('tdir',None)
        self.tabname = kwa.get('tnam',None)
        self.pk = kwa.get('tnam',None)    ## tablename, used as the key within the parent catalog dict
        self.seqnos_read = []
        self.pks_read = []

    # too memory wasteful apparently
    #pks    = property(lambda self:list(set(map(lambda _:_.pk, self.values() ))), doc="for all tables" )
    #allseqno = property(lambda self:list(set(map(lambda _:int(_['SEQNO']), self.values() ))), doc="for payload/validity tables" )

    pks    = property(lambda self:self.pks_read, doc="for all tables" )
    allseqno = property(lambda self:self.seqnos_read, doc="for payload/validity tables" )
    maxseqno = property(lambda self:max(self.allseqno), doc="for payload/validity tables")
    minseqno = property(lambda self:min(self.allseqno), doc="for payload/validity tables")

    abspath = property(lambda self:self.cat.resolve(self.path))

    pretty = property(lambda self:pformat(dict(self)), doc="pretty print the AsciiCSV dict, only useful for small CSVs such as LOCALSEQNO" )
    lastusedseqno = property(lambda self:dict(map(lambda k:(k,int(self[k]['LASTUSEDSEQNO'])) , self )), doc="for LOCALSEQNO tables" )
    smry = property(lambda self:"\n".join( [self.pretty, pformat(self.lastusedseqno) ]), doc="for LOCALSEQNO tables" )

    orderpk = property(lambda self:sorted(self))
    lastpk = property(lambda self:self.orderpk[-1])
    last = property(lambda self:self[self.lastpk])

    def __repr__(self):
        return "%s [%-5d] %s  " % ( self.__class__.__name__, len(self), self.path  )

    def compare(self, other):
        """
        The equality operator for dict-of-dict works as one would want:
        recursive, same keys with same values. As demonstrated::

            old = AsciiCSV( None, "~/dybaux/catalog/tmp_offline_db/CalibPmtSpec/CalibPmtSpec.csv.old")()
            new = AsciiCSV( None, "~/dybaux/catalog/tmp_offline_db/CalibPmtSpec/CalibPmtSpec.csv.new")()

            old == new   ## True

            keep = old[(50,167)]['PMTEFFIC']
            old[(50,167)]['PMTEFFIC'] = keep + "changed"
            old == new                                     ## False after changing a single value

            old[(50,167)]['PMTEFFIC'] = keep
            old == new   ## True                           ## back to equality

            list(old) == list(new)       ## False          ## key order cannot be relied on
            sorted(old) == sorted(new)   ## True
            old.orderpk == new.orderpk   ## True

        The .old which matches dybaux r4968 is from before standardized order was established::

            [blyth@belle7 CalibPmtSpec]$ diff CalibPmtSpec.csv CalibPmtSpec.csv.old
            [blyth@belle7 CalibPmtSpec]$ diff CalibPmtSpec.csv CalibPmtSpec.csv.new | wc
            2470    4934  222803

        Standardize it by ``old.write()`` after which::

            [blyth@belle7 CalibPmtSpec]$ diff CalibPmtSpec.csv.old  CalibPmtSpec.csv.new
            [blyth@belle7 CalibPmtSpec]$

        """
        return self == other

    def parse_hdr(self, hdr ):
        """
        Make sense of a hdr line like::

           SEQNO int(11),ROW_COUNTER int(11),SENSORID int(11),CHANNELID int(11),PRIMARY KEY (SEQNO,ROW_COUNTER)

        """
        fdesc,pkdesc = hdr.split(",PRIMARY KEY ")
        assert pkdesc[0] == '(' and pkdesc[-1] == ")"

        self.hdr = hdr
        self.fields  =  map(lambda _:_.split()[0], fdesc.split(",") )
        self.dbtypes =  map(lambda _:_.split()[1], fdesc.split(",") )
        self.fdict = dict(zip(self.fields, self.dbtypes))
        self.pkfields = pkdesc[1:-1].split(",")
        self.pktypes =  map(lambda k:self.fdict[k], self.pkfields)
        self.intpk = len(filter( lambda k:k.startswith('int'), self.pktypes )) == len(self.pktypes)   ## all integer pk
        qtyps = ("char","varc","date",)
        self.quote = map( lambda _:self.fdict[_][0:4] in qtyps, self.fields )    ## the type is quoted

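    ## Illustrative sketch (added commentary, values follow from the code above):
    ## for the docstring header line, parse_hdr sets
    ##   self.fields   == ['SEQNO', 'ROW_COUNTER', 'SENSORID', 'CHANNELID']
    ##   self.pkfields == ['SEQNO', 'ROW_COUNTER']
    ##   self.intpk    == True                          ## every pk field is int typed
    ##   self.quote    == [False, False, False, False]  ## no char/varchar/date fields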

    def __call__(self, content=None):
        return self.read(content)

    def resolve(self, base=None ):
        if self.cat:
            path = self.cat.resolve( self.path, base )
        else:
            path = pathx(self.path)
        return path

    def read(self, content=None):
        """
        Parse the csv, storing as a dict of dict keyed by pk

        Typical DBI validity and payload tables are ordered by SEQNO/(SEQNO,ROW_COUNTER), but
        the standard order is not followed by:

        #. LOCALSEQNO, not surprising as TABLENAME keyed and follows historic ordering
        #. CalibPmtSpec, CalibPmtSpecVld  **SURPRISE : EVIDENCE FOR NON-DBI HANDLING**

        """
        if content:
            pass
        else:
            path = self.resolve()
            log.debug("parse csv %s " % path )
            content = open( path , "r" ).read()

        feol = content.find("\n")
        self.parse_hdr( content[:feol] )  ## determine fields, pk etc...
        rdr = DictReader( StringIO(content[feol+1:]), fieldnames=self.fields )

        sqn=[]
        pks=[]

        for i,d in enumerate(rdr):
            r = AsciiRow.Create(d, self )
            rpk = r.pk
            self[rpk] = r
            pks.append(rpk)

            # collect seqno during the fill to avoid the memory expense of grabbing it later
            seqno = int(r.get('SEQNO',0))
            if seqno:
                sqn.append(seqno)

            assert len(self) == i + 1, ("error non-unique pk %r for row %r " % ( rpk, r ) )
            pass

        self.seqnos_read = list(set(sqn))
        self.pks_read = list(set(pks))
        return self

    def merged(self, update , postpend="" ):
        """
        Return a new instance that merges `update` into `self`; changes in
        `update` override entries in `self`.

        NB `self` is not changed by this

        :param update: instance of `AsciiCSV`
        :return: merged instance, an `AsciiCSV` (ie a dict of dict)

        """
        log.debug("merge csv %r " % update )
        mrgd = deepcopy(self)
        if mrgd.path:
            mrgd.path += postpend

        allpk = set(self).union(set(update))
        for pk in allpk:
            if pk in mrgd and pk in update:
                m = deepcopy(self[pk])
                m.update( update[pk] )
                log.debug("merged of pk %r combines self %r with update %r yielding %r  " % ( pk, mrgd[pk],update[pk],m) )
                mrgd[pk] = m
            elif pk in mrgd:
                log.debug("merged of pk %r keeps entry %r " % ( pk, mrgd[pk] ) )
                pass
            elif pk in update:
                log.debug("merged of pk %r adds update entry %r " % ( pk, update[pk] ) )
                mrgd[pk] = deepcopy(update[pk])
        return mrgd

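    ## Illustrative sketch (added commentary): with self holding {1: {'A':'x'}}
    ## and update holding {1: {'A':'y'}, 2: {'A':'z'}}, merged(update) yields
    ## {1: {'A':'y'}, 2: {'A':'z'}} : on a shared pk the update wins, field by field.
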
    def compare(self, other):
        """
        Return a dict with the differences

        NB this stub shadows the docstring-rich ``compare`` defined earlier
        in this class; as it stands ``compare`` returns None.
        """
        pass

    def content(self):
        content = self.hdr + "\n"
        for pk in sorted(self):
            content += str(self[pk]) + "\n"
        return content

    def write(self, base=None):
        path = self.resolve( base=base )
        if base == None:
            log.warn("inplace overwriting %s " % path )
        else:
            log.debug("writing to %s " % path )

        wdir = os.path.dirname( path )
        if not os.path.exists(wdir):
            os.makedirs(wdir)

        fp = open(path, "w")
        fp.write( self.content() )
        fp.close()

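
## Hedged sketch (not part of the original source): parse a literal
## LOCALSEQNO-style table from a string and check that ``content()``
## regenerates it in the standardized sorted-pk order; values invented.
def _demo_csv_roundtrip():
    raw = """TABLENAME char(64),LASTUSEDSEQNO int(11),PRIMARY KEY (TABLENAME)
"CableMap",440
"*",0
"""
    csv = AsciiCSV( None, None )( raw )
    assert csv.lastusedseqno == {'*': 0, 'CableMap': 440}
    assert csv.content().split("\n")[1] == '"*",0'      ## "*" sorts first
    again = AsciiCSV( None, None )( csv.content() )
    assert again == csv                                 ## round-trip equality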


class AsciiCat(dict):
    """
    DBI Ascii catalog (a tablename keyed dict holding an AsciiCSV for each table), usage::

        import logging
        logging.basicConfig(level=logging.INFO)

        cat = AsciiCat("~/dybaux/catalog/tmp_offline_db")

        for tn,csv in cat.items():
            print tn, csv

        for tn,lastseqno in cat.seqno.items():
            print cat[tn+'Vld'][lastseqno]

        for tn in cat.orderpk:
            csv = cat[tn]
            print tn, csv.abspath, csv.tabname, csv.tabbase, csv.orderpk[0:100]


    """
    ptn = re.compile(r"(?P<tdir>[^/]*)/(?P<tnam>\S*)\.csv$")

    def parse_relpath(self, relpath):
        m = self.ptn.match(relpath)
        assert m, "failed to match %s " % relpath
        d = m.groupdict()
        tab = d['tdir']
        d['tab'] = tab
        if d['tdir'] == d['tnam']:
            d['type'] = "pay"
        elif "%sVld" % d['tdir'] == d['tnam']:
            d['type'] = "vld"
        return dict(d, relpath=relpath)

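    ## Illustrative sketch (added commentary): typical relpath parses,
    ##   "CableMap/CableMap.csv"     -> {'type':'pay', 'tab':'CableMap', ...}
    ##   "CableMap/CableMapVld.csv"  -> {'type':'vld', 'tab':'CableMap', ...}
    ##   "LOCALSEQNO/LOCALSEQNO.csv" -> {'type':'pay', 'tab':'LOCALSEQNO', ...}
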
    showtables = property( lambda self:sorted(self.keys()))
    orderpk = property(lambda self:sorted(self))

    @classmethod
    def findcat(cls, dir):
        catz = filter(lambda n:n[-4:] == ".cat" , os.listdir(dir) )
        assert len(catz) == 1, "must be only one .cat in dir %s " % dir
        return os.path.join( dir, catz[0] )

    def __init__(self, dir , skip_pay_check=False , allow_partial=False ):
        """
        :param dir: catalog directory, expected to contain exactly one .cat file
        :param skip_pay_check: when True skip loading payload tables into memory
        :param allow_partial: when True tolerate partial catalogs that lack some of the tables named in LOCALSEQNO
        """
        dir = os.path.normpath(pathx(dir))
        assert os.path.isdir(dir) , dir
        catpath = self.findcat( dir )

        log.debug("AsciiCat catpath %s skip_pay_check %s allow_partial %s " % (catpath,skip_pay_check,allow_partial) )

        self.skip_pay_check = skip_pay_check
        self.allow_partial = allow_partial
        self.dir = dir
        self.v = {}
        self.p = {}
        pass
        self.read( catpath )

    def compare(self, other):
        """
        To check cat comparison, set up old and new catalogs::

            rm -rf ~/dybaux/catalog/tmp_offline_db
            svn up -r 4968 ~/dybaux/catalog/tmp_offline_db      ## clean r4968 just prior to ordering standardization
            rm -rf ~/tmp_offline_db ; db.py offline_db rdumpcat ~/tmp_offline_db   ## same named catalog with the standard rdumpcat ordering

        .. code-block:: ipython

                In [15]: old = AsciiCat("~/dybaux/catalog/tmp_offline_db")
                INFO:__main__:read /home/blyth/dybaux/catalog/tmp_offline_db/tmp_offline_db.cat
                INFO:__main__:done AsciiCat [13   ] /home/blyth/dybaux/catalog/tmp_offline_db {'CalibPmtSpec': 50, 'FeeCableMap': 3, 'HardwareID': 372, 'CalibFeeSpec': 113, 'CableMap': 460}

                In [16]: new = AsciiCat("~/tmp_offline_db")
                INFO:__main__:read /home/blyth/tmp_offline_db/tmp_offline_db.cat
                INFO:__main__:done AsciiCat [13   ] /home/blyth/tmp_offline_db {'CalibPmtSpec': 50, 'FeeCableMap': 3, 'HardwareID': 372, 'CalibFeeSpec': 113, 'CableMap': 460}

                In [17]: old == new
                Out[17]: True

                In [18]: old['CableMap'][(1,1)]['CHANNELID']
                Out[18]: '16908545'

                In [19]: old['CableMap'][(1,1)]['CHANNELID'] += "changed"

                In [20]: old == new
                Out[20]: False

                In [21]: old['CableMap'][(1,1)]['CHANNELID'] = '16908545'

                In [22]: old == new
                Out[22]: True

                In [25]: old.diff(new)
                INFO:__main__:diffing with "diff -r --brief /home/blyth/dybaux/catalog/tmp_offline_db /home/blyth/tmp_offline_db | grep -v .svn "
                INFO:__main__:diff Files /home/blyth/dybaux/catalog/tmp_offline_db/CalibPmtSpec/CalibPmtSpec.csv and /home/blyth/tmp_offline_db/CalibPmtSpec/CalibPmtSpec.csv differ
                INFO:__main__:diff Files /home/blyth/dybaux/catalog/tmp_offline_db/CalibPmtSpec/CalibPmtSpecVld.csv and /home/blyth/tmp_offline_db/CalibPmtSpec/CalibPmtSpecVld.csv differ
                INFO:__main__:diff Files /home/blyth/dybaux/catalog/tmp_offline_db/LOCALSEQNO/LOCALSEQNO.csv and /home/blyth/tmp_offline_db/LOCALSEQNO/LOCALSEQNO.csv differ
                INFO:__main__:diff Files /home/blyth/dybaux/catalog/tmp_offline_db/tmp_offline_db.cat and /home/blyth/tmp_offline_db/tmp_offline_db.cat differ

                In [27]: old.write()
                WARNING:__main__:overwriting catalog into dir /home/blyth/dybaux/catalog/tmp_offline_db

                In [28]: old.diff(new)
                INFO:__main__:diffing with "diff -r --brief /home/blyth/dybaux/catalog/tmp_offline_db /home/blyth/tmp_offline_db | grep -v .svn "

        """
        return self == other

    def __repr__(self):
        return "%s [%-5d] %s %r " % ( self.__class__.__name__, len(self), self.dir, self.seqno )

    def read(self, catpath):
        """
        Parses Vld and LOCALSEQNO tables by default, reading the
        csv content into memory.  Payload tables are only read when `skip_pay_check=False`,
        as reading payload csv for the larger tables is slow and memory expensive.

        """
        log.info("read %s " % catpath )
        self.read_catalog( catpath )

        for tn in self.orderpk:
            read = False
            if tn[-3:] == 'Vld' or tn == 'LOCALSEQNO':
                read = True
            else:
                read = not self.skip_pay_check
            if read:
                log.info("reading table %s " % tn )
                self[tn].read()
            else:
                log.debug("NOT reading table %s " % tn )

        self.read_seqno()
        self.check_seqno()
        log.info("done %r " % self )

    def read_catalog(self, catpath ):
        """
        Reads ascii catalog into memory.

        This just reads the small .cat file and sets up lightweight
        AsciiCSV objects without reading in csv content.
        """
        content = open( catpath , "r" ).read().strip()
        relpaths = content.split("\n")
        assert relpaths[0] == "name", "unexpected catalog header line %s " % relpaths[0]
        for d in map(lambda _:self.parse_relpath(_), relpaths[1:] ):
            typ, tab = d.get('type',None), d.get('tab', None)
            assert typ and tab
            csv = AsciiCSV(self, d['relpath'], **d )
            if typ == 'pay':
                self.p[tab] = csv
            elif typ == 'vld':
                self.v[tab] = csv
            pass
            self[csv.pk] = csv   ## keyed by tablename
            pass

    def fake_write( self, tn , npay=1 ):
        """
        Fakes a single validity entry and ``npay`` payload entries, by copying the
        last entries in the validity and payload tables and giving them
        fabricated SEQNO or (SEQNO,ROW_COUNTER) pks.  The `tn` entry in the LOCALSEQNO
        table is also changed to mimic a real DBI write.

        :param tn: payload table name
        :param npay: number of payload rows to write

        Usage::

            cat = AsciiCat("~/dybaux/catalog/tmp_offline_db")
            cat.fake_write('HardwareID', 10 )
            cat.write()                       ## CAUTION inplace overwriting

        """
        p,v,m = self[tn], self[tn+'Vld'],self['LOCALSEQNO']
        assert len(p) > 1 and len(v) > 1 and len(m) > 1, ("cannot fake into empty tables ", len(p),len(v),len(m))

        a = v.last.clone()            ## pk ordered
        lastseqno = a.pk              ## get_pk

        l = p.last                    ## l.pk is tuple with SEQNO,ROW_COUNTER values
        assert l.pk[0] == lastseqno == int(m[tn]['LASTUSEDSEQNO']) , ( "lastseqno mismatch ", l, l.pk, lastseqno , m[tn] )

        fakeseqno = lastseqno + 1
        a.pk = fakeseqno              ## set_pk
        v[a.pk] = a                   ## fake validity entry

        for n in range(npay):
            b = l.clone()
            b.pk = (fakeseqno, n+1 )   ## set_pk
            p[b.pk] = b                ## fake payload entries

        m[tn]['LASTUSEDSEQNO'] = str(fakeseqno)

        log.info("fake_write %s %s added LASTUSEDSEQNO %s " % ( tn,npay,fakeseqno ))

        self.clear_cache()

    def read_seqno(self, tab="LOCALSEQNO"):
        """
        Reads the LASTUSEDSEQNO entries from the LOCALSEQNO table into the seqno dict
        """
        self._seqno = {}
        for pk,row in self[tab].items():
            t = row['TABLENAME']
            n = int(row['LASTUSEDSEQNO'])
            if t == "*":continue
            self._seqno[t] = n
        log.debug( "LASTUSEDSEQNO %r FROM %s  " % ( self._seqno, tab)  )
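
    ## Illustrative sketch (added commentary): after read_seqno, self._seqno
    ## resembles {'CableMap': 460, 'CalibPmtSpec': 50, ...} with the "*"
    ## placeholder row skipped (table names and values illustrative).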
    def check_seqno(self):
        """
        Checks correspondence between the max SEQNO in the Vld and Pay tables
        and that recorded in the LOCALSEQNO table.

        Note that without the very expensive db.py option `--EXTRA_PAYLOAD_CHECK`,
        which controls the setting of `self.skip_pay_check`, only the validity tables are checked.

        The allseqno accessed inside the maxseqno property is very expensive,
        causing MemoryError at IHEP
        """
        for t,n in self._seqno.items():
            v = self.get(t+'Vld',[])
            p = self.get(t,[])
            if len(v) > 0:
                vmax = v.maxseqno
                assert vmax == n , ( vmax , n )
            if len(p) > 0:
                pmax = p.maxseqno
                assert pmax == n , ( pmax , n )

    def get_seqno(self):
        """
        Tablename keyed dict of LASTUSEDSEQNO values obtained from the LOCALSEQNO table
        """
        if hasattr(self,'_seqno'):
            return self._seqno
        self.read_seqno()
        self.check_seqno()
        return self._seqno
    seqno = property( get_seqno , doc=get_seqno.__doc__ )

    def clear_cache(self):
        """
        Delete cached attributes, forcing recalculation on next access;
        needed after faking for example.
        """
        if hasattr(self, '_fabseqno' ):
            del self._fabseqno
        if hasattr(self, '_seqno' ):
            del self._seqno
        if hasattr(self, '_allseqno' ):
            del self._allseqno

    def read_allseqno(self):
        """
        Populates the table name keyed `self._allseqno` dict with results
        from the `.allseqno` property applied to the individual `AsciiCSV` of each Vld table.

        The `self.seqno` dict is populated by reading LOCALSEQNO.csv, so the
        tables represented in it may not correspond to those actually present in the
        catalog; see the assert below and the `-P,--ALLOW_PARTIAL` option.

        """
        self._allseqno = {}
        for tn,lus in self.seqno.items():
            tnv = "%sVld"%tn
            csvv = self.get(tnv, None)
            if not csvv and self.allow_partial:continue
            assert csvv, "read_allseqno failed to find AsciiCSV for %s : consider use of -P,--ALLOW_PARTIAL option to allow dealing in partial catalogs " % tnv
            self._allseqno[tn] = csvv.allseqno
    def check_allseqno(self):
        if self.allow_partial:return
        assert sorted(self._allseqno.keys()) == sorted(self._seqno.keys()), "seqno keys mismatch "
        pass
    def get_allseqno(self):
        """
        Populates, checks and returns the `self._allseqno` dict which is keyed
        by tablename and holds the collected allseqno from the individual AsciiCSV
        """
        if hasattr(self,'_allseqno'):
            return self._allseqno
        self.read_allseqno()
        self.check_allseqno()
        return self._allseqno
    allseqno = property( get_allseqno , doc=get_allseqno.__doc__ )

    def get_fabseqno(self):
        """
        Summarizes ``cat.allseqno`` by fabricating a dict keyed by table name containing
        the number of Vld SEQNO (from the length of the values in ``cat.allseqno``).

        This dict can be compared with ``cat.seqno``, which is obtained from
        the LASTUSEDSEQNO entries in the ``LOCALSEQNO`` table.
        Assuming kosher DBI handling of tables this fabricated dict ``cat.fabseqno`` should
        match ``cat.seqno``, meaning that SEQNO start from 1 and have no gaps.

        .. code-block:: ipython

                In [1]: from DybPython import AsciiCat

                In [2]: cat = AsciiCat("~/dybaux/catalog/tmp_offline_db")

                In [3]: db.seqno   ## queries the LOCALSEQNO table in DB
                Out[3]:
                {'CableMap': 213,
                 'CalibFeeSpec': 113,
                 'CalibPmtSpec': 29,
                 'FeeCableMap': 3,
                 'HardwareID': 172}

                In [4]: db.fabseqno    ## a summarization of db.allseqno
                Out[4]:
                {'CableMap': 213,
                 'CalibFeeSpec': 111,
                 'CalibPmtSpec': 8,
                 'FeeCableMap': 3,
                 'HardwareID': 172}

                In [5]: db.miscreants()   ## assertions avoided by miscreant status
                Out[5]: ('CalibPmtSpec', 'CalibFeeSpec')


        """
        if hasattr(self, '_fabseqno' ):
            return self._fabseqno
        self._fabseqno =  dict(map(lambda(k,v):(k,len(v)),self.allseqno.items()))
        return self._fabseqno
    fabseqno = property( get_fabseqno, doc=get_fabseqno.__doc__ )

    def write(self, xdir=None ):
        """
        :param xdir: write directory

        Writes the catalog into the ``xdir`` directory; the ``.cat`` file is
        named after the directory basename.
        """
        if xdir==None:
            xdir = self.dir
            log.warn("overwriting catalog into dir %s " % xdir )
        else:
            xdir = pathx(xdir)
            assert xdir != self.dir
        self.write_catalog( base=xdir )
        for tn in self.orderpk:
            csv = self[tn]
            csv.write(base=xdir)

    def write_catalog(self, base ):
        """
        Writes the .cat file into directory `base`; the name of the `.cat` is
        obtained from the basename of `base`.
        """
        if not os.path.exists(base):
            log.info("creating %s " % base )
            os.makedirs(base)
        outpath = os.path.join( base , "%s.cat" % os.path.basename(base) )
        log.debug("write_catalog to %s " % outpath )

        fp = open(outpath,"w")
        fp.write("\n".join(["name"] + map(lambda tn:self[tn].path, self.orderpk)) + "\n" )
        fp.close()

    def resolve(self, relpath , base=None):
        """Resolve a relative path wrt the base"""
        if base == None:
            base = self.dir
        return os.path.join( base , relpath )

    def updates(self, target, tselect=[], fastforward=False ):
        """
        :param target: DBI store (AsciiCat or DB) but usually the target DB to be updated
        :param tselect: restrict comparisons to the payload tables listed, if populated

        Returns a dict keyed by payload table name listing the new SEQNO,
        and when ``fastforward=True`` prepares the ascii catalog for loading into the DB
        by fastforwarding the INSERTDATE of the new validity rows to UTC now.

        """
        log.debug("updates tselect %r " % tselect )
        upls = self.seqno_updates(target, tselect=tselect)
        upas = self.allseqno_updates(target, tselect=tselect )
        assert sorted(upls.keys()) == sorted(upas.keys()), ("updates keys mismatch ", upls.keys(), upas.keys() )

        # checking correspondence between LASTUSEDSEQNO and the last of the lists of allseqno
        for tn in upas:
            assert upls[tn] == upas[tn][-1] , ("seqno vs allseqno mismatch %s" % tn, upls[tn], upas[tn][-1] )
            #print tn,  upls[tn], upas[tn][-1] , upas[tn][0]

        if fastforward:
            now = datetime.utcnow()
            now = now.strftime("%Y-%m-%d %H:%M:%S")
            for tn in upas:
                n = 0
                vld = self[tn+'Vld']
                for sq in upas[tn]:
                    n += 1
                    vld[sq]['INSERTDATE'] = now
                if n > 0:
                    log.info("fastforward %s validity rows of %s to %s " % (n,tn,now) )
                    vld.write()
        else:
            pass
        return upas

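    ## Illustrative sketch (added commentary): the return resembles
    ##   {'HardwareID': [359, 360, ...], 'CableMap': [441, ...]}
    ## ie the SEQNO present in the ascii catalog but not yet in the target
    ## (table names and values illustrative).
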
    def seqno_updates(self, target , tselect=[] ):
        """
        :param target: DBI store (AsciiCat or DB) but usually the target DB to be updated
        :param tselect: restrict comparisons to payload tables in tselect, if the tselect list is populated

        This operates by comparing LASTUSEDSEQNO entries in the LOCALSEQNO tables, via
        the ``.seqno`` property.

        Provides a dict keyed by payload table name of LASTUSEDSEQNO values for
        new or changed LASTUSEDSEQNO entries in the ascii catalog.

        For example this AsciiCat might contain::

              {'Noodles': 15, 'CalibPmtSpec': 29, 'FeeCableMap': 3, 'HardwareID': 172, 'CalibFeeSpec': 113, 'CableMap': 213}

        Whereas the target DB contains::

              {'Noodles': 10, 'CalibPmtSpec': 29, 'FeeCableMap': 3, 'CalibFeeSpec': 113}

        This would return::

              {'Noodles':15 ,'HardwareID': 172 ,'CableMap':213}


        Note the use of SEQNO 0 to mark not-present.

        This way of checking for updates works fine without the payload tables in memory,
        as it just uses the LASTUSEDSEQNO counts hailing from LOCALSEQNO.csv,
        BUT make sure you are rcmpcat comparing against a proper full DB, not the
        decoupled subset of tables that tmp_offline_db often is.
        """
        upls = {}

        ttabs = filter(lambda t:len(tselect) == 0 or t in tselect, target.seqno )
        log.debug("target tables %r " % ttabs )
        missing = list(set(ttabs).difference(set(self.seqno)))
        assert len(missing) == 0, "tables in target that are not in ascii catalog %r \n try restricting table selection with -t option " % missing

        log.debug("seqno_updates self.seqno %r " % self.seqno )
        log.debug("seqno_updates target.seqno %r " % target.seqno )

        alldbi = list(set(self.seqno).union(set(target.seqno)))
        for tn in alldbi:
            if len(tselect) > 0 and tn not in tselect:
                continue
            acls = self.seqno.get(tn,0)     ## ascii catalog LASTUSEDSEQNO for table
            tgls = target.seqno.get(tn,0)   ## LASTUSEDSEQNO in target db
            #log.info("seqno_updates acls %s tgls %s " % (acls,tgls) )
            assert acls >= tgls, ("ERROR LASTUSEDSEQNO in target exceeds that in ascii cat %s " % tn, acls, tgls)
            if acls > tgls:
                upls[tn] = acls
            else:
                pass
        log.info("seqno_updates : ascii catalog LASTUSEDSEQNO changes relative to target : %r " % upls )
        return upls

    def allseqno_updates(self, target, tselect=[] ):
        """
        :param target: DBI store (AsciiCat or DB) but usually the target DB to be updated

        This operates by comparing all SEQNO in the tables

        """
        upas = {}
        alldbi = list(set(self.allseqno).union(set(target.allseqno)))
        for tn in alldbi:
            if len(tselect) > 0 and tn not in tselect:
                continue
            acsq = set(self.allseqno.get(tn,[]))
            tgsq = set(target.allseqno.get(tn,[]))
            #log.info("allseqno_updates tn %s acsq %s tgsq %s " % (tn,acsq,tgsq))
            assert len(tgsq.difference(acsq)) == 0 , "ERROR SEQNO in db that are not present in ascii catalog table %s  " % tn
            upsq = sorted(list(acsq.difference(tgsq)))     ## SEQNO in ascii catalog but not target DB
            if len(upsq) > 0:
                upas[tn] = upsq
            else:
                pass
        for tn,seq in upas.items():
            contiguous = seq[-1] - seq[0] + 1  == len(seq)
            if not contiguous:
                msg = "non contiguous SEQNO in tn %s %s %s %s " % ( tn,  seq[0], seq[-1], len(seq)  )
                if tn in ('CalibPmtSpec',):    ## WHY NOT NORMALLY TICKLED ?
                    log.info(msg)
                else:
                    log.fatal(msg)
                    assert contiguous, msg

        #log.debug("allseqno_updates %r " % upas )
        return upas

    def diff(self, otherdir):
        if type(otherdir) == AsciiCat:
            otherdir = otherdir.dir
        cmd = "diff -r --brief %s %s | grep -v .svn " % ( self.dir, otherdir )
        log.info("diffing with \"%s\"" % cmd )
        for line in os.popen(cmd).readlines():
            log.info("diff %s" % line.strip())

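
## Hedged sketch (not part of the original source): fabricate a minimal
## single-table catalog on disk and read it back with AsciiCat; the table
## name, paths under /tmp and all values are invented for illustration.
def _demo_minimal_cat():
    base = "/tmp/demo_offline_db"
    for sub in ("Demo", "LOCALSEQNO"):
        if not os.path.isdir(os.path.join(base, sub)):
            os.makedirs(os.path.join(base, sub))
    open(os.path.join(base, "demo_offline_db.cat"), "w").write(
        "name\nDemo/Demo.csv\nDemo/DemoVld.csv\nLOCALSEQNO/LOCALSEQNO.csv\n")
    open(os.path.join(base, "Demo", "Demo.csv"), "w").write(
        "SEQNO int(11),ROW_COUNTER int(11),VALUE int(11),PRIMARY KEY (SEQNO,ROW_COUNTER)\n1,1,42\n")
    open(os.path.join(base, "Demo", "DemoVld.csv"), "w").write(
        "SEQNO int(11),INSERTDATE datetime,PRIMARY KEY (SEQNO)\n1,\"2014-05-16 09:55:40\"\n")
    open(os.path.join(base, "LOCALSEQNO", "LOCALSEQNO.csv"), "w").write(
        'TABLENAME char(64),LASTUSEDSEQNO int(11),PRIMARY KEY (TABLENAME)\n"*",0\n"Demo",1\n')
    cat = AsciiCat(base)
    assert cat.seqno == {'Demo': 1}                    ## from LOCALSEQNO
    assert cat['DemoVld'][1]['INSERTDATE'] == '2014-05-16 09:55:40'
    assert cat['Demo'][(1,1)]['VALUE'] == '42'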


class DD(dict):
    """
    Dict comparison, classifying keys into added/removed/changed/unchanged sets
    """
    def __init__(self, a_, b_ , **kwa ):
        dict.__init__(self, **kwa )
        a, b = set(a_), set(b_)
        c = a.intersection(b)
        self['added']   = b - c
        self['removed'] = a - c
        self['changed'] = set(o for o in c if a_[o] != b_[o])
        self['unchanged'] = set(o for o in c if a_[o] == b_[o])
        ## specialized
        if kwa.get('increments'):
            self['allincrements'] = dict( (o,b_[o]-a_[o]) for o in c )
            self['increments'] = dict( (o,b_[o]-a_[o]) for o in c if a_[o] != b_[o]  )

    def __str__(self):
        return pformat(self)

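
## Hedged sketch (not part of the original source): DD comparison of two
## LASTUSEDSEQNO-style dicts with the optional increments summary; the
## table names and counts are invented for illustration.
def _demo_dd():
    a = {'CableMap': 440, 'HardwareID': 358, 'CalibFeeSpec': 113}
    b = {'CableMap': 460, 'HardwareID': 358, 'Noodles': 10}
    dd = DD( a, b, increments=True )
    assert dd['added'] == set(['Noodles'])
    assert dd['removed'] == set(['CalibFeeSpec'])
    assert dd['changed'] == set(['CableMap'])
    assert dd['unchanged'] == set(['HardwareID'])
    assert dd['increments'] == {'CableMap': 20}
    assert dd['allincrements'] == {'CableMap': 20, 'HardwareID': 0}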


def validate_csv_update( tn , old_ , new_ ):
    """
    Standalone comparison of parsed .csv

    :param tn: table name
    :param old_: path to starting csv
    :param new_: path to changed csv

    Hmm when operating decoupled the new LOCALSEQNO will

    """
    if tn in ('LOCALSEQNO','GLOBALSEQNO',):
        old = AsciiCSV(None, old_)
        old.read()
        log.info( "oldcsv %r %r " % ( old, dict(old) ) )
        new = AsciiCSV(None, new_)
        new.read()
        log.info( "newcsv %r %r " % ( new, dict(new) ) )
        pass
        assert len(new) >= len(old) , "LOCALSEQNO cannot shrink "


def test_diddle():
    cat = AsciiCat("~/dybaux/catalog/tmp_offline_db")
    diddle = False
    if diddle:
        now = datetime.utcnow()
        for pk,row in cat['CableMapVld'].items():
            row['INSERTDATE'] = now.strftime("%Y-%m-%d %H:%M:%S")
    #print cat.allseqno
    print cat.fabseqno
    #cat.write( "/tmp/demo" )
    #cat.diff( "/tmp/demo" )


def test_merge():

    old_ = """TABLENAME char(64),LASTUSEDSEQNO int(11),PRIMARY KEY (TABLENAME)
"*",0
"CableMap",440
"CalibFeeSpec",113
"CalibPmtSpec",29
"FeeCableMap",3
"HardwareID",358
"""

    new_ = """TABLENAME char(64),LASTUSEDSEQNO int(11),PRIMARY KEY (TABLENAME)
"*",0
"CableMap",1440
"HardwareID",1358
"Lettuce",100
"Tomato",200
"CalibFeeZiggy",300
"""

    old = AsciiCSV( None, None )( old_ )
    print "old", old.lastusedseqno
    print "old content " + "*" * 100
    print old.content()

    new = AsciiCSV( None, None )( new_ )
    print "new", new.lastusedseqno
    print "new content " + "*" * 100
    print new.content()

    mrga = old.merged( new )
    print "mrga ", mrga.lastusedseqno
    print "mrga content " + "*" * 100
    print mrga.content()

    mrgb = new.merged( old )
    print "mrgb ", mrgb.lastusedseqno
    print "mrgb content " + "*" * 100
    print mrgb.content()

    ## merge direction matters ... in the CableMap and HardwareID values

    om = DD( old.lastusedseqno, mrga.lastusedseqno , name="om", increments=True )
    print om


def test_fake_write():
    ## NB this definition is shadowed by the test_fake_write redefinition further below
    cat = AsciiCat("~/dybaux/catalog/tmp_offline_db")
    cat.fake_write('HardwareID', 10 )
    cat.write()


def test_compare_csv():
    old = AsciiCSV( None, "~/dybaux/catalog/tmp_offline_db/CalibPmtSpec/CalibPmtSpec.csv.old")()
    new = AsciiCSV( None, "~/dybaux/catalog/tmp_offline_db/CalibPmtSpec/CalibPmtSpec.csv.new")()
    assert old == new   ## True
    ## old.write()     standard-orderized


def test_update_relative_to_target():
    from DybPython import DB
    target = DB("tmp_offline_db")
    cat = AsciiCat("~/dbicopy/tmp_offline_db", skip_pay_check=True, allow_partial=True)
    tn = 'PhysAd'          ## a conveniently small table, as in the other tests here
    tselect = [tn]

    log.info("   cat.seqno=%r" % cat.seqno)
    log.info("target.seqno=%r" % target.seqno)
    log.info("   cat.allseqno=%r" % cat.allseqno)
    #log.info("target.allseqno=%r" % target.allseqno)

    updates = cat.updates( target , tselect=tselect , fastforward=False )
    print "asciicat updates relative to target %s " % updates


catd = "~/dbicopy/tmp_offline_db"
tmpd = "/tmp/tdbicopy"

def test_copy_cat():
    """
    #. scrubs any pre-existing tmpd
    #. copies asciicat into tmpd
    #. checks duplication
    """
    import shutil
    cat = AsciiCat(catd, skip_pay_check=True, allow_partial=True)
    if os.path.isdir(tmpd):
        log.info("removing %s " % tmpd )
        shutil.rmtree(tmpd)
    pass
    tn = 'PhysAd'     # conveniently small table
    cat[tn]()         # reads in payload CSV
    cat.write(tmpd)   # into file system Ascii Catalog

    tmp = AsciiCat(tmpd, skip_pay_check=True, allow_partial=True)
    tmp[tn]()         # reads in payload CSV
    assert cat == tmp

def test_fake_write():
    """
    Requires test_copy_cat run before, to create a sacrificial AsciiCat to diddle with

    #. fake write in memory
    #. inplace write in filesystem tmpd
    """
    pass
    tmp = AsciiCat(tmpd, skip_pay_check=True, allow_partial=True)
    tn = 'PhysAd'
    tmp[tn]()         # reads in payload CSV
    tmp.fake_write( tn , npay=1 )
    tmp.write()
    for tn,csv in tmp.items():
        log.info("csv %s %s " % ( tn, csv ))
        for pk,v in csv.items():
            log.info("pk %s v %s " % ( pk, v ))

def test_fake_updates():
    """
    #. for the updates call, the target slot (argument 1) `cat` is required to be at or behind the base `tmp` slot (argument 0)
    """
    cat = AsciiCat(catd, skip_pay_check=True, allow_partial=True)
    tmp = AsciiCat(tmpd, skip_pay_check=True, allow_partial=True)
    tn = 'PhysAd'
    updates = tmp.updates( cat , tselect=[tn] , fastforward=False )
    print updates

def test_dump_cat():
    cat = AsciiCat(catd, skip_pay_check=True, allow_partial=True)
    log.info("dumping cat %r " % cat )
    for tn,csv in cat.items():
        log.info("csv %s %s " % ( tn, csv ))
        for pk,v in csv.items():
            log.info("pk %s v %s " % ( pk, v ))


def test_partial_rloadcat():
    """
    TODO:

    #. make re-testable : by controlling DB status (use fixture control?),
    #. currently does nothing after 1st usage as it detects nothing to do
    """
    tn = 'PhysAd'
    _tmpd = tmpd
    cmd = "db.py --noconfirm --ALLOW_PARTIAL -t %(tn)s tmp_offline_db rloadcat %(_tmpd)s " % locals()
    print cmd


if __name__ == '__main__':

    logging.basicConfig(level=logging.INFO)
    #test_dump_cat()
    test_copy_cat()
    test_fake_write()
    test_fake_updates()
    test_partial_rloadcat()
