#!/usr/bin/env python
"""
Tarball creation, examination and extraction for DB backup archives,
with DNA digest sidecars and digest-checked remote transfers.
"""
import os, logging, shutil, tarfile, copy, re, pickle, time, sys
from cStringIO import StringIO
from common import timing, seconds, scp, remote_dna, sidecar_dna, getLogger
from datetime import datetime
from digestpath import dnapath
log = getLogger(__name__)


class Tar(object):
    def __init__(self, path, toplevelname="", mode="gz", remoteprefix="", remotenode="C", confirm=True, moveaside=True, ALLOWCLOBBER=False):
        """
        :param path: to the tarball to be created, extracted or examined
        :param toplevelname: relative to the sourcedir or extractdir

        If a `toplevelname` is specified only members within that directory
        are tarballed or extracted.

        Hmm, embedding the toplevel name within the tarball is not so flexible:
        when one wants to test an extracted mysql DB tarball it would be more
        convenient to just flat-archive the files.
        """
        assert len(toplevelname) > 1, "as a safety measure a non-blank toplevelname is required"
        self.path = path
        self.toplevelname = toplevelname
        self.mode = mode
        if len(remoteprefix) > 0:
            remotepath = os.path.join(remoteprefix, path[1:])   # must drop the leading slash of path for the join to keep the prefix
        else:
            remotepath = path
        pass
        self.remotepath = remotepath
        self.remotenode = remotenode
        self.confirm = confirm
        self.moveaside = moveaside
        self.ALLOWCLOBBER = ALLOWCLOBBER
        self.names = None
        self.members = None
        self.prefix = None
        self.flattop = None

    def __repr__(self):
        return self.__class__.__name__ + " %s %s %s " % (self.path, self.toplevelname, self.mode)
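
    # A worked example of the `remoteprefix` join in `__init__` (prefix and
    # path values here are illustrative, not from the source). The leading
    # slash must be stripped because os.path.join discards every component
    # preceding an absolute one:
    #
    #   >>> os.path.join("/data/mirror", "/var/dbbackup/x.tar.gz")
    #   '/var/dbbackup/x.tar.gz'
    #   >>> os.path.join("/data/mirror", "/var/dbbackup/x.tar.gz"[1:])
    #   '/data/mirror/var/dbbackup/x.tar.gz'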

    def list(self, members, verbose=True):
        """Print a table of contents to sys.stdout. If `verbose` is False, only
        the names of the members are printed. If it is True, an `ls -l`-like
        output is produced.
        """
        old_stdout = sys.stdout
        sys.stdout = mystdout = StringIO()

        for tarinfo in members:
            if verbose:
                print tarfile.filemode(tarinfo.mode),
                print "%s/%s" % (tarinfo.uname or tarinfo.uid,
                                 tarinfo.gname or tarinfo.gid),
                if tarinfo.ischr() or tarinfo.isblk():
                    print "%10s" % ("%d,%d" \
                                    % (tarinfo.devmajor, tarinfo.devminor)),
                else:
                    print "%10d" % tarinfo.size,
                print "%d-%02d-%02d %02d:%02d:%02d" \
                      % time.localtime(tarinfo.mtime)[:6],

            print tarinfo.name,

            if verbose:
                if tarinfo.issym():
                    print "->", tarinfo.linkname,
                if tarinfo.islnk():
                    print "link to", tarinfo.linkname,
            print
            pass
        sys.stdout = old_stdout
        return mystdout.getvalue()

    def members_(self):
        """
        Caches the members list of the tarball into a sidecar `.pc` pickle file
        to avoid a ~70s wait to access the members of a large compressed tarball.

        http://userprimary.net/posts/2007/11/18/ctime-in-unix-means-last-change-time-not-create-time/

        ctime means change time, not create time
        """
        path = self.path
        ctime_ = lambda _:os.path.getctime(_)
        pc = "%s.pc" % path
        members = None
        if os.path.exists(pc):
            if ctime_(pc) > ctime_(path):
                log.debug("load pickled members file %s " % pc)
                members = pickle.load(file(pc, "r"))
            else:
                log.warn("pickled members file exists but is outdated")
            pass
        pass
        if not members:
            tf = tarfile.open(path, "r:gz")
            members = tf.getmembers()
            pickle.dump(members, file(pc, "w"))
            log.info("saving pickled members file: %s " % pc)
            tf.close()
        pass
        return members

    def names_(self):
        members = self.members_()
        self.members = members
        names = map(lambda ti:ti.name, members)
        return names

    def examine(self):
        assert os.path.exists(self.path), "path %s does not exist " % self.path
        log.info("examining %s " % self.path)
        names = self.names_()
        prefix = os.path.commonprefix(names)
        flattop = prefix == ""

        log.info("archive contains %s items with commonprefix \"%s\" flattop %s " % (len(names), prefix, flattop))
        log.debug("\n".join(names))

        self.names = names
        self.prefix = prefix
        self.flattop = flattop
    examine = timing(examine)
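
    # How `examine` detects a flattop archive (names are illustrative, in the
    # style of the `archive` docstring below). os.path.commonprefix is
    # character-wise, and flattop means there is no common prefix at all:
    #
    #   >>> os.path.commonprefix(["tmp_offline_db/A.MYD", "tmp_offline_db/B.frm"])
    #   'tmp_offline_db/'
    #   >>> os.path.commonprefix(["A.MYD", "B.frm"])
    #   ''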

    def digest(self, verify=False):
        """
        Conditions:

        #. if the sidecar does not exist then compute the digest and persist it
        #. if the sidecar exists and `verify=False` then just read the
           persisted dna and return it
        #. if the sidecar exists and `verify=True` read the persisted dna, compute it anew and
           assert they are the same

        DNA verification is an appropriate check after transferring a tarball and
        its sidecar to another node.

        The digest matches that obtained using `md5sum`::

            [blyth@belle7 DybPython]$ md5sum /var/dbbackup/dbsrv/belle7.nuu.edu.tw/channelquality_db_belle7/10000_0.tar.gz
            2e0ec7c27ebc383adb6fa102f52ab6c0  /var/dbbackup/dbsrv/belle7.nuu.edu.tw/channelquality_db_belle7/10000_0.tar.gz

        A simple dictstring format is used to allow easy remote access::

            [blyth@belle7 DybPython]$ ssh H cat /var/dbbackup/dbsrv/belle7.nuu.edu.tw/channelquality_db_belle7/10000_0.tar.gz.dna
            {'dig': 'fa553838e97df686a4df116723ee59f7', 'size': 35360953}[blyth@belle7 DybPython]$

        """
        path = self.path
        assert os.path.exists(path), path
        sidecar = "%s.dna" % path

        # remove stale sidecars
        if os.path.exists(sidecar):
            if os.path.getctime(path) > os.path.getctime(sidecar):
                log.info("sidecar dna exists but is stale %s : removing it " % sidecar)
                os.remove(sidecar)
            else:
                log.info("sidecar dna exists %s and remains valid " % sidecar)
            pass
        pass

        if not os.path.exists(sidecar):
            dna = dnapath(path)
            file(sidecar, "w").write(repr(dna))
        else:
            sdna = file(sidecar, "r").read().strip()
            assert sdna[0] == '{' and sdna[-1] == '}', sdna
            dna = eval(sdna)
            log.debug("loaded dna from sidecar %s " % repr(dna))
            if verify:
                rdna = dnapath(path)
                log.info("rdna recomputed %s " % rdna)
                assert dna == rdna, (dna, rdna)
            pass
        pass
        log.debug("dna digest of the archive is %s " % repr(dna))
        return dna
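
    # The sidecar "dictstring" persisted by `digest` relies on a plain
    # repr/eval round trip; the brace assert above guards the eval. Using the
    # DNA values shown in the digest docstring:
    #
    #   >>> dna = {'dig': 'fa553838e97df686a4df116723ee59f7', 'size': 35360953}
    #   >>> eval(repr(dna)) == dna
    #   True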

    def archive(self, sourcedir, deleteafter=False, flattop=False):
        """
        :param sourcedir: directory containing the `toplevelname` which will be the root of the archive
        :param deleteafter: delete the sourcedir after archiving, but only if its leaf is a dated folder
        :param flattop: when True members are not nested inside the `toplevelname` directory

        Create the archive and examine::

            t = Tar("/var/dbbackup/mysqlhotcopy/belle7.nuu.edu.tw/tmp_offline_db/20130515_1941.tar.gz", toplevelname="tmp_offline_db")
            t.archive("/var/dbbackup/mysqlhotcopy/belle7.nuu.edu.tw/tmp_offline_db/20130515_1941")
            t.examine()

        Examine what is in the archive::

            t = Tar("/var/dbbackup/mysqlhotcopy/belle7.nuu.edu.tw/tmp_offline_db/20130515_1941.tar.gz", toplevelname="tmp_offline_db")
            t.examine()

        Under toplevelname `tmp_offline_db`, within the archive when `flattop=False`::

            tmp_offline_db/
            tmp_offline_db/SupernovaTrigger.MYD
            tmp_offline_db/CalibPmtFineGainVld.frm
            tmp_offline_db/HardwareID.MYD
            ...

        Under toplevelname `tmp_offline_db`, within the archive when `flattop=True`::

            SupernovaTrigger.MYD
            CalibPmtFineGainVld.frm
            HardwareID.MYD
            ...

        To reproduce the layout on another node would then need::

            t = Tar("/var/dbbackup/mysqlhotcopy/belle7.nuu.edu.tw/tmp_offline_db/20130515_1941.tar.gz", toplevelname="tmp_offline_db")
            t.extract("/tmp")    # creates /tmp/tmp_offline_db

        """
        src = os.path.join(sourcedir, self.toplevelname)
        assert len(self.toplevelname) > 3, "sanity check for toplevelname %s fails" % self.toplevelname
        log.info("creating %s from %s " % (self.path, src))
        assert os.path.exists(src) and os.path.isdir(src), "src directory %s does not exist " % src
        tgz = tarfile.open(self.path, "w:%s" % self.mode)
        if flattop:
            arcname = ""
        else:
            arcname = self.toplevelname
        pass
        tgz.add(src, arcname=arcname)
        tgz.close()

        datedfolder_ptn = re.compile(r"^\d{8}_\d{4}$")   # eg 20130515_1941
        if deleteafter:
            leaf = sourcedir.split("/")[-1]
            if not datedfolder_ptn.match(leaf):
                log.warn("NOT deleting sourcedir %s with leaf %s as the leaf is not a dated folder " % (sourcedir, leaf))
            else:
                log.info("deleting sourcedir %s with leaf %s as the leaf is a dated folder " % (sourcedir, leaf))
                if self.confirm:
                    confirm = raw_input("enter \"YES\" to confirm deletion of sourcedir %s :" % sourcedir)
                else:
                    confirm = "YES"
                pass
                if confirm == "YES":
                    shutil.rmtree(sourcedir)
                else:
                    log.info("skipping deletion of %s " % sourcedir)
                pass
        else:
            log.warn("not deleting after")
    archive = timing(archive)
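
    # The deletion guard in `archive` only ever removes dated folders; a
    # sketch of the pattern's behaviour using names from the docstring above:
    #
    #   >>> bool(re.match(r"^\d{8}_\d{4}$", "20130515_1941"))
    #   True
    #   >>> bool(re.match(r"^\d{8}_\d{4}$", "tmp_offline_db"))
    #   False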

    def moveaside_(self, target, dryrun=False):
        """
        Renames a pre-existing target dir out of the way using a timestamp suffix.
        The trailing underscore avoids a name collision with the boolean
        `self.moveaside` attribute set in `__init__`, which would otherwise
        shadow this method.
        """
        assert os.path.exists(target), target
        stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        aside = target + "_" + stamp
        msg = "moving aside pre-existing tgt dir %s to %s " % (target, aside)
        assert not os.path.exists(aside), (aside, "huh the aside dir exists already ")
        if dryrun:
            log.info("dryrun : " + msg)
            return
        pass
        log.info(msg)
        os.rename(target, aside)

    def _folder_extract(self, containerdir, toplevelname, dryrun=False):
        """
        :param containerdir: folder within which to extract
        :param toplevelname: top level directory within the archive
        :param dryrun: when True only log what would be done

        Folder extraction takes all paths from the archive that are within
        a particular toplevelname within the archive and places them
        within the `containerdir/` folder. By virtue of the `toplevelname`
        paths within the archive the result will be a
        `containerdir/toplevelname` folder in the filesystem.

        This approach has the advantage of a non-exploding tarball, but is
        inconvenient for renaming.

        The `toplevelname` dir will be created by the extraction.
        """
        assert self.flattop is False, "_folder_extract requires non-flattop archive "
        assert self.toplevelname == toplevelname, "_folder_extract requires default toplevelname %s %s " % (self.toplevelname, toplevelname)
        tf = tarfile.open(self.path, "r:gz")
        wtf = TarFileWrapper(tf)
        members = tf.getmembers()

        names = map(lambda tinfo:tinfo.name, members)
        log.info("_folder_extract names %s " % repr(names))

        #select_ = lambda tinfo:tinfo.name.split('/')[0] == toplevelname   # assumption of one-level toplevelname incorrect for partitioned archives
        select_ = lambda tinfo:tinfo.name.startswith(toplevelname)
        select = filter(select_, members)

        assert len(members) == len(select), (len(members), len(select), "extraction filtering misses some members, toplevelname %s " % toplevelname)
        target = os.path.join(containerdir, toplevelname)

        if os.path.exists(target) and self.moveaside:
            self.moveaside_(target, dryrun=dryrun)
        pass

        msg = "_folder_extract into containerdir %s for %s members with toplevelname %s " % (containerdir, len(members), toplevelname)
        if dryrun:
            log.info("dryrun: " + msg)
        else:
            log.info(msg)
            assert not os.path.exists(target), "target dir %s exists already, ABORTING EXTRACTION, use --moveaside option to rename it " % target
            wtf.extractall(containerdir, members)
        pass
        tf.close()

    def _check_clobber(self, target, members):
        """
        :param target: directory in which the members are to be extracted
        :param members: from the tarfile
        :return: list of paths that would be clobbered by the extraction
        """
        clobber = []
        fmt = "%-110s : %s "
        for member in members:
            name = member.name
            path = os.path.join(target, name)
            if os.path.exists(path):
                if name == './':
                    log.warn(fmt % (name, "SKIP TOPDIR"))
                else:
                    clobber.append(name)
                    log.warn(fmt % (name, "**CLOBBER**"))
            else:
                log.debug(fmt % (name, ""))
            pass
        pass
        return clobber

    def _flat_extract(self, containerdir, toplevelname, dryrun=False):
        """
        :param containerdir: folder within which to extract
        :param toplevelname: name of the target folder to extract into
        :param dryrun: when True only log what would be done

        Flat extraction takes all paths from the archive and places
        them within the `containerdir/toplevelname` folder.

        The `toplevelname` dir must be created before the extraction.
        """
        assert self.flattop is True, "_flat_extract requires flattop archive "
        log.info("_flat_extract opening tarfile %s " % self.path)
        tf = tarfile.open(self.path, "r:gz")
        wtf = TarFileWrapper(tf)
        members = tf.getmembers()
        target = os.path.join(containerdir, toplevelname)

        clobber = self._check_clobber(target, members)
        if len(clobber) > 0:
            if not self.ALLOWCLOBBER:
                log.warn("extraction would clobber %s existing paths, need `--ALLOWCLOBBER` option to do this : %s " % (len(clobber), "\n".join(clobber)))
            else:
                log.warn("proceeding to clobber %s existing paths courtesy of `--ALLOWCLOBBER` option : %s " % (len(clobber), "\n".join(clobber)))
        else:
            log.info("extraction into target %s does not clobber any existing paths " % target)

        msg = "_flat_extract into target %s for %s members with toplevelname %s " % (target, len(members), toplevelname)
        if dryrun:
            log.info("dryrun: " + msg)
        else:
            log.info(msg)
            if not self.ALLOWCLOBBER:
                assert not os.path.exists(target), "target dir %s exists already, ABORTING EXTRACTION use --rename newname " % target
            pass
            wtf.extractall(target, members)
            log.info(os.popen("ls -l %(target)s " % locals()).read())
        pass
        tf.close()

    def extract(self, containerdir, toplevelname=None, dryrun=False):
        """
        :param containerdir: folder within which the toplevelname dir resides
        :param toplevelname: default of None corresponds to the original db name
        :param dryrun: when True only log what would be done

        The actual extraction method depends on the type of archive detected:

        #. `_flat_extract` for a flattop aka exploding archive
        #. `_folder_extract` for a folder-top archive

        Flat extraction has the advantage of easy renaming.
        """
        if toplevelname is None:
            toplevelname = self.toplevelname
        pass
        log.info("extract containerdir %s toplevelname %s dryrun %s " % (containerdir, toplevelname, dryrun))

        assert os.path.exists(self.path), "path %s does not exist " % self.path
        assert os.path.exists(containerdir), "containerdir %s does not exist" % containerdir
        assert not self.flattop is None, "ABORT must `examine` before can `extract` "

        if self.flattop:
            self._flat_extract(containerdir, toplevelname, dryrun=dryrun)
        else:
            self._folder_extract(containerdir, toplevelname, dryrun=dryrun)
    extract = timing(extract)

    def transfer(self):
        """
        Prior to making expensive transfers of large tarballs the
        small local and remote DNA sidecar files are compared to see if the
        transfer can be skipped.

        The use of `cheat=True` for the remote transfer means that the
        contents of the sidecar file are trusted to be true. Using `cheat=False`
        means the digest is recomputed to obtain a fresh DNA, which can be
        time-consuming for large files.

        A good approach is to monitor the transfers on the receiving node by
        checking that recomputed DNA matches the sidecars.
        """
        assert os.path.exists(self.path), "path %s does not exist " % self.path
        assert os.path.exists(self.path + '.dna'), "dna path %s does not exist " % (self.path + '.dna')
        ldna = sidecar_dna(self.path)
        assert not ldna is None
        rdna = remote_dna(self.remotepath, self.remotenode, cheat=True)   # when cheating assume the remote dna is valid
        log.debug("remote dna from %s:%s is %s " % (self.remotenode, self.remotepath, repr(rdna)))
        if rdna == ldna:
            log.info("SKIP TRANSFER, remote dna of %s:%s matches local %s " % (self.remotenode, self.remotepath, ldna))
        else:
            log.info("remote dna %s differs from local %s for %s:%s proceed to transfer" % (rdna, ldna, self.remotenode, self.remotepath))
            scp(self.path, self.remotepath, self.remotenode)
            scp(self.path, self.remotepath, self.remotenode, sidecar_ext='.dna')
        pass
    transfer = timing(transfer)
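
# `remote_dna` and `sidecar_dna` are imported from `common`, which is not
# shown here. The sketch below illustrates what `remote_dna` plausibly does
# on the cheat=True path, assuming the node is an ssh alias as in the
# `digest` docstring; it is not the actual implementation.

def remote_dna_sketch(remotepath, remotenode, cheat=True):
    """Read the remote .dna sidecar over ssh and eval the dictstring."""
    assert cheat, "this sketch covers only the cheat=True path; cheat=False would recompute the digest remotely"
    sdna = os.popen("ssh %s cat %s.dna" % (remotenode, remotepath)).read().strip()
    if not (sdna.startswith('{') and sdna.endswith('}')):
        return None                 # no valid sidecar on the remote node
    return eval(sdna)               # guarded dictstring, as in Tar.digest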

class TarFileWrapper(object):
    """
    extractall only appears in 2.7, so it is back-ported from there into
    this wrapper for use from 2.3, 2.4, 2.5 and 2.6.
    """
    def __init__(self, tf):
        self.tf = tf

    def extractall(self, path=".", members=None):
        """Extract all members from the archive to the current working
        directory and set owner, modification time and permissions on
        directories afterwards. `path` specifies a different directory
        to extract to. `members` is optional and must be a subset of the
        list returned by getmembers().
        """
        directories = []

        if members is None:
            members = self.tf

        for tarinfo in members:
            if tarinfo.isdir():
                # Extract directories with a safe mode.
                directories.append(tarinfo)
                tarinfo = copy.copy(tarinfo)
                tarinfo.mode = 0700
            self.tf.extract(tarinfo, path)

        # Reverse sort directories.
        directories.sort(lambda a, b: cmp(a.name, b.name))
        directories.reverse()

        # Set correct owner, mtime and filemode on directories.
        for tarinfo in directories:
            dirpath = os.path.join(path, tarinfo.name)
            try:
                self.tf.chown(tarinfo, dirpath)
                self.tf.utime(tarinfo, dirpath)
                self.tf.chmod(tarinfo, dirpath)
            except tarfile.ExtractError, e:
                if self.tf.errorlevel > 1:
                    raise
                else:
                    self.tf._dbg(1, "tarfile: %s" % e)


if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO)

    tgz = "/var/dbbackup/mysqlhotcopy/belle7.nuu.edu.tw/tmp_offline_db/20130520_1353.tar.gz"
    t = Tar(tgz, toplevelname="tmp_offline_db")
    t.examine()
    #t.extract("/tmp/out")
    #t.examine()

    log.info(seconds)
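

# An end-to-end usage sketch assembled from the method docstrings above;
# the paths and the "H" node alias are illustrative, and the function is
# never called here.

def _workflow_sketch():
    """
    archive/digest/transfer run on the source node, examine/extract on the
    receiving node after the tarball and sidecar have arrived.
    """
    tgz = "/var/dbbackup/mysqlhotcopy/belle7.nuu.edu.tw/tmp_offline_db/20130515_1941.tar.gz"
    t = Tar(tgz, toplevelname="tmp_offline_db", remotenode="H")
    t.archive("/var/dbbackup/mysqlhotcopy/belle7.nuu.edu.tw/tmp_offline_db/20130515_1941")
    t.digest()           # compute and persist the .dna sidecar if absent
    t.transfer()         # scp tarball + sidecar, skipped when remote DNA matches
    # then on the receiving node:
    t.examine()          # sets self.flattop, required before extract
    t.extract("/tmp")    # creates /tmp/tmp_offline_db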