import os, logging, stat, pickle, datetime
from pprint import pformat
from multiprocessing import Process
from DybPython import DB, envvar

log = logging.getLogger(__name__)

__all__ = ('Scan', 'perdict')

# mtime of path as a naive datetime, in localtime / UTC respectively.
# UTC is used for cache-validity comparison against DBI INSERTDATE stamps.
loc_mtime = lambda path: datetime.datetime.fromtimestamp(os.stat(path)[stat.ST_MTIME])
utc_mtime = lambda path: datetime.datetime.utcfromtimestamp(os.stat(path)[stat.ST_MTIME])


def dbifullscan(vs):
    """
    Perform a fullscan for DBI tables in a single DB, saving the per-table
    scans at locations given by `vs`. This func is performed in subprocesses
    (see `Scan.make_scan`) for full DBI stack control.

    Coordinates unpacked from `vs.coords_()`:

    :param dbconf: config section
    :param tab: table name or None to traverse all payload tables
    :param ctx: context dict passed to fullscan or None to traverse all contexts
    :param opts: options dict passed through to fullscan
    """
    from DybDbi import gDbi
    gDbi.level = 'WARNING'

    dbconf, tab, ctx, opts = vs.coords_()

    with envvar(DBCONF=dbconf):
        db = DB()
        ptabs = [tab] if tab else db.showpaytables
        for tn in ptabs:
            ta = db.tab(tn)
            insertdate = ta.vlast('INSERTDATE')
            if not insertdate:
                log.error("skipping empty table %s " % tn)
                continue
            ctxs = [ctx] if ctx else ta.actual_ctxs()
            for _ctx in ctxs:
                v = vs.spawn(ctx=_ctx)
                path = v.path

                # Cache is stale when: no cached file yet, the DB holds
                # INSERTDATEs newer than the cache file mtime (both in UTC),
                # or a rescan is explicitly forced.
                stamp = utc_mtime(path) if os.path.exists(path) else None
                if stamp is None or insertdate > stamp or vs.rescan:
                    log.info("cache miss : fullscan %s %s to %s %s " % (tn, _ctx, path, opts))
                    fs = ta.fullscan(_ctx, opts)
                    fss = Scan(fs)
                    fss.save(path)
                    if vs.txtscan:
                        # NOTE(review): suffix is 'txt' with no dot separator,
                        # kept as-is for cache-path compatibility -- confirm intended
                        fss.txtsave(path + 'txt')
                else:
                    log.info("cached hit : fullscan %s %s %s still valid " % (tn, _ctx, path))


class perdict(dict):
    """
    Dict persistable to file, either via pickle (`save`/`load`) or as a
    `pprint.pformat` text rendering (`txtsave`/`txtload`).
    """

    def save(self, path):
        """Pickle this dict to `path`."""
        with open(path, "wb") as fp:
            pickle.dump(dict(self), fp)

    def load(cls, path):
        """Return a new instance unpickled from `path`."""
        with open(path, "rb") as fp:
            obj = pickle.load(fp)
        return cls(obj)
    load = classmethod(load)

    def txtsave(self, path):
        """Write a `pformat` text rendering of this dict to `path`."""
        with open(path, "w") as fp:
            fp.write(pformat(dict(self)))

    def txtload(cls, path):
        """
        Return a new instance evaluated from the `pformat` text at `path`.

        WARNING: uses `eval` on file content -- only load trusted files.
        Result is sensitive to the form of datetime import used at save time.
        """
        # BUGFIX: was open(path, "w") which truncated the file before reading
        with open(path, "r") as fp:
            obj = eval(fp.read())  ## sensitive to form of datetime import
        return cls(obj)
    txtload = classmethod(txtload)


