# NOTE: stray HTML extraction residue removed here (stylesheet/script tag fragments, not part of the module)
#!/usr/bin/env python
"""
Direct python access to the rows of DBI ascii catalog via csv parsing, implemented to
enable pre-insert checks prior to rloadcat of ascii catalogs into the DB

No need to use the more complex CSV

#. as DBI Ascii catalogs are gauranteed valid .csv
#. have some specifics regards the key names that need special handling

Ambiguity Avoidance
~~~~~~~~~~~~~~~~~~~~~

Need to write ascii cat with new ``INSERTDATE``s
changed to UTC now of an update, to avoid the **window of ambiguity** problem :dybsvn:`ticket:844`

#. first get new ``SEQNO``

NOTE(review): this module is Python 2 only — ``StringIO`` import, ``print``
statements and tuple-parameter lambdas appear later in the file.
"""
import os, re, logging
from datetime import datetime
from csv import DictReader
from StringIO import StringIO   # Python 2 stdlib; would be io.StringIO under Python 3
from pprint import pformat
from copy import deepcopy

log = logging.getLogger(__name__)

# expand ~user and $ENVVARS then normalize into an absolute path
pathx = lambda _:os.path.abspath(os.path.expanduser(os.path.expandvars(_)))


class AsciiRow(dict):
    """
    One row of an ascii catalog table: a dict keyed by fieldname with string
    values, plus a reference ``self.t`` to the parent AsciiCSV which supplies
    the field order, primary key fields and per-column quoting flags.

    Used internally by AsciiCSV::

        t = AsciiCSV( "~/path/to/catdir")
        r = AsciiRow.Create(d, t )
        print r.pk ## (1,10)
        print r.fv
        print r

    """
    def Create(cls, d, t ):
        """
        Alternate constructor that ties the row dict to its parent table.

        :param d: dict with row content
        :param t: parent table AsciiCSV instance
        """
        r = cls(d)
        r.t = t
        return r
    Create = classmethod(Create)   # pre-decorator classmethod idiom, kept for consistency

    def clone(self):
        # shallow copy of the row content, sharing the same parent table reference
        return AsciiRow.Create( dict(self).copy() , self.t )

    def get_pk(self):
        """
        Primary key of the row: a scalar for a single pk field, a tuple for a
        composite key. Values are coerced to int when all pk columns are
        integer typed (``self.t.intpk``), otherwise left as strings.
        """
        if self.t.intpk:
            fv = map(lambda k:int(self[k]),self.t.pkfields)
        else:
            fv = map(lambda k:self[k],self.t.pkfields)
        return fv[0] if len(fv) == 1 else tuple(fv)
    def set_pk( self, pk ):
        """
        Assign primary key values back into the row, stringifying them.

        :param pk: scalar int or tuple matching the shape of ``self.t.pkfields``
        """
        if type(pk) == int:pk = tuple([pk]) ## type regularize
        assert len(pk) == len(self.t.pkfields), "pk %r is wrong shape expecting %r " % ( pk, self.t.pkfields )
        for k,v in zip(self.t.pkfields,pk):
            self[k] = str(v) ## so faked entries match kosher ones hailing from csv files
    pk = property(get_pk, set_pk)
    fv = property(lambda self:map(lambda k:self[k],self.t.fields), doc="list of field values in order held by AsciiCSV instance")
    def __str__(self):
        # csv serialization of the row: quote a value when the corresponding
        # column type (char/varchar/date) requires quoting per self.t.quote
        return ",".join(map(lambda _:"\"%s\""%_[0] if _[1] else _[0], zip(self.fv,self.t.quote)))
