/search.css" rel="stylesheet" type="text/css"/> /search.js">
| Classes | Job Modules | Data Objects | Services | Algorithms | Tools | Packages | Directories | Tracs |

In This Package:

tar.py
Go to the documentation of this file.
#!/usr/bin/env python
"""
Tar archive creation, examination, extraction and transfer, with md5 digest
(DNA) sidecar files used to verify archives and to skip redundant transfers.
"""
import os, logging, shutil, tarfile, copy, re, pickle, time, sys
from cStringIO import StringIO
from common import timing, seconds, scp, remote_dna, sidecar_dna, getLogger
from datetime import datetime
from digestpath import dnapath
log = getLogger(__name__)


class Tar(object):
    def __init__(self, path, toplevelname="", mode="gz", remoteprefix="", remotenode="C", confirm=True, moveaside=True, ALLOWCLOBBER=False):
        """
        :param path: path to the tarball to be created, extracted or examined
        :param toplevelname: directory name relative to the sourcedir or extractdir

        If a `toplevelname` is specified only members within that directory
        are archived or extracted.

        Note that embedding the toplevel name within the tarball is not very
        flexible: when testing an extracted mysql DB tarball it would be more
        convenient to archive the files flat.
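
        A sketch of construction (paths and prefix illustrative, not from this package)::

            t = Tar("/var/dbbackup/example/20130515_1941.tar.gz", toplevelname="tmp_offline_db", remoteprefix="/data")
            print t.remotepath    # "/data/var/dbbackup/example/20130515_1941.tar.gz"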
        """
        assert len(toplevelname) > 1, "as a safety measure a non-blank toplevelname is required"
        self.path = path
        self.toplevelname = toplevelname
        self.mode = mode
        if len(remoteprefix)>0:
            remotepath = os.path.join(remoteprefix, path[1:])   # have to get rid of path leading slash for the join
        else:
            remotepath = path
        pass
        self.remotepath = remotepath
        self.remotenode = remotenode
        self.confirm = confirm
        self.moveaside = moveaside    # boolean option; the rename-aside action is the moveaside_ method
        self.ALLOWCLOBBER = ALLOWCLOBBER
        self.names = None
        self.members = None
        self.prefix = None
        self.flattop = None

    def __repr__(self):
        return self.__class__.__name__ + " %s %s %s " % ( self.path, self.toplevelname, self.mode )

    def list(self, members, verbose=True):
        """Print a table of contents to sys.stdout. If `verbose' is False, only
           the names of the members are printed. If it is True, an `ls -l'-like
           output is produced.
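
           A sketch of usage, capturing the `ls -l'-like listing as a string
           (assumes a prior `examine` so that `members' is populated)::

               t.examine()
               txt = t.list(t.members)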
        """
        old_stdout = sys.stdout
        sys.stdout = mystdout = StringIO()

        for tarinfo in members:
            if verbose:
                print tarfile.filemode(tarinfo.mode),
                print "%s/%s" % (tarinfo.uname or tarinfo.uid,
                                 tarinfo.gname or tarinfo.gid),
                if tarinfo.ischr() or tarinfo.isblk():
                    print "%10s" % ("%d,%d" \
                                    % (tarinfo.devmajor, tarinfo.devminor)),
                else:
                    print "%10d" % tarinfo.size,
                print "%d-%02d-%02d %02d:%02d:%02d" \
                      % time.localtime(tarinfo.mtime)[:6],

            print tarinfo.name,

            if verbose:
                if tarinfo.issym():
                    print "->", tarinfo.linkname,
                if tarinfo.islnk():
                    print "link to", tarinfo.linkname,
            print
            pass
        sys.stdout = old_stdout
        return mystdout.getvalue()

    def members_(self):
        """
        Caches the members list of the tarball into a sidecar `.pc` file
        to avoid a ~70s wait to access the members of a large compressed tarball.

        http://userprimary.net/posts/2007/11/18/ctime-in-unix-means-last-change-time-not-create-time/

             ctime means change time
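
        A sketch of the caching effect (illustrative)::

            members = t.members_()   # first call is slow and writes the <path>.pc sidecar
            members = t.members_()   # later calls load the pickle, provided it is newer than the tarball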
        """
        path = self.path
        mtime_ = lambda _:os.path.getmtime(_)
        ctime_ = lambda _:os.path.getctime(_)
        pc = "%s.pc" % path
        members = None
        if os.path.exists(pc):
            if ctime_(pc) > ctime_(path):
                log.debug("load pickled members file %s " % pc )
                members = pickle.load(file(pc,"r"))
            else:
                log.warn("pickled members file exists but is outdated")
            pass
        pass
        if not members:
            tf = tarfile.open(path, "r:gz")
            members = tf.getmembers()
            pickle.dump( members, file(pc,"w"))
            log.info("saving pickled members file: %s " % pc)
            tf.close()
        pass
        return members

    def names_(self):
        members = self.members_()
        self.members = members
        names = map(lambda ti:ti.name, members)
        return names

    def examine(self):
        """
        Reads member names from the archive, determines their common prefix and
        whether the archive is flattop (exploding, with no top level folder).
        Must be called before `extract`.
        """
        assert os.path.exists(self.path), "path %s does not exist " % self.path
        log.info("examining %s " % (self.path) )
        names = self.names_()
        prefix = os.path.commonprefix(names)
        flattop = prefix == ""

        log.info("archive contains %s items with commonprefix \"%s\" flattop %s " % ( len(names), prefix, flattop  ))
        log.debug("\n".join(names))

        self.names = names
        self.prefix = prefix
        self.flattop = flattop
    examine = timing(examine)

    def digest(self, verify=False):
        """
        Conditions:

        #. if sidecar does not exist then compute the digest and persist it.
        #. if sidecar exists and `verify=False` then just read the
           persisted dna and return it
        #. if sidecar exists and `verify=True` read the persisted dna, compute it anew and
           assert they are the same

        DNA verification is an appropriate check after transferring tarball and
        sidecar to another node.

        The digest matches that obtained using `md5sum`::

            [blyth@belle7 DybPython]$ md5sum /var/dbbackup/dbsrv/belle7.nuu.edu.tw/channelquality_db_belle7/10000_0.tar.gz
            2e0ec7c27ebc383adb6fa102f52ab6c0  /var/dbbackup/dbsrv/belle7.nuu.edu.tw/channelquality_db_belle7/10000_0.tar.gz

        A simple dictstring format is used to allow easy remote access::

            [blyth@belle7 DybPython]$ ssh H cat /var/dbbackup/dbsrv/belle7.nuu.edu.tw/channelquality_db_belle7/10000_0.tar.gz.dna
            {'dig': 'fa553838e97df686a4df116723ee59f7', 'size': 35360953}[blyth@belle7 DybPython]$

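        A sketch of use (path illustrative)::

            t = Tar("/var/dbbackup/example/10000_0.tar.gz", toplevelname="10000")
            dna = t.digest()              # compute and persist, or read an existing sidecar
            dna = t.digest(verify=True)   # additionally recompute and assert a match
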
        """
        path = self.path
        assert os.path.exists(path), path
        sidecar = "%s.dna" % path

        # remove stale sidecars
        if os.path.exists(sidecar):
            if os.path.getctime(path) > os.path.getctime(sidecar):
                log.info("sidecar dna exists but is stale %s : removing it " % sidecar )
                os.remove(sidecar)
            else:
                log.info("sidecar dna exists %s and remains valid " % sidecar )
            pass
        pass

        if not os.path.exists(sidecar):
            dna = dnapath(path)
            file(sidecar,"w").write(repr(dna))
        else:
            sdna = file(sidecar,"r").read().strip()
            assert sdna[0] == '{' and sdna[-1] == '}', sdna
            dna = eval(sdna)
            log.debug("loaded dna from sidecar %s " % repr(dna))
            if verify:
                rdna = dnapath(path)
                log.info("rdna recomputed %s " % rdna )
                assert dna == rdna, (dna, rdna)
            pass
        pass
        log.debug("dna digest of the archive is %s " % repr(dna))
        return dna

    def archive(self, sourcedir, deleteafter=False, flattop=False):
        """
        :param sourcedir: directory containing the `toplevelname` which will be the root of the archive
        :param deleteafter: when True, delete the sourcedir after archiving (dated folders only)
        :param flattop: when True, archive the files flat with no top level folder

        Create the archive and examine::

           t = Tar("/var/dbbackup/mysqlhotcopy/belle7.nuu.edu.tw/tmp_offline_db/20130515_1941.tar.gz", toplevelname="tmp_offline_db")
           t.archive("/var/dbbackup/mysqlhotcopy/belle7.nuu.edu.tw/tmp_offline_db/20130515_1941")
           t.examine()

        Examine what is in the archive::

           t = Tar("/var/dbbackup/mysqlhotcopy/belle7.nuu.edu.tw/tmp_offline_db/20130515_1941.tar.gz", toplevelname="tmp_offline_db")
           t.examine()

        Under toplevelname `tmp_offline_db` within the archive when `flattop=False`::

            tmp_offline_db/
            tmp_offline_db/SupernovaTrigger.MYD
            tmp_offline_db/CalibPmtFineGainVld.frm
            tmp_offline_db/HardwareID.MYD
            ...

        Under toplevelname `tmp_offline_db` within the archive when `flattop=True`::

            SupernovaTrigger.MYD
            CalibPmtFineGainVld.frm
            HardwareID.MYD
            ...

        Reproducing the layout on another node would then need::

           t = Tar("/var/dbbackup/mysqlhotcopy/belle7.nuu.edu.tw/tmp_offline_db/20130515_1941.tar.gz", toplevelname="tmp_offline_db")
           t.extract("/tmp")  # creates /tmp/tmp_offline_db

        """
        src = os.path.join(sourcedir, self.toplevelname)
        assert len(self.toplevelname) > 3 , "sanity check for toplevelname %s fails" % self.toplevelname
        log.info("creating %s from %s " %  (self.path, src) )
        assert os.path.exists(src) and os.path.isdir(src), "src directory %s does not exist " % src
        tgz = tarfile.open(self.path, "w:%s" % self.mode )
        if flattop:
            arcname = ""
        else:
            arcname = self.toplevelname
        pass
        tgz.add(src, arcname=arcname)
        tgz.close()

        datedfolder_ptn = re.compile(r"^\d{8}_\d{4}$") # eg 20130515_1941
        if deleteafter:
            leaf = sourcedir.split("/")[-1]
            if not datedfolder_ptn.match(leaf):
                log.warn("NOT deleting sourcedir %s with leaf %s as the leaf is not a dated folder " % ( sourcedir, leaf ))
            else:
                log.info("deleting sourcedir %s with leaf %s as the leaf is a dated folder " % ( sourcedir, leaf ))
                if self.confirm:
                    confirm = raw_input("enter \"YES\" to confirm deletion of sourcedir %s :" % sourcedir )
                else:
                    confirm = "YES"
                pass
                if confirm == "YES":
                    shutil.rmtree(sourcedir)
                else:
                    log.info("skipping deletion of %s " % sourcedir )
                pass
        else:
            log.warn("not deleting after")
    archive = timing(archive)

    def moveaside_(self, target, dryrun=False):
        """
        Renames a pre-existing target directory aside to `target_YYYYMMDD_HHMMSS`.
        Named with a trailing underscore to avoid colliding with the boolean
        `moveaside` option set in `__init__`.
        """
        assert os.path.exists(target), target
        stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        aside = target + "_" + stamp
        msg = "moving aside pre-existing tgt dir %s to %s " % (target, aside)
        assert not os.path.exists(aside), (aside, "huh the aside dir exists already ")
        if dryrun:
            log.info("dryrun : " + msg )
            return
        pass
        log.info(msg)
        os.rename(target, aside)

    def _folder_extract(self, containerdir, toplevelname, dryrun=False):
        """
        :param containerdir: directory into which the archive folder is extracted
        :param toplevelname: name of the top level folder within the archive
        :param dryrun:

        Folder extraction takes all paths from the archive that are within
        a particular toplevelname within the archive and places them
        within the `containerdir/` folder. By virtue of the `toplevelname`
        paths within the archive, the result is a `containerdir/toplevelname`
        folder in the filesystem.

        This approach has the advantage of a non-exploding tarball, but is
        inconvenient for renaming.

        The `toplevelname` dir will be created by the extraction.
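        A sketch of the resulting layout (names illustrative)::

            containerdir/                              # must pre-exist
            containerdir/tmp_offline_db/               # created by the extraction
            containerdir/tmp_offline_db/HardwareID.MYD
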
        """
        assert self.flattop is False , "_folder_extract requires non-flattop archive "
        assert self.toplevelname == toplevelname ,"_folder_extract requires default toplevelname %s %s " % (self.toplevelname, toplevelname)
        tf = tarfile.open(self.path, "r:gz")
        wtf = TarFileWrapper(tf)
        members = tf.getmembers()

        names = map(lambda tinfo:tinfo.name, members)
        log.info("_folder_extract names %s " % repr(names))

        #select_ = lambda tinfo:tinfo.name.split('/')[0] == toplevelname   # assumption of one level toplevelname incorrect for partitioned archives
        select_ = lambda tinfo:tinfo.name.startswith(toplevelname)
        select = filter(select_, members)

        assert len(members) == len(select), (len(members), len(select), "extraction filtering misses some members, toplevelname %s " % (toplevelname) )
        target = os.path.join(containerdir, toplevelname)

        if os.path.exists(target) and self.moveaside:
            self.moveaside_(target, dryrun=dryrun)
        pass

        msg = "_folder_extract into containerdir %s for %s members with toplevelname %s  " % ( containerdir, len(members), toplevelname )
        if dryrun:
            log.info("dryrun: " + msg )
        else:
            log.info(msg)
            assert not os.path.exists(target), "target dir %s exists already, ABORTING EXTRACTION, use --moveaside option to rename it " % target
            wtf.extractall(containerdir, members)
        pass
        tf.close()

    def _check_clobber(self, target, members ):
        """
        :param target: directory in which the members are to be extracted
        :param members: from the tarfile
        :return: list of paths that would be clobbered by the extraction
        """
        clobber = []
        fmt = "%-110s :  %s "
        for member in members:
            name = member.name
            path = os.path.join(target, name)
            if os.path.exists(path):
                if name == './':
                    log.warn(fmt % (name, "SKIP TOPDIR" ))
                else:
                    clobber.append(name)
                    log.warn(fmt % (name, "**CLOBBER**" ))
            else:
                log.debug(fmt % (name, "" ))
            pass
        pass
        return clobber

    def _flat_extract(self, containerdir, toplevelname, dryrun=False):
        """
        :param containerdir: directory within which the target folder resides
        :param toplevelname: name of the target folder to extract into
        :param dryrun:

        Flat extraction takes all paths from the archive and places
        them within the `containerdir/toplevelname` folder.

        The `toplevelname` dir must be created before the extraction.
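        A sketch of the resulting layout (names illustrative)::

            containerdir/tmp_offline_db/                 # the target folder
            containerdir/tmp_offline_db/HardwareID.MYD   # members land directly inside
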
        """
        assert self.flattop is True , "_flat_extract requires flattop archive "
        log.info("_flat_extract opening tarfile %s " % self.path )
        tf = tarfile.open(self.path, "r:gz")
        wtf = TarFileWrapper(tf)
        members = tf.getmembers()
        target = os.path.join(containerdir, toplevelname)

        clobber = self._check_clobber( target, members )
        if len(clobber) > 0:
            if not self.ALLOWCLOBBER:
                log.warn("extraction would clobber %s existing paths, need `--ALLOWCLOBBER` option to do this : %s " % ( len(clobber), "\n".join(clobber) ))
            else:
                log.warn("proceeding to clobber %s existing paths courtesy of `--ALLOWCLOBBER` option : %s " %  ( len(clobber), "\n".join(clobber) ))
        else:
            log.info("extraction into target %s does not clobber any existing paths " % target )

        msg = "_flat_extract into target %s for %s members with toplevelname %s " % ( target, len(members),toplevelname )
        if dryrun:
            log.info("dryrun: " + msg )
        else:
            log.info(msg)
            if not self.ALLOWCLOBBER:
                assert not os.path.exists(target), "target dir %s exists already, ABORTING EXTRACTION use --rename newname " % target
            wtf.extractall(target, members)
            pass
            log.info( os.popen("ls -l %(target)s " % locals()).read() )
        pass
        tf.close()

    def extract(self, containerdir, toplevelname=None, dryrun=False):
        """
        :param containerdir: folder within which the toplevelname dir resides
        :param toplevelname: default of None corresponds to the original db name
        :param dryrun:

        The actual extraction method depends on the type of archive detected:

        #. `_flat_extract` for a flattop aka exploding archive
        #. `_folder_extract` for a folder top archive

        Flat extraction has the advantage of easy renaming.
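
        A sketch of use (path from the `__main__` block below)::

            t = Tar("/var/dbbackup/mysqlhotcopy/belle7.nuu.edu.tw/tmp_offline_db/20130520_1353.tar.gz", toplevelname="tmp_offline_db")
            t.examine()                          # mandatory, detects the archive type
            t.extract("/tmp/out", dryrun=True)   # rehearse, then repeat with dryrun=False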
        """
        if toplevelname is None:
            toplevelname = self.toplevelname
        pass
        log.info("extract containerdir %s toplevelname %s dryrun %s " % (containerdir, toplevelname, dryrun))

        assert os.path.exists(self.path), "path %s does not exist " % self.path
        assert os.path.exists(containerdir), "containerdir %s does not exist" % containerdir
        assert not self.flattop is None, "ABORT must `examine` before can `extract` "

        if self.flattop:
            self._flat_extract(containerdir, toplevelname, dryrun=dryrun)
        else:
            self._folder_extract(containerdir, toplevelname, dryrun=dryrun)

    extract = timing(extract)

    def transfer(self):
        """
        Prior to making expensive transfers of large tarballs the
        small local and remote DNA sidecar files are compared to see if the
        transfer can be skipped.

        The use of `cheat=True` for the remote transfer means that the
        contents of the sidecar file are trusted to be true. Using `cheat=False` means that
        the digest is recomputed to obtain a new DNA. This can be time-consuming for
        large files.

        A good approach is to monitor the transfers on the receiving node by
        checking that recomputed DNA matches the sidecars.
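
        A sketch of use (node and prefix illustrative)::

            t = Tar("/var/dbbackup/example/10000_0.tar.gz", toplevelname="10000",
                    remotenode="C", remoteprefix="/data")
            t.digest()     # ensure the local .dna sidecar exists
            t.transfer()   # scp tarball plus sidecar, skipped when remote DNA matches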
        """
        assert os.path.exists(self.path), "path %s does not exist " % self.path
        assert os.path.exists(self.path + '.dna'), "dna path %s does not exist " % (self.path + '.dna')
        ldna = sidecar_dna(self.path)
        assert not ldna is None
        rdna = remote_dna(self.remotepath, self.remotenode, cheat=True)  # when cheating assume the remote dna is valid
        log.debug("remote dna from %s:%s is %s " % ( self.remotenode, self.remotepath, repr(rdna)))
        if rdna == ldna:
            log.info("SKIP TRANSFER, remote dna of %s:%s matches local %s " % (self.remotenode,self.remotepath,ldna))
        else:
            log.info("remote dna %s differs from local %s for %s:%s proceed to transfer" % (rdna, ldna, self.remotenode, self.remotepath))
            scp( self.path, self.remotepath, self.remotenode )
            scp( self.path, self.remotepath, self.remotenode, sidecar_ext='.dna')
        pass
    transfer = timing(transfer)

class TarFileWrapper(object):
    """
    `extractall` only appears in Python 2.7, so it is back-ported from there
    into this wrapper for use with Python 2.3, 2.4, 2.5 and 2.6.
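
    A sketch of use (path illustrative)::

        tf = tarfile.open("/var/dbbackup/example/10000_0.tar.gz", "r:gz")
        TarFileWrapper(tf).extractall("/tmp/out")
        tf.close()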
    """
    def __init__(self, tf):
        self.tf = tf

    def extractall(self, path=".", members=None):
        """Extract all members from the archive to the current working
           directory and set owner, modification time and permissions on
           directories afterwards. `path' specifies a different directory
           to extract to. `members' is optional and must be a subset of the
           list returned by getmembers().
        """
        directories = []

        if members is None:
            members = self.tf

        for tarinfo in members:
            if tarinfo.isdir():
                # Extract directories with a safe mode.
                directories.append(tarinfo)
                tarinfo = copy.copy(tarinfo)
                tarinfo.mode = 0700
            self.tf.extract(tarinfo, path)

        # Reverse sort directories.
        directories.sort(lambda a, b: cmp(a.name, b.name))
        directories.reverse()

        # Set correct owner, mtime and filemode on directories.
        for tarinfo in directories:
            dirpath = os.path.join(path, tarinfo.name)
            try:
                self.tf.chown(tarinfo, dirpath)
                self.tf.utime(tarinfo, dirpath)
                self.tf.chmod(tarinfo, dirpath)
            except tarfile.ExtractError, e:
                if self.tf.errorlevel > 1:
                    raise
                else:
                    self.tf._dbg(1, "tarfile: %s" % e)

if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO)

    tgz = "/var/dbbackup/mysqlhotcopy/belle7.nuu.edu.tw/tmp_offline_db/20130520_1353.tar.gz"
    t = Tar(tgz, toplevelname="tmp_offline_db")
    t.examine()
    #t.extract("/tmp/out")
    #t.examine()

    log.info(seconds)