class Scan(perdict):

    def compare(cls, vs):
        """
        Create, save and return a comparison scan between the two variations
        specified by the `vs` spec (varying either 'vary' or 'dbconf').

        :param vs: VlutSpec instance with a 2-element list at `vs[vs.compare]`
        """
        path = vs.path
        log.info("Scan.compare vs %s creating %s " % (vs, path))

        cf = vs.compare

        assert cf in ('vary', 'dbconf',)
        assert len(vs[cf]) == 2

        xv = vs.varycopy(0)
        x = xv.make_scan()

        yv = vs.varycopy(1)
        y = yv.make_scan()

        sc = cls.compare_(x, y)
        log.info("Scan.compare saving comparison scan %s to %s " % (sc['_stat'], path))
        sc.save(path)
        return sc

    compare = classmethod(compare)

    def minmax(cls, d, pfx, val):
        """Track running min/max of `val` in `d` under keys `pfx`+'min'/'max'."""
        mx, mi = pfx + 'max', pfx + 'min'
        if mx not in d or val > d[mx]:
            d[mx] = val
        if mi not in d or val < d[mi]:
            d[mi] = val
    minmax = classmethod(minmax)

    def compare_(cls, x, y, diffkeys=['dig', 't', 'n', ], i2vfn=lambda _: _['t']):
        """
        Compare two dict-of-list-of-dict, checking same shape for the
        non leaf parts. The leaf dict is compared kv by kv and differences
        result in the differing values being incorporated into a tuple of
        differers in the newly created dict-of-list-of-dict.

        :param x: Scan instance
        :param y: Scan instance of the same shape
        :param diffkeys: list of keys to regard when detecting dict differences,
            default ignores collision and ncollision differences
        :param i2vfn: leaf-dict to index-value function, used to verify x/y alignment
        :return: comparison Scan with an added '_stat' entry (ndif count, minmax ranges)

        Changes:

        #. moved from diffing on SEQNO with `rvs` to digest diffing with `dig`
           allowing comparison of non-SEQNO aligned
        """
        assert isinstance(x, cls) and isinstance(y, cls)
        xk = sorted(x.keys())
        yk = sorted(y.keys())

        if xk != yk:
            log.info("xk-yk: %s " % set(xk).difference(set(yk)))
            log.info("yk-xk: %s " % set(yk).difference(set(xk)))
            for ix, xkk in enumerate(xk):
                log.info("x %s %s" % (ix, xkk))
            for iy, ykk in enumerate(yk):
                log.info("y %s %s" % (iy, ykk))

        assert xk == yk, (xk, yk)
        ndif = 0
        cf = {}

        mm = dict(dif={}, all={})
        for k in xk:  ## insertdate
            lxk = len(x[k])
            lyk = len(y[k])

            if lxk != lyk:
                log.info("x[k] %s" % k)
                for ix, dx in enumerate(x[k]):
                    log.info("%s %s" % (ix, dx))
                log.info("y[k] %s" % k)
                for iy, dy in enumerate(y[k]):
                    log.info("%s %s" % (iy, dy))

            assert lxk == lyk, (lxk, lyk)
            cf[k] = []
            for i in range(lxk):
                dx = x[k][i]
                dy = y[k][i]

                ivx = i2vfn(dx)
                ivy = i2vfn(dy)
                assert ivx == ivy, ("differing index to time conversion for x and y ", ivx, ivy)

                cls.minmax(mm['all'], 'i', ivx)
                cls.minmax(mm['all'], 'k', k)

                dxk = sorted(dx.keys())
                assert dxk == sorted(dy.keys())
                ddif = 0
                dcf = {}
                for kk in dxk:
                    if dx[kk] != dy[kk]:
                        # differing values are recorded as a (x,y) tuple
                        dcf[kk] = (dx[kk], dy[kk])
                        if len(diffkeys) == 0 or kk in diffkeys:
                            ddif += 1
                            cls.minmax(mm['dif'], 'i', ivx)
                            cls.minmax(mm['dif'], 'k', k)
                            log.debug("count difference in %s %s %s " % (kk, dcf[kk], diffkeys))
                        else:
                            log.debug("ignore difference in %s %s %s " % (kk, dcf[kk], diffkeys))
                    else:
                        dcf[kk] = dx[kk]
                        log.debug("equality in %s %s %s " % (kk, dx[kk], dy[kk]))
                if ddif > 0:
                    ndif += 1
                    log.warning("dcf %s" % dcf)

                cf[k].append(dcf)
        sc = cls(cf)
        sc['_stat'] = dict(ndif=ndif, minmax=mm)
        log.warning("Scan.compare_ _stat %s " % sc['_stat'])
        return sc
    compare_ = classmethod(compare_)

    def get_or_create(cls, _vs):
        """
        Handles both leaf and comparison VlutSpec.
        Action is effected by options `_recompare` and `_rescan` and whether
        a prior scan exists.

        :param _vs: VlutSpec instance
        :return: Scan instance, loaded from cache or freshly created
        """
        vs = _vs.copy(category="cache")
        path = vs.path
        exists = os.path.exists(path)
        if vs.compare:
            if not exists or vs.recompare:
                log.info("Scan.get_or_create making comparison scan %s " % path)
                scan = cls.compare(vs)
            else:
                log.info("Scan.get_or_create loading comparison scan %s " % path)
                scan = cls.load(path)
        else:
            if not exists or vs.rescan:
                log.info("Scan.get_or_create making normal scan %s " % path)
                cls.make_scan(vs)
                scan = cls.load(path)
            else:
                log.info("Scan.get_or_create loading normal scan from %s " % path)
                scan = cls.load(path)
        return scan
    get_or_create = classmethod(get_or_create)

    def make_scan(cls, vs):
        """
        Invokes `dbifullscan` for each `dbconf` in a separate process allowing
        the full DBI stack to be booted in each process (working around DBI
        non-resetability). The scans are persisted at locations controlled by
        the `vs` instance.

        :param vs: VlutSpec instance specifying dbconf/tn/ctx/opts
        :raises AssertionError: when a scan subprocess exits non-zero
        """
        assert vs['category'] == 'cache', (vs, vs['category'])
        if type(vs['dbconf']) == list:
            dbconfs = vs['dbconf']
        else:
            dbconfs = [vs['dbconf']]

        for dbconf in dbconfs:
            v = vs.spawn(dbconf=dbconf)
            p = Process(target=dbifullscan, args=(v,))
            p.start()
            p.join()  # blocks to serialize
            assert p.exitcode == 0, "dbiscan: subprocess returned non-zero exitcode %d " % p.exitcode
    make_scan = classmethod(make_scan)


if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO)
    ordering = ""  # "SEQNO desc" "SEQNO asc"
    from vlutspec import VlutSpec
    vs = VlutSpec(dbconf="tmp_offline_db".split(), tn='CableMap', ctx=None, opts=dict(_rescan=True, ordering=ordering))
    scan = Scan.get_or_create(vs)