class AsciiCSV(dict):
    """
    AsciiCSV is a dict keyed by PK with values which are dicts keyed by fieldname
    which have string values. The PKs and field names are determined from the
    header line of the csv file. For example::

        csv = AsciiCSV( None, "~/dybaux/catalog/tmp_offline_db/CalibPmtSpec/CalibPmtSpec.csv")()
        ## standalone use without a catalog

        csv[(50,167)] == {'PMTAFTERPULSE': '0',
                          'PMTDARKRATE': '0',
                          'PMTDESCRIB': 'DayaBayOWS-wall09-spot23-in',
                          'PMTEFFIC': '1',
                          ...
                          'ROW_COUNTER': '167',
                          'SEQNO': '50'}

        csv[(50,167)]['PMTEFFIC'] == '1'

        for pk,v in csv.items():
            print pk, v

    Typical PKs:

    #. (SEQNO,ROW_COUNTER) integer tuple for payload tables
    #. SEQNO integer for validity tables
    #. TABLENAME string for LOCALSEQNO tables

    """
    def __init__(self, cat, path, **kwa ):
        """
        :param cat: parent AsciiCat, or None for standalone use
        :param path: csv path, relative to the catalog dir when ``cat`` is given
        :param kwa: table metadata from catalog parsing, typically ``tdir``/``tnam``
        """
        dict.__init__(self)
        self.cat = cat
        self.path = path
        self.kwa = kwa
        self.tabbase = kwa.get('tdir',None)
        self.tabname = kwa.get('tnam',None)
        self.pk = kwa.get('tnam',None)   # table name: used as this CSV's key within the parent AsciiCat dict
        self.seqnos_read = []
        self.pks_read = []

    # too memory wasteful apparently
    #pks = property(lambda self:list(set(map(lambda _:_.pk, self.values() ))), doc="for all tables" )
    #allseqno = property(lambda self:list(set(map(lambda _:int(_['SEQNO']), self.values() ))), doc="for payload/validity tables" )

    pks = property(lambda self:self.pks_read, doc="for all tables" )
    allseqno = property(lambda self:self.seqnos_read, doc="for payload/validity tables" )
    maxseqno = property(lambda self:max(self.allseqno), doc="for payload/validity tables")
    minseqno = property(lambda self:min(self.allseqno), doc="for payload/validity tables")

    abspath = property(lambda self:self.cat.resolve(self.path))

    pretty = property(lambda self:pformat(dict(self)), doc="pretty print the AsciiCSV dict, only useful for small CSVs such as LOCALSEQNO" )
    lastusedseqno = property(lambda self:dict(map(lambda k:(k,int(self[k]['LASTUSEDSEQNO'])) , self )), doc="for LOCALSEQNO tables" )
    smry = property(lambda self:"\n".join( [self.pretty, pformat(self.lastusedseqno) ]), doc="for LOCALSEQNO tables" )

    orderpk = property(lambda self:sorted(self))
    lastpk = property(lambda self:self.orderpk[-1])
    last = property(lambda self:self[self.lastpk])

    def __repr__(self):
        return "%s [%-5d] %s " % ( self.__class__.__name__, len(self), self.path )

    def compare(self, other):
        """
        Equality operator for dict-of-dict works as would want :
        recursive same keys with same values. As demonstrated::

            old = AsciiCSV( None, "~/dybaux/catalog/tmp_offline_db/CalibPmtSpec/CalibPmtSpec.csv.old")()
            new = AsciiCSV( None, "~/dybaux/catalog/tmp_offline_db/CalibPmtSpec/CalibPmtSpec.csv.new")()

            old == new ## True

            keep = old[(50,167)]['PMTEFFIC']
            old[(50,167)]['PMTEFFIC'] = keep + "changed"
            old == new ## False after changing a single value

            old[(50,167)]['PMTEFFIC'] = keep
            old == new ## True ## back to equality

            list(old) == list(new)     ## False ## key order cannot be relied on
            sorted(old) == sorted(new) ## True
            old.orderpk == new.orderpk ## True

        The .old which matches dybaux r4968 is from before standardized order was established::

            [blyth@belle7 CalibPmtSpec]$ diff CalibPmtSpec.csv CalibPmtSpec.csv.old
            [blyth@belle7 CalibPmtSpec]$ diff CalibPmtSpec.csv CalibPmtSpec.csv.new | wc
               2470    4934  222803

        Standardize it by ``old.write()`` after which::

            [blyth@belle7 CalibPmtSpec]$ diff CalibPmtSpec.csv.old CalibPmtSpec.csv.new
            [blyth@belle7 CalibPmtSpec]$

        NOTE(review): this method is shadowed by the later ``compare`` defined
        below (which has an empty body and returns None) — calls to
        ``AsciiCSV.compare`` do NOT reach this implementation. Likely a bug:
        one of the two definitions should be renamed or removed.
        """
        return self == other

    def parse_hdr(self, hdr ):
        """
        Make sense of hdr line like::

            SEQNO int(11),ROW_COUNTER int(11),SENSORID int(11),CHANNELID int(11),PRIMARY KEY (SEQNO,ROW_COUNTER)

        Sets ``self.fields``, ``self.dbtypes``, ``self.fdict``, ``self.pkfields``,
        ``self.pktypes``, ``self.intpk`` (all pk columns integer typed) and
        ``self.quote`` (per-field flags: char/varchar/date types get quoted).
        """
        fdesc,pkdesc = hdr.split(",PRIMARY KEY ")
        assert pkdesc[0] == '(' and pkdesc[-1] == ")"

        self.hdr = hdr
        self.fields = map(lambda _:_.split()[0], fdesc.split(",") )
        self.dbtypes = map(lambda _:_.split()[1], fdesc.split(",") )
        self.fdict = dict(zip(self.fields, self.dbtypes))
        self.pkfields = pkdesc[1:-1].split(",")
        self.pktypes = map(lambda k:self.fdict[k], self.pkfields)
        self.intpk = len(filter( lambda k:k.startswith('int'), self.pktypes )) == len(self.pktypes) ## all integer pk
        qtyps = ("char","varc","date",)
        self.quote = map( lambda _:self.fdict[_][0:4] in qtyps, self.fields ) ## the type is quoted

    def __call__(self, content=None):
        # calling the instance reads the csv (optionally from an in-memory string)
        return self.read(content)

    def resolve(self, base=None ):
        # resolve self.path via the parent catalog when attached, else standalone
        if self.cat:
            path = self.cat.resolve( self.path, base )
        else:
            path = pathx(self.path)
        return path

    def read(self, content=None):
        """
        Parse the csv storing as dict of dict keyed by pk

        :param content: optional csv text; when None the resolved path is read

        Typical DBI validity and payload tables are ordered by SEQNO/(SEQNO,ROW_COUNTER), but
        the standard order is not followed by:

        #. LOCALSEQNO, not surprising as TABLENAME keyed and follows historic ordering
        #. CalibPmtSpec, CalibPmtSpecVld **SURPISE : EVIDENCE FOR NON-DBI HANDLING**

        """
        if content:
            pass
        else:
            path = self.resolve()
            log.debug("parse csv %s " % path )
            content = open( path , "r" ).read()

        feol = content.find("\n")
        self.parse_hdr( content[:feol] ) ## determine fields, pk etc...
        rdr = DictReader( StringIO(content[feol+1:]), fieldnames=self.fields )

        sqn=[]
        pks=[]

        for i,d in enumerate(rdr):
            r = AsciiRow.Create(d, self )
            rpk = r.pk
            self[rpk] = r
            pks.append(rpk)

            # collect seqno during filling to avoid memory expense of grabbing latter
            seqno = int(r.get('SEQNO',0))
            if seqno:
                sqn.append(seqno)

            # dict size tracks row count only when every pk is unique
            assert len(self) == i + 1, ("error non-unique pk %r for row %r " % ( rpk, r ) )
            pass

        self.seqnos_read = list(set(sqn))
        self.pks_read = list(set(pks))
        return self

    def merged(self, update , postpend="" ):
        """
        Return a new instance that merges `update` into `self`, changes in
        the `update` override things in self

        NB `self` is not changed by this

        :param update: instance of `AsciiCSV`
        :param postpend: suffix appended to the merged instance path
        :return merged instance:
        :rtype dict of dict:

        """
        log.debug("merge csv %r " % update )
        mrgd = deepcopy(self)
        if mrgd.path:
            mrgd.path += postpend

        allpk = set(self).union(set(update))
        for pk in allpk:
            if pk in mrgd and pk in update:
                # both sides have the pk: update's fields win, row by row
                m = deepcopy(self[pk])
                m.update( update[pk] )
                log.debug("merged of pk %r combines self %r with update %r yielding %r " % ( pk, mrgd[pk],update[pk],m) )
                mrgd[pk] = m
            elif pk in mrgd:
                log.debug("merged of pk %r keeps entry %r " % ( pk, mrgd[pk] ) )
                pass
            elif pk in update:
                log.debug("merged of pk %r adds update entry %r " % ( pk, update[pk] ) )
                mrgd[pk] = deepcopy(update[pk])
        return mrgd

    def compare(self, other):
        """
        Return a dict with the differences

        NOTE(review): unimplemented (returns None) and it SHADOWS the earlier
        documented ``compare`` that returned ``self == other`` — any caller of
        ``compare`` gets None. Flagged as a bug, left untouched here.
        """
        pass

    def content(self):
        # serialize: header line then rows in sorted-pk (standardized) order
        content = self.hdr + "\n"
        for pk in sorted(self):
            content += str(self[pk]) + "\n"
        return content

    def write(self, base=None):
        """
        Write the csv beneath ``base``; with no ``base`` the original file
        is overwritten in place (warned).
        """
        path = self.resolve( base=base )
        if base == None:
            log.warn("inplace overwriting %s " % path )
        else:
            log.debug("writing to %s " % path )

        wdir = os.path.dirname( path )
        if not os.path.exists(wdir):
            os.makedirs(wdir)

        fp = open(path, "w")
        fp.write( self.content() )
        fp.close()
class AsciiCat(dict):
    """
    DBI Ascii catalog (a tablename keyed dict holding AsciiCSV representing each table), usage::

        import logging
        logging.basicConfig(level=logging.INFO)

        cat = AsciiCat("~/dybaux/catalog/tmp_offline_db")

        for tn,csv in cat.items():
            print tn, csv

        for tn,lastseqno in cat.seqno.items():
            print cat[tn+'Vld'][lastseqno]

        for tn in cat.orderpk:
            csv = cat[tn]
            print tn, csv.abspath, csv.tabname, csv.tabbase, csv.orderpk[0:100]

    """
    # matches "<tabledir>/<tablename>.csv" relative paths from the .cat file
    ptn = re.compile("(?P<tdir>[^/]*)/(?P<tnam>\S*)\.csv$")

    def parse_relpath(self, relpath):
        """
        Classify a catalog relative path into payload ("pay") or validity
        ("vld") table metadata dict with keys tdir/tnam/tab/type/relpath.
        """
        m = self.ptn.match(relpath)
        assert m, "failed to match %s " % relpath
        d = m.groupdict()
        tab = d['tdir']
        d['tab'] = tab
        if d['tdir'] == d['tnam']:
            d['type'] = "pay"
        elif "%sVld" % d['tdir'] == d['tnam']:
            d['type'] = "vld"
        return dict(d, relpath=relpath)

    showtables = property( lambda self:sorted(self.keys()))
    orderpk = property(lambda self:sorted(self))

    @classmethod
    def findcat(cls, dir):
        # locate the single .cat file inside the catalog directory
        catz = filter(lambda n:n[-4:] == ".cat" , os.listdir(dir) )
        assert len(catz) == 1, "must be only one .cat in dir %s " % dir
        return os.path.join( dir, catz[0] )

    def __init__(self, dir , skip_pay_check=False , allow_partial=False ):
        """
        :param dir: catalog directory containing the single .cat file
        :param skip_pay_check: when True skip loading payload tables in memory
        :param allow_partial: when True tolerate catalogs missing some tables
        """
        dir = os.path.normpath(pathx(dir))
        assert os.path.isdir(dir) , dir
        catpath = self.findcat( dir )

        log.debug("AsciiCat catpath %s skip_pay_check %s allow_partial %s " % (catpath,skip_pay_check,allow_partial) )

        self.skip_pay_check = skip_pay_check
        self.allow_partial = allow_partial
        self.dir = dir
        self.v = {}   # validity AsciiCSV keyed by payload table name
        self.p = {}   # payload AsciiCSV keyed by payload table name
        pass
        self.read( catpath )

    def compare(self, other):
        """
        Recursive dict equality across the whole catalog. Checking cat
        comparison, setup old and new catalogs::

            rm -rf ~/dybaux/catalog/tmp_offline_db
            svn up -r 4968 ~/dybaux/catalog/tmp_offline_db  ## clean r4968 just prior to ordering standardization
            rm -rf ~/tmp_offline_db ; db.py offline_db rdumpcat ~/tmp_offline_db  ## same named catalog with the standard rdumpcat ordering

        .. code-block:: ipython

            In [15]: old = AsciiCat("~/dybaux/catalog/tmp_offline_db")
            In [16]: new = AsciiCat("~/tmp_offline_db")
            In [17]: old == new
            Out[17]: True

            In [19]: old['CableMap'][(1,1)]['CHANNELID'] += "changed"
            In [20]: old == new
            Out[20]: False

            In [21]: old['CableMap'][(1,1)]['CHANNELID'] = '16908545'
            In [22]: old == new
            Out[22]: True

            In [25]: old.diff(new)   ## filesystem-level diff may still differ on ordering
            In [27]: old.write()     ## standardizes ordering in place
            In [28]: old.diff(new)   ## now clean

        """
        return self == other

    def __repr__(self):
        return "%s [%-5d] %s %r " % ( self.__class__.__name__, len(self), self.dir, self.seqno )

    def read(self, catpath):
        """
        Parses Vld and LOCALSEQNO tables by default, reading in the
        csv content into memory. Only reads payload tables when `skip_pay_check=False`
        Reading payload tables for larger tables is slow and memory expensive.

        """
        log.info("read %s " % catpath )
        self.read_catalog( catpath )

        for tn in self.orderpk:
            read = False
            if tn[-3:] == 'Vld' or tn == 'LOCALSEQNO':
                read = True
            else:
                read = not self.skip_pay_check
            if read:
                log.info("reading table %s " % tn )
                self[tn].read()
            else:
                log.debug("NOT reading table %s " % tn )

        self.read_seqno()
        self.check_seqno()
        log.info("done %r " % self )

    def read_catalog(self, catpath ):
        """
        Reads ascii catalog into memory.

        This just reads the small .cat file and sets up lightweight
        AsciiCSV objects without reading in csv content.
        """
        content = open( catpath , "r" ).read().strip()
        relpaths = content.split("\n")
        assert relpaths[0] == "name", "unexpected catalog header line %s " % relpaths[0]
        for d in map(lambda _:self.parse_relpath(_), relpaths[1:] ):
            typ, tab = d.get('type',None), d.get('tab', None)
            assert typ and tab
            csv = AsciiCSV(self, d['relpath'], **d )
            if typ == 'pay':
                self.p[tab] = csv
            elif typ == 'vld':
                self.v[tab] = csv
            pass
            self[csv.pk] = csv ## keyed by tablename
            pass

    def fake_write( self, tn , npay=1 ):
        """
        Fakes a single validity entry and ``npay`` payload entries, by copying the
        last entries in validity and payload tables and with a
        fabricated SEQNO or (SEQNO,ROW_COUNTER) pks. The `tn` entry in the LOCALSEQNO
        table is also changed to mimic a real DBI write.

        :param tn: payload table name
        :param npay: number of payload rows to write

        Usage::

            cat = AsciiCat("~/dybaux/catalog/tmp_offline_db")
            cat.fake_write('HardwareID', 10 )
            cat.write()  ## CAUTION inplace overwriting

        """
        p,v,m = self[tn], self[tn+'Vld'],self['LOCALSEQNO']
        assert len(p) > 1 and len(v) > 1 and len(m) > 1, ("cannot fake into empty tables ", len(p),len(v),len(m))

        a = v.last.clone() ## pk ordered
        lastseqno = a.pk ## get_pk

        l = p.last ## l.pk is tuple with SEQNO,ROW_COUNTER values
        # consistency: last payload SEQNO, last validity SEQNO and LOCALSEQNO entry must agree
        assert l.pk[0] == lastseqno == int(m[tn]['LASTUSEDSEQNO']) , ( "lastseqno mismatch ", l, l.pk, lastseqno , m[tn] )

        fakeseqno = lastseqno + 1
        a.pk = fakeseqno ## set_pk
        v[a.pk] = a ## fake validity entry

        for n in range(npay):
            b = l.clone()
            b.pk = (fakeseqno, n+1 ) ## set_pk
            p[b.pk] = b ## fake payload entries

        m[tn]['LASTUSEDSEQNO'] = str(fakeseqno)

        log.info("fake_write %s %s added LASTUSEDSEQNO %s " % ( tn,npay,fakeseqno ))

        self.clear_cache()

    def read_seqno(self, tab="LOCALSEQNO"):
        """
        Reads the LASTUSEDSEQNO entries from the LOCALSEQNO table into seqno dict
        """
        self._seqno = {}
        for pk,row in self[tab].items():
            t = row['TABLENAME']
            n = int(row['LASTUSEDSEQNO'])
            if t == "*":continue   # "*" is the wildcard placeholder row, not a real table
            self._seqno[t] = n
        log.debug( "LASTUSEDSEQNO %r FROM %s " % ( self._seqno, tab) )
    def check_seqno(self):
        """
        Checks correspondence between the max SEQNO in the Vld and Pay tables
        with that recorded in the LOCALSEQNO table

        Note that without the very expensive db.py option: `--EXTRA_PAYLOAD_CHECK`
        which caused setting of `self.skip_pay_check` only the validity tables are checked.

        The allseqno called inside the maxseqno property is very expensive, causing
        MemoryError at IHEP
        """
        for t,n in self._seqno.items():
            v = self.get(t+'Vld',[])
            p = self.get(t,[])
            if len(v) > 0:
                vmax = v.maxseqno
                assert vmax == n , ( vmax , n )
            if len(p) > 0:
                pmax = p.maxseqno
                assert pmax == n , ( pmax , n )

    def get_seqno(self):
        """
        Tablename keyed dict of LASTUSEDSEQNO values obtained from LOCALSEQNO table
        """
        if hasattr(self,'_seqno'):
            return self._seqno
        self.read_seqno()
        self.check_seqno()
        return self._seqno
    seqno = property( get_seqno , doc=get_seqno.__doc__ )

    def clear_cache(self):
        """
        delete cached attributes forcing recalc on next access ...
        needed after faking for example
        """
        if hasattr(self, '_fabseqno' ):
            del self._fabseqno
        if hasattr(self, '_seqno' ):
            del self._seqno
        if hasattr(self, '_allseqno' ):
            del self._allseqno

    def read_allseqno(self):
        """
        Populates the table name keyed `self._allseqno` dict with results
        from the `.allseqno` property applied to individual `AsciiCSV` for each Vld table.

        The `self.seqno` dict is populated by reading the LOCALSEQNO.csv so when the
        tables represented in that do not correspond to those of the catalog.

        """
        self._allseqno = {}
        for tn,lus in self.seqno.items():
            tnv = "%sVld"%tn
            csvv = self.get(tnv, None)
            if not csvv and self.allow_partial:continue
            assert csvv, "read_allseqno failed to find AsciiCSV for %s : consider use of -P,--ALLOW_PARTIAL option to allow dealing in partial catalogs " % tnv
            self._allseqno[tn] = csvv.allseqno
    def check_allseqno(self):
        # skip the cross-check for deliberately partial catalogs
        if self.allow_partial:return
        assert sorted(self._allseqno.keys()) == sorted(self._seqno.keys()), "seqno keys mismatch "
        pass
    def get_allseqno(self):
        """
        Populates, checks and returns the `self._allseqno` dict which is keyed
        on tablename and holds the collected allseqno from the indicdual AsciiCSV
        """
        if hasattr(self,'_allseqno'):
            return self._allseqno
        self.read_allseqno()
        self.check_allseqno()
        return self._allseqno
    allseqno = property( get_allseqno , doc=get_allseqno.__doc__ )

    def get_fabseqno(self):
        """
        Summarizes ``cat.allseqno``, by fabricating a dict keyed by table name containing
        the number of Vld SEQNO (from length of values in ``cat.allseqno``)

        This dict can be compared with ``cat.seqno``, which is obtained from
        the LASTUSEDSEQNO entries in the ``LOCALSEQNO`` table.
        Assuming kosher DBI handling of tables this fabricated dict ``cat.fabseqno`` should
        match ``cat.seqno``, meaning that SEQNO start from 1 and have no gaps.

        .. code-block:: ipython

            In [3]: db.seqno      ## queries the LOCALSEQNO table in DB
            Out[3]: {'CableMap': 213, 'CalibFeeSpec': 113, 'CalibPmtSpec': 29, 'FeeCableMap': 3, 'HardwareID': 172}

            In [4]: db.fabseqno   ## a summarization of db.allseqno
            Out[4]: {'CableMap': 213, 'CalibFeeSpec': 111, 'CalibPmtSpec': 8, 'FeeCableMap': 3, 'HardwareID': 172}

            In [5]: db.miscreants()  ## assertions avoided by miscreant status
            Out[5]: ('CalibPmtSpec', 'CalibFeeSpec')

        """
        if hasattr(self, '_fabseqno' ):
            return self._fabseqno
        # NOTE: tuple-parameter lambda below is Python 2 only syntax
        self._fabseqno = dict(map(lambda(k,v):(k,len(v)),self.allseqno.items()))
        return self._fabseqno
    fabseqno = property( get_fabseqno, doc=get_fabseqno.__doc__ )

    def write(self, xdir=None ):
        """
        :param xdir: write directory

        Writes catalog into the ``xdir`` directory named after directory basename
        """
        if xdir==None:
            xdir = self.dir
            log.warn("overwriting catalog into dir %s " % xdir )
        else:
            xdir = pathx(xdir)
            assert xdir != self.dir
        self.write_catalog( base=xdir )
        for tn in self.orderpk:
            csv = self[tn]
            csv.write(base=xdir)

    def write_catalog(self, base ):
        """
        writes the .cat file into directory `base` the name of the `.cat` is
        obtained from the basename of `base`
        """
        if not os.path.exists(base):
            log.info("creating %s " % base )
            os.makedirs(base)
        outpath = os.path.join( base , "%s.cat" % os.path.basename(base) )
        log.debug("write_catalog to %s " % outpath )

        fp = open(outpath,"w")
        fp.write("\n".join(["name"] + map(lambda tn:self[tn].path, self.orderpk)) + "\n" )
        fp.close()

    def resolve(self, relpath , base=None):
        """Resolve a relative path wrt the base (the catalog dir by default)"""
        if base == None:
            base = self.dir
        return os.path.join( base , relpath )

    def updates(self, target, tselect=[], fastforward=False ):
        """
        :param target: DBI store (AsciiCat or DB) but usually the target DB to be updated
        :param tselect: restrict comparison to these payload table names when non-empty
        :param fastforward: when True rewrite INSERTDATE of new validity rows to UTC now and write the Vld csv

        Return tablename (payload only) keyed dict listing new SEQNO
        and prepares the ascii catalog for loading into DB, by fastforwarding INSERTDATE to UTC now.

        NOTE(review): mutable default argument ``tselect=[]`` — harmless here
        as it is never mutated, but fragile; also the ``.keys()`` equality
        assert below is order-sensitive — presumably both dicts are built from
        the same table set so orders coincide; verify.
        """
        log.debug("updates tselect %r " % tselect )
        upls = self.seqno_updates(target, tselect=tselect)
        upas = self.allseqno_updates(target, tselect=tselect )
        assert upls.keys() == upas.keys(), ("updates keys mismatch ", upls.keys(), upas.keys() )

        # checking correspondence between LASTUSEDSEQNO and the last of the lists of allseqno
        for tn in upas:
            assert upls[tn] == upas[tn][-1] , ("seqno vs allseqno mismatch %s" % tn, upls[tn], upas[tn][-1] )
            #print tn, upls[tn], upas[tn][-1] , upas[tn][0]

        if fastforward:
            now = datetime.utcnow()
            now = now.strftime("%Y-%m-%d %H:%M:%S")
            for tn in upas:
                n = 0
                vld = self[tn+'Vld']
                for sq in upas[tn]:
                    n += 1
                    vld[sq]['INSERTDATE'] = now
                if n > 0:
                    log.info("fastforward %s validity rows of %s to %s " % (n,tn,now) )
                    vld.write()
                else:
                    pass
        return upas

    def seqno_updates(self, target , tselect=[] ):
        """
        :param target: DBI store (AsciiCat or DB) but usually the target DB to be updated
        :param tselect: restrict comparisons to payload tables in tselect, if tselect list is populated

        This operates by comparing LASTUSEDSEQNO entries in the LOCALSEQNO tables, via
        the ``.seqno``

        Provides dict keyed by table names of LASTUSEDSEQNO values (payload names only) for
        new or changed LASTUSEDSEQNO entries in the ascii catalog.

        For example this AsciiCat might contain::

            {'Noodles': 15, 'CalibPmtSpec': 29, 'FeeCableMap': 3, 'HardwareID': 172, 'CalibFeeSpec': 113, 'CableMap': 213}

        Whereas the target DB contains::

            {'Noodles': 10, 'CalibPmtSpec': 29, 'FeeCableMap': 3, 'CalibFeeSpec': 113}

        This would return::

            {'Noodles':15 ,'HardwareID': 172 ,'CableMap':213}

        Note the use of SEQNO 0 to mark not-present

        This way of checking for updates works fine without the payload tables in memory
        as it is just using the LASTUSEDSEQNO counts hailing from LOCALSEQNO.csv
        BUT make sure are rcmpcat comparing against a propa full DB not the decoupled subset of tables
        that tmp_offline_db often is.
        """
        upls = {}

        ttabs = filter(lambda t:len(tselect) == 0 or t in tselect, target.seqno )
        log.debug("target tables %r " % ttabs )
        missing = list(set(ttabs).difference(set(self.seqno)))
        assert len(missing) == 0, "tables in target that are not in ascii catalog %r \n try restricting table selection with -t option " % missing

        log.debug("seqno_updates self.seqno %r " % self.seqno )
        log.debug("seqno_updates target.seqno %r " % target.seqno )

        alldbi = list(set(self.seqno).union(set(target.seqno)))
        for tn in alldbi:
            if len(tselect) > 0 and tn not in tselect:
                continue
            acls = self.seqno.get(tn,0) ## ascii catalog LASTUSEDSEQNO for table
            tgls = target.seqno.get(tn,0) ## LASTUSEDSEQNO in target db
            #log.info("seqno_updates acls %s tgls %s " % (acls,tgls) )
            assert acls >= tgls, ("ERROR LASTUSEDSEQNO in target exceeds that in ascii cat %s " % tn, acls, tgls)
            if acls > tgls:
                upls[tn] = acls
            else:
                pass
        log.info("seqno_updates : ascii catalog LASTUSEDSEQNO changes relative to target : %r " % upls )
        return upls

    def allseqno_updates(self, target, tselect=[] ):
        """
        :param target: DBI store (AsciiCat or DB) but usually the target DB to be updated
        :param tselect: restrict comparisons to payload tables in tselect, if populated

        This operates by comparing all SEQNO in the tables

        """
        upas = {}
        alldbi = list(set(self.allseqno).union(set(target.allseqno)))
        for tn in alldbi:
            if len(tselect) > 0 and tn not in tselect:
                continue
            acsq = set(self.allseqno.get(tn,[]))
            tgsq = set(target.allseqno.get(tn,[]))
            #log.info("allseqno_updates tn %s acsq %s tgsq %s " % (tn,acsq,tgsq))
            assert len(tgsq.difference(acsq)) == 0 , "ERROR SEQNO in db that are not present in ascii catalog table %s " % tn
            upsq = sorted(list(acsq.difference(tgsq))) ## SEQNO in ascii catalog but not target DB
            if len(upsq) > 0:
                upas[tn] = upsq
            else:
                pass
        for tn,seq in upas.items():
            # kosher DBI writes yield gap-free SEQNO ranges
            contiguous = seq[-1] - seq[0] + 1 == len(seq)
            if not contiguous:
                msg = "non contiguous SEQNO in tn %s %s %s %s " % ( tn, seq[0], seq[-1], len(seq) )
                if tn in ('CalibPmtSpec',): ## WHY NOT NORMALLY TICKLED ?
                    log.info(msg)
                else:
                    log.fatal(msg)
                    assert contiguous, msg

        #log.debug("allseqno_updates %r " % upas )
        return upas

    def diff(self, otherdir):
        """Filesystem-level recursive diff of this catalog dir against another dir (or AsciiCat), logged line by line."""
        if type(otherdir) == AsciiCat:
            otherdir = otherdir.dir
        cmd = "diff -r --brief %s %s | grep -v .svn " % ( self.dir, otherdir )
        log.info("diffing with \"%s\"" % cmd )
        for line in os.popen(cmd).readlines():
            log.info("diff %s" % line.strip())
00811 log.info(msg) 00812 else: 00813 log.fatal(msg) 00814 assert contiguous, msg 00815 00816 #log.debug("allseqno_updates %r " % upas ) 00817 return upas 00818 00819 00820 def diff(self, otherdir): 00821 if type(otherdir) == AsciiCat: 00822 otherdir = otherdir.dir 00823 cmd = "diff -r --brief %s %s | grep -v .svn " % ( self.dir, otherdir ) 00824 log.info("diffing with \"%s\"" % cmd ) 00825 for line in os.popen(cmd).readlines(): 00826 log.info("diff %s" % line.strip()) 00827 00828 00829 class DD(dict): 00830 """ 00831 dict comparison 00832 """ 00833 def __init__(self, a_, b_ , **kwa ): 00834 dict.__init__(self, **kwa ) 00835 a, b = set(a_), set(b_) 00836 c = a.intersection(b) 00837 self['added'] = b - c 00838 self['removed'] = a - c 00839 self['changed'] = set(o for o in c if a_[o] != b_[o]) 00840 self['unchanged'] = set(o for o in c if a_[o] == b_[o]) 00841 ## specialized 00842 if kwa.get('increments'): 00843 self['allincrements'] = dict( (o,b_[o]-a_[o]) for o in c ) 00844 self['increments'] = dict( (o,b_[o]-a_[o]) for o in c if a_[o] != b_[o] ) 00845 00846 def __str__(self): 00847 return pformat(self) 00848 00849 00850 00851 def validate_csv_update( tn , old_ , new_ ): 00852 """ 00853 Standalone comparison of parsed .csv 00854 00855 :param tn: table name 00856 :param old_: path to starting csv 00857 :param new_: path to changed csv 00858 00859 Hmm when operating decoupled the new LOCALSEQNO will 00860 00861 """ 00862 if tn in ('LOCALSEQNO','GLOBALSEQNO',): 00863 old = AsciiCSV(None, old_) 00864 old.read() 00865 log.info( "oldcsv %r %r " % ( old, dict(old) ) ) 00866 new = AsciiCSV(None, new_) 00867 new.read() 00868 log.info( "newcsv %r %r " % ( new, dict(new) ) ) 00869 pass 00870 assert len(new) >= len(old) , "LOCALSEQNO cannot shrink " 00871 00872 00873 00874 def test_diddle(): 00875 cat = AsciiCat("~/dybaux/catalog/tmp_offline_db") 00876 diddle = False 00877 if diddle: 00878 now = datetime.utcnow() 00879 for pk,row in cat['CableMapVld'].items(): 00880 
row['INSERTDATE'] = now.strftime("%Y-%m-%d %H:%M:%S") 00881 #print cat.allseqno 00882 print cat.fabseqno 00883 #cat.write( "/tmp/demo" ) 00884 #cat.diff( "/tmp/demo" ) 00885 00886 00887 def test_merge(): 00888 00889 old_ = """TABLENAME char(64),LASTUSEDSEQNO int(11),PRIMARY KEY (TABLENAME) 00890 "*",0 00891 "CableMap",440 00892 "CalibFeeSpec",113 00893 "CalibPmtSpec",29 00894 "FeeCableMap",3 00895 "HardwareID",358 00896 """ 00897 00898 new_ = """TABLENAME char(64),LASTUSEDSEQNO int(11),PRIMARY KEY (TABLENAME) 00899 "*",0 00900 "CableMap",1440 00901 "HardwareID",1358 00902 "Lettuce",100 00903 "Tomato",200 00904 "CalibFeeZiggy",300 00905 """ 00906 00907 old = AsciiCSV( None, None )( old_ ) 00908 print "old", old.lastusedseqno 00909 print "old content " + "*" * 100 00910 print old.content() 00911 00912 new = AsciiCSV( None, None )( new_ ) 00913 print "new", new.lastusedseqno 00914 print "new content " + "*" * 100 00915 print new.content() 00916 00917 mrga = old.merged( new ) 00918 print "mrga ", mrga.lastusedseqno 00919 print "mrga content " + "*" * 100 00920 print mrga.content() 00921 00922 mrgb = new.merged( old ) 00923 print "mrgb ", mrgb.lastusedseqno 00924 print "mrgb content " + "*" * 100 00925 print mrgb.content() 00926 00927 ## merge direction matters ... 
in the CableMap and HardwareID values 00928 00929 om = DD( old.lastusedseqno, mrga.lastusedseqno , name="om", increments=True ) 00930 print om 00931 00932 00933 00934 def test_fake_write(): 00935 cat = AsciiCat("~/dybaux/catalog/tmp_offline_db") 00936 cat.fake_write('HardwareID', 10 ) 00937 cat.write() 00938 00939 00940 def test_compare_csv(): 00941 old = AsciiCSV( None, "~/dybaux/catalog/tmp_offline_db/CalibPmtSpec/CalibPmtSpec.csv.old")() 00942 new = AsciiCSV( None, "~/dybaux/catalog/tmp_offline_db/CalibPmtSpec/CalibPmtSpec.csv.new")() 00943 assert old == new ## True 00944 ## old.write() standard-orderized 00945 00946 00947 def test_update_relative_to_target(): 00948 from DybPython import DB 00949 target = DB("tmp_offline_db") 00950 cat = AsciiCat("~/dbicopy/tmp_offline_db", skip_pay_check=True, allow_partial=True) 00951 tselect = [tn] 00952 00953 log.info(" cat.seqno=%r" % cat.seqno) 00954 log.info("target.seqno=%r" % target.seqno) 00955 log.info(" cat.allseqno=%r" % cat.allseqno) 00956 #log.info("target.allseqno=%r" % target.allseqno) 00957 00958 updates = cat.updates( target , tselect=tselect , fastforward=False ) 00959 print "asciicat updates relative to target %s " % updates 00960 00961 00962 00963 00964 catd = "~/dbicopy/tmp_offline_db" 00965 tmpd = "/tmp/tdbicopy" 00966 00967 def test_copy_cat(): 00968 """ 00969 #. scrubs any pre-existing tmpd 00970 #. copies asciicat into tmpd 00971 #. 
checks duplication 00972 """ 00973 import shutil 00974 cat = AsciiCat(catd, skip_pay_check=True, allow_partial=True) 00975 if os.path.isdir(tmpd): 00976 log.info("removing %s " % tmpd ) 00977 shutil.rmtree(tmpd) 00978 pass 00979 tn = 'PhysAd' # conveniently small table 00980 cat[tn]() # reads in payload CSV 00981 cat.write(tmpd) # info file system Ascii Catalog 00982 00983 tmp = AsciiCat(tmpd, skip_pay_check=True, allow_partial=True) 00984 tmp[tn]() # reads in payload CSV 00985 assert cat == tmp 00986 00987 def test_fake_write(): 00988 """ 00989 Requires test_copy_cat run before to create sacrificial AsciiCat to diddle with 00990 00991 #. fake write in memory 00992 #. inplace write in filesystem tmpd 00993 """ 00994 pass 00995 tmp = AsciiCat(tmpd, skip_pay_check=True, allow_partial=True) 00996 tn = 'PhysAd' 00997 tmp[tn]() # reads in payload CSV 00998 tmp.fake_write( tn , npay=1 ) 00999 tmp.write() 01000 for tn,csv in tmp.items(): 01001 log.info("csv %s %s " % ( tn, csv )) 01002 for pk,v in csv.items(): 01003 log.info("pk %s v %s " % ( pk, v )) 01004 01005 def test_fake_updates(): 01006 """ 01007 #. for updates call the target slot (argument 1) `cat` is required to be at/behind base `tmp` slot (argument 0) 01008 """ 01009 cat = AsciiCat(catd, skip_pay_check=True, allow_partial=True) 01010 tmp = AsciiCat(tmpd, skip_pay_check=True, allow_partial=True) 01011 tn = 'PhysAd' 01012 updates = tmp.updates( cat , tselect=[tn] , fastforward=False ) 01013 print updates 01014 01015 def test_dump_cat(): 01016 cat = AsciiCat(basd, skip_pay_check=True, allow_partial=True) 01017 log.info("dumping cat %r " % cat ) 01018 for tn,csv in cat.items(): 01019 log.info("csv %s %s " % ( tn, csv )) 01020 for pk,v in csv.items(): 01021 log.info("pk %s v %s " % ( pk, v )) 01022 01023 01024 def test_partial_rloadcat(): 01025 """ 01026 TODO: 01027 01028 #. make re-testable : by controlling DB status (use fixture control?), 01029 #. 
currently does nothing after 1st usage as detects nothing to do 01030 """ 01031 tn = 'PhysAd' 01032 _tmpd = tmpd 01033 cmd = "db.py --noconfirm --ALLOW_PARTIAL -t %(tn)s tmp_offline_db rloadcat %(_tmpd)s " % locals() 01034 print cmd 01035 01036 01037 if __name__ == '__main__': 01038 01039 logging.basicConfig(level=logging.INFO) 01040 #test_dump_cat() 01041 test_copy_cat() 01042 test_fake_write() 01043 test_fake_updates() 01044 test_partial_rloadcat() 01045 01046