

DybPython::dbsrv::DB Class Reference


Public Member Functions

def __init__
def dispatch
def lscmd___
def database_drop_create
def docs
def execute_
def fetchall
def __call__
def lsdatabases___
def lstables___
def summary___
def show_create
def timestamped_dir
def dumplocal___
def fields
def range
def minmax
def ptables
def tabminmax_csv
def determine_basedir
def assert_valid_dump
def partition_dumpcheck
def partition_dumplocal___
def extract
def archivepath
def archive
def partition_loadlocal___
def loadlocal_dir
def loadlocal___

Public Attributes

 sect
 opts
 home
 backupdir
 is_local
 dbc
 cnf
 conn
 llconn
 partitionmgr
 count
 lastseconds
 result of select is returned to python and thence formatted directly into csv, works remotely

Static Public Attributes

tuple docs = classmethod(docs)

Properties

 size = property(_get_size, doc="Size estimate of the DB in MB ")
 databases = property(_get_databases, doc="List of database names obtained from information_schema.tables")
 tables = property(_get_tables_infoschema, doc="List of table names obtained from information_schema.tables")
 datadir = property(_get_datadir, doc="Query DB server to find the datadir, eg /var/lib/mysql/ OR /data/mysql/ ")
 utables = property(_get_utables, doc="List of tables to use in operations, when --tables option is used this can be a subset of all tables.")

Private Member Functions

def _query_size
def _get_size
def _get_databases
def _get_tables_infoschema
def _get_tables
def _get_datadir
def _rst_table
def _get_utables
def _dumplocal_mkdir
def _dumplocal_schema
def _dumplocal_table
def _write_csvdirect
def _wc_csv

Private Attributes

 _size

Detailed Description

Definition at line 569 of file dbsrv.py.


Constructor & Destructor Documentation

def DybPython::dbsrv::DB::__init__ (   self,
  sect,
  opts = None,
  home = None 
)
:param sect: name of section in config file 
:param opts: options
:param home: DB instance


Safety constraints on config to minimize accidents from config confusion.

Initially required non-loopback section names and database names to be the same

Loosening this to allow remote commands, by designating a "home" instance
and requiring all other instances to match that one in everything but the 
database name

Definition at line 570 of file dbsrv.py.

00571                                                   :
00572         """
00573         :param sect: name of section in config file 
00574         :param opts: options
00575         :param home: DB instance
00576 
00577 
00578         Safety constraints on config to minimize accidents from config confusion.
00579 
00580         Initially required non-loopback section names and database names to be the same
00581 
00582         Loosening this to allow remote commands, by designating a "home" instance
00583         and requiring all other instances to match that one in everything but the 
00584         database name
00585 
00586         """
00587         pass 
00588         self.sect = sect
00589         self.opts = opts
00590         self.home = home
00591         self.backupdir = os.path.join(self.opts.backupfold, platform.node(), sect )   
00592 
00593         cnf = MyCnf("~/.my.cnf")
00594         dbc = cnf.mysqldb_pars(sect, home=home)   # NB sect is taken as the DB name if there is no such section in the config
00595 
00596         log.debug("sect %s home %s dbc %s " % (sect, home, repr(dbc)))
00597  
00598         if sect == "loopback":
00599             assert dbc['host'] in ('localhost','127.0.0.1')   # curious 127.0.0.1 not working on cms01 need localhost
00600         pass
00601         if home is None: 
00602             assert dbc['db'] in ("information_schema", "mysql")  # older mysql (eg on cms01 does not have information_schema DB ? OR its protected)
00603             assert opts.home == sect, "only the home instance is allowed an undefined home parameter " 
00604         else:
00605             log.debug("     dbc %s " % repr(dbc))
00606             log.debug("home.dbc %s " % repr(home.dbc))
00607             #qwns = "host user passwd"
00608             qwns = "host"
00609             for qwn in qwns.split():
00610                 assert dbc.get(qwn,None) == home.dbc.get(qwn,None), "non \"home\" instances are constrained to match the \"home\" in everything but the db "
00611 
00612         self.is_local = dbc['host'] == '127.0.0.1'
00613         self.dbc = dbc
00614         self.cnf = cnf
00615         log.debug("connecting to %s " % dict(dbc, passwd="***"))
00616         try:  
00617             conn = MySQLdb.connect( **dbc )   # huh, version variation in accepted params
00618         except MySQLdb.Error, e: 
00619             raise Exception("Error %d: %s " % ( e.args[0], e.args[1] ) )
00620 
00621         llconn = _mysql.connect( **dbc )       
00622 
00623         self.conn = conn
00624         self.llconn = llconn
00625         self._size = None
00626         pass
00627 
00628         partitionmgr = PartitionMgr(opts, self)
00629         self.partitionmgr = partitionmgr
00630 
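
A minimal usage sketch of the "home" constraint described above (the section names and the prepared `opts` object are hypothetical, not from the source)::

    # opts is assumed to provide .home, .backupfold and the other option
    # attributes read by DB; the sections must exist in ~/.my.cnf, or be
    # database names resolvable against the "home" server
    home  = DB("loopback", opts=opts)                          # opts.home == "loopback"
    other = DB("tmp_ligs_offline_db_0", opts=opts, home=home)  # must match home in everything but the db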


Member Function Documentation

def DybPython::dbsrv::DB::dispatch (   self,
  args,
  kwa 
)

Definition at line 631 of file dbsrv.py.

00632                                     :
00633         if self.opts.partition:
00634             pfx = "partition_"
00635         else:
00636             pfx = ""
00637 
00638         cmd = pfx + args[0] + "___"    
00639         if hasattr(self, cmd ):
00640             getattr( self , cmd)( *args[1:], **kwa )   
00641         else:
00642             raise Exception("cmd %s not implemented " % cmd)

def DybPython::dbsrv::DB::lscmd___ (   self,
  args,
  kwa 
)

Definition at line 643 of file dbsrv.py.

00644                                      :
00645         for m in filter(lambda _:_[-3:] == '___', dir(self)):
00646             print m[:-3]  

def DybPython::dbsrv::DB::database_drop_create (   self,
  dbname 
)
:param dbname: name of the database to be dropped and recreated 

Definition at line 647 of file dbsrv.py.

00648                                           :
00649         """
00650         :param dbname: name of the database to be dropped and recreated 
00651         """
00652         assert self.sect == "loopback" and self.sect != dbname , ( self.sect, dbname  )
00653         if self.opts.noconfirm:
00654             log.info("proceed with DB_DROP_CREATE of %(dbname)s without confirmation" % locals() )
00655         else:
00656             ans = raw_input("DROP and reCREATE database %(dbname)s losing all tables contained ? Enter \"YES\" to proceed : " % locals()) 
00657             if ans != "YES":
00658                 log.warn("skipping DROP CREATE ")
00659                 return
00660        
00661         self("drop database if exists %(dbname)s " % locals() ) 
00662         self("create database %(dbname)s " % locals() ) 
00663 

def DybPython::dbsrv::DB::docs (   cls)
collect the docstrings on command methods 
identified by naming convention of ending with ___ 

Definition at line 664 of file dbsrv.py.

00665                    :
00666         """
00667         collect the docstrings on command methods 
00668         identified by naming convention of ending with ___ 
00669         """
00670         mdoc = lambda m:getattr(m,'__doc__',None)
00671         mdocs  = [ dict(meth=k[:-3],doc=mdoc(v)) for k,v in [(k,v) for k,v in inspect.getmembers(cls) if k[-3:]=='___' and k[0] != '_' and mdoc(v)]]
        return "\n".join([ """ %(meth)s : %(doc)s """ % d for d in mdocs ])   
def DybPython::dbsrv::DB::execute_ (   self,
  cmd 
)

Definition at line 675 of file dbsrv.py.

00676                            :
00677         cursor = self.conn.cursor(MySQLdb.cursors.DictCursor)
00678         cursor.execute( cmd )
00679         return cursor

def DybPython::dbsrv::DB::fetchall (   self,
  cmd 
)

Definition at line 680 of file dbsrv.py.

00681                             :
00682         cursor = self.execute_(cmd)
00683         rows = cursor.fetchall()
00684         self.count = cursor.rowcount
00685         cursor.close()
00686         return rows

def DybPython::dbsrv::DB::_query_size (   self) [private]

Definition at line 687 of file dbsrv.py.

00688                          :
00689         sql = "select round(sum((data_length+index_length-data_free)/1024/1024),2) as TOT_MB from information_schema.tables where table_schema = '%(db)s' " % self.dbc
        return float(self(sql)[0]['TOT_MB'])
def DybPython::dbsrv::DB::_get_size (   self) [private]

Definition at line 690 of file dbsrv.py.

00691                        :
00692         if self._size is None:
00693              self._size = self._query_size()
        return self._size 
def DybPython::dbsrv::DB::_get_databases (   self) [private]
This query gives fewer results than `show databases`; the latter would demand skips to avoid errors when getting sizes 
#skip = "hello hello2 other test_noname tmp_cascade_2 tmp_dbitest tmp_tmp_offline_db_2".split()  

Definition at line 696 of file dbsrv.py.

00697                             :
00698         """
00699         This query gives fewer results than `show databases`; the latter would demand skips to avoid errors when getting sizes 
00700         #skip = "hello hello2 other test_noname tmp_cascade_2 tmp_dbitest tmp_tmp_offline_db_2".split()  
00701         """
00702         sql = "select distinct(table_schema) from information_schema.tables"
        return map(lambda _:_['table_schema'],self(sql))
def DybPython::dbsrv::DB::_get_tables_infoschema (   self) [private]

Definition at line 706 of file dbsrv.py.

00707                                     :
00708         """
00709         """
00710         sql = "select distinct(table_name) from information_schema.tables where table_schema='%(db)s'" % self.dbc
        return map(lambda _:_['table_name'],self(sql))
def DybPython::dbsrv::DB::_get_tables (   self) [private]
Older mysql does not have information_schema

Definition at line 714 of file dbsrv.py.

00715                          :
00716         """
00717         Older mysql does not have information_schema
00718         """
00719         sql = "show tables" 
00720         tables = []
00721         for d in self(sql):
00722             k,v = d.items()[0]
00723             tables.append(v)
        return tables
def DybPython::dbsrv::DB::_get_datadir (   self) [private]

Definition at line 727 of file dbsrv.py.

00728                           :
        return self("select @@datadir as datadir")[0]['datadir']
def DybPython::dbsrv::DB::__call__ (   self,
  cmd 
)

Definition at line 731 of file dbsrv.py.

00732                            :
00733         log.debug(cmd)
00734         t0 = time.time()
00735         ret = self.fetchall(cmd)
00736         t1 = time.time()
00737         self.lastseconds = t1 - t0 
00738         return ret
00739 

def DybPython::dbsrv::DB::lsdatabases___ (   self,
  args,
  kwa 
)
list databases

Definition at line 740 of file dbsrv.py.

00741                                            :
00742         """
00743         list databases
00744         """ 
00745         print "\n".join(self.databases)

def DybPython::dbsrv::DB::lstables___ (   self,
  args,
  kwa 
)
list tables

Definition at line 746 of file dbsrv.py.

00747                                         :
00748         """
00749         list tables
00750         """
00751         print "\n".join(self.tables)

def DybPython::dbsrv::DB::summary___ (   self,
  args,
  kwa 
)
Present summary of tables in rst table format:

==============================  ==========  ==============================  ==============================
TABLE_NAME                      TABLE_ROWS  CREATE_TIME                     CHECK_TIME                    
==============================  ==========  ==============================  ==============================
DqChannel                       62126016    2013-05-30 18:52:51             2013-05-30 18:52:51           
DqChannelStatus                 62126016    2013-05-30 18:17:42             2013-05-30 18:17:42           
DqChannelStatusVld              323573      2013-05-30 18:52:44             None                          
DqChannelVld                    323573      2013-05-30 19:34:55             None                          
LOCALSEQNO                      3           2013-05-30 19:35:02             None                          
==============================  ==========  ==============================  ==============================
     

Definition at line 752 of file dbsrv.py.

00753                                       :
00754         """
00755         Present summary of tables in rst table format:
00756 
00757         ==============================  ==========  ==============================  ==============================
00758         TABLE_NAME                      TABLE_ROWS  CREATE_TIME                     CHECK_TIME                    
00759         ==============================  ==========  ==============================  ==============================
00760         DqChannel                       62126016    2013-05-30 18:52:51             2013-05-30 18:52:51           
00761         DqChannelStatus                 62126016    2013-05-30 18:17:42             2013-05-30 18:17:42           
00762         DqChannelStatusVld              323573      2013-05-30 18:52:44             None                          
00763         DqChannelVld                    323573      2013-05-30 19:34:55             None                          
00764         LOCALSEQNO                      3           2013-05-30 19:35:02             None                          
00765         ==============================  ==========  ==============================  ==============================
00766              
00767         """ 
00768         result = self("select TABLE_NAME, TABLE_ROWS, CREATE_TIME, UPDATE_TIME, CHECK_TIME  from information_schema.tables where table_schema = '%(db)s' " % self.dbc)
00769         kws = (('TABLE_NAME', 30), ('TABLE_ROWS', 10), ('CREATE_TIME', 30 ), ('CHECK_TIME',30),)
00770         print "\n".join(["",self.sect,self._rst_table( result, kws )])
00771    
    
def DybPython::dbsrv::DB::_rst_table (   self,
  result,
  kws,
  char = "=" 
) [private]
:param result: iterable providing result dicts 
:param kws: sequence of 2-tuples providing result dict keys and presentation widths 
:param char: character used to draw the horizontal rule lines (default "=") 
:return: multi-line string rst table presentation 

Definition at line 772 of file dbsrv.py.

00773                                                :
00774         """
00775         :param result: iterable providing result dicts 
00776         :param kws: sequence of 2-tuples providing result dict keys and presentation widths 
00777         :param char: character used to draw the horizontal rule lines (default "=") 
00778         :return: multi-line string rst table presentation 
00779         """ 
00780         fcol_ = lambda kw:"%s(%s)-%s%s" % ("%",kw[0],kw[1],"s")
00781         fmt = "  ".join(map(fcol_,kws) ) 
00782 
00783         # spell these out for ancient python 
00784         #mkr = fmt % dict((kw[0],char * kw[1]) for kw in kws)
00785         #lbl = fmt % dict((kw[0],kw[0]) for kw in kws)
00786         dmkr = {}
00787         dlbl = {}
00788         for kw in kws:
00789             dmkr[kw[0]] = char * kw[1] 
00790             dlbl[kw[0]] = kw[0] 
00791         mkr = fmt % dmkr
00792         lbl = fmt % dlbl
00793         bdy = [fmt % d for d in result]
00794         return "\n".join([mkr, lbl, mkr] + bdy + [mkr])
00795 
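
A small worked sketch of how the format string is built (the column widths and values are illustrative only)::

    kws   = (('TABLE_NAME', 12), ('TABLE_ROWS', 10))
    fcol_ = lambda kw: "%s(%s)-%s%s" % ("%", kw[0], kw[1], "s")
    fmt   = "  ".join(map(fcol_, kws))
    # fmt == "%(TABLE_NAME)-12s  %(TABLE_ROWS)-10s"
    print(fmt % {'TABLE_NAME': '=' * 12, 'TABLE_ROWS': '=' * 10})   # marker row
    print(fmt % {'TABLE_NAME': 'LOCALSEQNO', 'TABLE_ROWS': 3})      # body row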

def DybPython::dbsrv::DB::_get_utables (   self) [private]

Definition at line 796 of file dbsrv.py.

00797                           :
00798         """
00799         """ 
00800         alltables = self._get_tables()
00801         if self.opts.tables is None:
00802              return alltables
00803         else:
             return filter(lambda t:t in alltables, self.opts.tables.split(","))
def DybPython::dbsrv::DB::show_create (   self,
  table 
)

Definition at line 806 of file dbsrv.py.

00807                                  :
00808         return self("show create table %(table)s" % locals())[0]['Create Table']

def DybPython::dbsrv::DB::_dumplocal_mkdir (   self,
  outdir 
) [private]
:param outdir:

Definition at line 809 of file dbsrv.py.

00810                                        :
00811         """
00812         :param outdir:
00813         """
00814         if not os.path.exists(outdir):
00815             os.makedirs(outdir)     # umask gets in the way of the mode argument here 
00816             os.chmod(outdir, 0777)  # needs to be writable by other so mysql user can write into it 
00817         assert os.path.isdir(outdir), outdir
        pass
def DybPython::dbsrv::DB::_dumplocal_schema (   self,
  outdir,
  table 
) [private]
:param outdir:
:param table:

Definition at line 818 of file dbsrv.py.

00819                                                :
00820         """
00821         :param outdir:
00822         :param table:
00823         """
00824         schema=os.path.join(outdir, "%s.schema" % table )
00825         sf = open(schema, "w")
00826         sf.write( self.show_create(table) )
00827         sf.close()
        pass
def DybPython::dbsrv::DB::_dumplocal_table (   self,
  outdir,
  table,
  where 
) [private]
:param outdir:
:param table:
:param where: 

Definition at line 828 of file dbsrv.py.

00829                                                      :
00830         """
00831         :param outdir:
00832         :param table:
00833         :param where: 
00834         """
00835         outfile=os.path.join(outdir, "%s.csv" % table )
00836         log.info("_dumplocal_table %s writing %s " % (where,outfile) ) 
00837         pass
00838 
00839         if table == 'LOCALSEQNO':
00840             where = "TRUE"
00841 
00842         if self.is_local:
00843             log.debug("IntoOutFile")
00844             if os.path.exists(outfile):
00845                 log.info("remove preexisting outfile %s " % outfile )
00846                 os.remove(outfile) 
00847             io = IntoOutfile(table=table,where=where, outfile=outfile)
00848             self(str(io))
00849         else:
00850             log.debug("RemoteIntoOutFile")
00851             t0 = time.time()
00852             tf = open(outfile,"w")
00853             rio = RemoteIntoOutfile(table=table, where=where)
00854             self._write_csvdirect(rio, tf )   ## result of select is returned to python and thence formatted directly into csv, works remotely 
00855             tf.close()
00856             t1 = time.time()
00857             self.lastseconds = t1 - t0
00858         pass
00859         log.info("partition_dumplocal___ %s writing %s took %s seconds " % (where,outfile, "%5.2f" % self.lastseconds ) ) 
00860         pass

def DybPython::dbsrv::DB::_write_csvdirect (   self,
  select,
  tf 
) [private]
Adopt a low-level approach to avoid unnecessary conversions into 
python types and back to strings, and the associated difficulty of 
getting output precisely matching SELECT * INTO OUTFILE 

Note that use of `store_result` rather than `use_result` means 
that all rows are in memory at once.

NB for consistency the CSV output by this command MUST MATCH that 
produced by _write_outfile

`_write_csvdirect` is used by **rdumpcat** , this mimics 
the output from `_write_outfile` (used by **dumpcat**) with 
the big advantage that it works remotely, with no strong permission 
requirements

TODO:

#. when there is a pre-existing LOCALSEQNO redirect LOCALSEQNO to a temporary file 
   and do a merge...  easiest to instantiate them as AsciiCSV and then merge at that level 

Definition at line 861 of file dbsrv.py.

00862                                             :
00863         """
00864         Adopt a low-level approach to avoid unnecessary conversions into 
00865         python types and back to strings, and the associated difficulty of 
00866         getting output precisely matching SELECT * INTO OUTFILE 
00867 
00868         Note that use of `store_result` rather than `use_result` means 
00869         that all rows are in memory at once.
00870 
00871         NB for consistency the CSV output by this command MUST MATCH that 
00872         produced by _write_outfile
00873 
00874         `_write_csvdirect` is used by **rdumpcat** , this mimics 
00875         the output from `_write_outfile` (used by **dumpcat**) with 
00876         the big advantage that it works remotely, with no strong permission 
00877         requirements
00878 
00879         TODO:
00880 
00881         #. when there is a pre-existing LOCALSEQNO redirect LOCALSEQNO to a temporary file 
00882            and do a merge...  easiest to instantiate them as AsciiCSV and then merge at that level 
00883 
00884         """
00885         q = str(select)
00886         log.debug("_write_csvdirect %s " % q) 
00887 
00888         llconn = self.llconn   
00889         llconn.query( q )
00890 
00891         lessmemory = True
00892         if lessmemory:
00893             log.debug("using `--LESSMEMORY` option : less memory expensive but more network expensive 'use_result'  ")
00894             result = llconn.use_result()
00895         else:
00896             log.debug("using more memory expensive but less network expensive 'store_result' ")
00897             result = llconn.store_result()
00898 
00899         csvf = CSVFormat( result.describe() )   
00900         for row in result.fetch_row(maxrows=0, how=0):   ## all rows as tuples
00901             tf.write( str(csvf) % tuple(row) +"\n" )
00902 
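
A hedged sketch of the low-level pattern used above (connection parameters are hypothetical; in dbsrv they come from ~/.my.cnf via MyCnf). `use_result` streams rows from the server one at a time, `store_result` buffers the whole result in memory::

    import _mysql
    # hypothetical connection parameters
    conn = _mysql.connect(host="127.0.0.1", user="reader", passwd="***",
                          db="information_schema")
    conn.query("select table_name from tables limit 5")
    result = conn.use_result()                        # server-side result, row at a time
    for row in result.fetch_row(maxrows=0, how=0):    # rows as tuples of strings
        print(",".join(row))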

def DybPython::dbsrv::DB::timestamped_dir (   self,
  args 
)
Timestamping is needed for the non-partitioned case 

Definition at line 903 of file dbsrv.py.

00904                                     :
00905         """
00906         Timestamping is needed for the non-partitioned case 
00907         """
00908         bdir = self.determine_basedir(*args)
00909         odir = os.path.join(bdir, self.opts.timestamp )
00910         return odir

def DybPython::dbsrv::DB::dumplocal___ (   self,
  args,
  kwa 
)
:param outdir: specifies the output directory, which must be writable by the mysql user; it will be created if it does not exist 

Rerunning this will do quick checks of the CSV files, looking at line counts 
and the first and last lines and comparing with expectations from DB queries. The quick
checks are done via the commands:

* `wc` 
* `head -1` 
* `tail -1`

This is not called in the partitioned case.

Definition at line 911 of file dbsrv.py.

00912                                          :
00913         """
00914         :param outdir: specifies the output directory, which must be writable by the mysql user; it will be created if it does not exist 
00915 
00916         Rerunning this will do quick checks of the CSV files, looking at line counts 
00917         and the first and last lines and comparing with expectations from DB queries. The quick
00918         checks are done via the commands:
00919 
00920         * `wc` 
00921         * `head -1` 
00922         * `tail -1`
00923 
00924         This is not called in the partitioned case.
00925         """
00926         where = kwa.pop('where',None)
00927         if where is None:
00928             where = self.opts.where
00929 
00930         tables = kwa.pop('tables',None)
00931         if tables is None:
00932             tables = self.utables
00933 
00934         odir = self.timestamped_dir(*args)
00935         if os.path.exists(odir):
00936             log.info("dumplocal_ timestamped dir %s exists already, skipping dump" % odir ) 
00937         else:
00938             log.info("dumplocal_ into timestamped dir %s " % odir ) 
00939             self._dumplocal_mkdir(odir)
00940             for table in tables:
00941                 self._dumplocal_schema(odir, table)
00942                 self._dumplocal_table(odir, table, where)
00943             pass
00944         pass
00945         if self.opts.archive or self.opts.archiveforce:
00946             self.archive(odir)
00947 
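
A sketch of the quick-check idea (the path below is hypothetical); the same `wc`/`head -1`/`tail -1` approach is what `tabminmax_csv` and `_wc_csv` implement::

    import os
    path   = "/var/dbbackup/dbsrv/somenode/somesect/20130530_1830/DqChannelVld.csv"  # hypothetical
    nlines = int(os.popen("wc -l %s" % path).read().split()[0])
    first  = os.popen("head -1 %s" % path).read().strip()
    last   = os.popen("tail -1 %s" % path).read().strip()
    # line count plus first/last key, assuming the key is the first CSV column
    print("%d %s %s" % (nlines, first.split(",")[0], last.split(",")[0]))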

def DybPython::dbsrv::DB::fields (   self,
  table,
  filter_ = None 
)

Definition at line 948 of file dbsrv.py.

00949                                          :
00950         return filter(filter_,map(lambda _:_['Field'],self("describe %(table)s" % locals())))

def DybPython::dbsrv::DB::range (   self,
  table,
  where = "1=1",
  keycount = True 
)

Definition at line 951 of file dbsrv.py.

00952                                                       :
00953         sql = self.partitionmgr.qrange(table, where, keycount=keycount)
00954         return self(sql)[0]

def DybPython::dbsrv::DB::minmax (   self,
  table,
  where = "1=1" 
)

Definition at line 955 of file dbsrv.py.

00956                                         :
00957         sql = self.partitionmgr.qminmax(table, where)
00958         return self(sql)[0]

def DybPython::dbsrv::DB::ptables (   self)
:return: list of tables with the key field

Definition at line 959 of file dbsrv.py.

00960                      :
00961         """
00962         :return: list of tables with the key field
00963         """
00964         key = self.partitionmgr.key 
00965         return filter(lambda t:key in self.fields(t), self.utables)
00966 

def DybPython::dbsrv::DB::tabminmax_csv (   self,
  path 
)

Definition at line 967 of file dbsrv.py.

00968                                  :
00969         kpo = self.partitionmgr.kpo
00970         head = os.popen("head -1 %(path)s" % locals()).read().strip()
00971         tail = os.popen("tail -1 %(path)s" % locals()).read().strip()
00972         table = os.path.basename(path[:-4]) 
00973         minkey = int(head.split(",")[kpo])
00974         maxkey = int(tail.split(",")[kpo])
00975         return table, minkey, maxkey 
00976 

def DybPython::dbsrv::DB::_wc_csv (   self,
  outdir,
  keycount = False 
) [private]
:param outdir: partition directory 
:return: dict keyed by table name providing csv info: line count, min and max key 

::

    {'count': 10, 'keycount': 10, 'max': 40, 'min': 31}


#. getting the keycount in a general manner, 
   ie the number of distinct key values, would require 
   parsing the entire CSV, so fake it from the head and tail

::

    [blyth@belle7 ~]$ wc -l /tmp/pp/tmp_ligs_offline_db_0/1000/1/*.csv
    192000 /tmp/pp/tmp_ligs_offline_db_0/1000/1/DqChannel.csv
    192000 /tmp/pp/tmp_ligs_offline_db_0/1000/1/DqChannelStatus.csv
      1000 /tmp/pp/tmp_ligs_offline_db_0/1000/1/DqChannelStatusVld.csv
      1000 /tmp/pp/tmp_ligs_offline_db_0/1000/1/DqChannelVld.csv
    386000 total

Definition at line 977 of file dbsrv.py.

00977                                              : 
00978         """
00979         :param outdir: partition directory 
00980         :return: dict keyed by table name providing csv info: line count, min and max key 
00981 
00982         ::
00983 
00984             {'count': 10, 'keycount': 10, 'max': 40, 'min': 31}
00985 
00986 
00987         #. getting the keycount in a general manner, 
00988            ie the number of distinct key values, would require 
00989            parsing the entire CSV, so fake it from the head and tail
00990 
00991         ::
00992 
00993             [blyth@belle7 ~]$ wc -l /tmp/pp/tmp_ligs_offline_db_0/1000/1/*.csv
00994             192000 /tmp/pp/tmp_ligs_offline_db_0/1000/1/DqChannel.csv
00995             192000 /tmp/pp/tmp_ligs_offline_db_0/1000/1/DqChannelStatus.csv
00996               1000 /tmp/pp/tmp_ligs_offline_db_0/1000/1/DqChannelStatusVld.csv
00997               1000 /tmp/pp/tmp_ligs_offline_db_0/1000/1/DqChannelVld.csv
00998             386000 total
00999         """
01000         if not os.path.exists(outdir):
01001             return None
01002 
01003         pipe = os.popen("wc -l %(outdir)s/*.csv" % locals())
01004         lines = pipe.readlines()
01005         rc = pipe.close()
01006         if rc is None:
01007             rc = 0 
01008         rc = os.WEXITSTATUS(rc) 
01009         log.debug("wc return code %(rc)s for %(outdir)s " % locals() ) 
01010 
01011         if rc != 0:
01012             log.warn("Problem with csv files in %(outdir)s wc rc %(rc)s " % locals() ) 
01013             return None
01014 
01015         wc = {}
01016         for n, path in map(lambda _:_.lstrip().rstrip().split(), lines):
01017             if path[-4:] != '.csv':continue
01018             pass
01019             table, minkey, maxkey = self.tabminmax_csv(path)
01020             wc[table] = dict(count=int(n),min=minkey,max=maxkey)
01021             if keycount:
01022                 wc[table]['keycount'] = maxkey - minkey + 1      # this is quick and dirty 
01023         return wc
01024 
def DybPython::dbsrv::DB::determine_basedir (   self,
  args 
)

Definition at line 1025 of file dbsrv.py.

01026                                       :
01027         """
01028         """
01029         if len(args)>0:
01030             basedir = args[0]
01031         else:
01032             basedir = self.backupdir  # eg /var/dbbackup/dbsrv/belle7.nuu.edu.tw/channelquality_db_belle7
01033         pass
01034         if basedir != self.backupdir:
01035             log.warn("using basedir %s different from standard %s " % (basedir,self.backupdir) ) 
01036         else:
01037             log.debug("using basedir %s " % basedir ) 
01038 
01039         return basedir
01040 

def DybPython::dbsrv::DB::assert_valid_dump (   self,
  pdir,
  csv,
  chk 
)

Definition at line 1041 of file dbsrv.py.

01042                                                 :
01043          assert csv == chk , ("prior dump %s check fail" % pdir, pformat(csv),pformat(chk))
01044          assert sorted(csv.keys()) == sorted(chk.keys()), ("tables mismatch", sorted(csv.keys()),  sorted(chk.keys()))
01045          for table in csv.keys():
01046              if chk[table].has_key('keycount'):
01047                  assert chk[table]['keycount'] == csv[table]['keycount'], (table, chk[table]['keycount'], csv[table]['keycount'] )
01048              assert chk[table]['count'] == csv[table]['count'], (table, chk[table]['count'], csv[table]['count'] )
01049              assert chk[table]['min'] == csv[table]['min'], (table, chk[table]['min'], csv[table]['min'] )
01050              assert chk[table]['max'] == csv[table]['max'], (table, chk[table]['max'], csv[table]['max'] )
01051 

def DybPython::dbsrv::DB::partition_dumpcheck (   self,
  pdir,
  pwhere,
  is_last,
  keycount = False 
)
Checks a partition dump, returning a flag that signals whether a dump is needed.

:param pdir:
:param pwhere:
:param is_last:
:param keycount: doing distinct keycount is quite slow, so can skip for pre-existing
:return: pdump, chk

Definition at line 1052 of file dbsrv.py.

01053                                                                          :
01054         """
01055          Checks a partition dump, returning a flag that signals whether a dump is needed.
01056 
01057         :param pdir:
01058         :param pwhere:
01059         :param is_last:
01060         :param keycount: doing distinct keycount is quite slow, so can skip for pre-existing
01061         :return: pdump, chk
01062         """
01063         log.info("_")
01064         log.info("partition_dumpcheck partition loop %s " % pwhere)
01065         if is_last:
01066             assert pdir[-5:] == "/last"  , "unexpected pdir %s for last partition " % (pdir) 
01067 
01068         ptables = self.ptables()
01069         pdump = None 
01070 
01071         # DB look at SEQNO ranges
01072         chk = {}
01073         for table in ptables:
01074             chk[table] = self.range(table, pwhere, keycount=keycount)  
01075 
01076         # file system look at csv files 
01077         log.info("checking prior csv dump %s  --partitionsize %s --partitionrange %s  " % (pdir,self.opts.partitionsize, self.opts.partitionrange))
01078         csv = self._wc_csv(pdir, keycount=keycount)
01079         if csv is None:
01080             if os.path.exists(pdir):
01081                 msg = "Partition directory \"%(pdir)s\" exists but it contains no .csv files, delete the empty directory and rerun to clear this error" % locals()
01082                 log.fatal(msg)
01083                 raise Exception(msg) 
01084 
01085         # compare DB expectations to CSV dumps 
01086         if csv == chk:
01087             log.info("partition dump %s unchanged wrt DB check " % pdir )
01088             pdump = False
01089         else:
01090             pdump = True
01091             log.info("chk %s " % repr(chk))
01092             log.info("csv %s " % repr(csv))
01093             if is_last:
01094                 log.info("last partition dump %s and is changed wrt DB : will redump " % pdir )
01095             elif csv is None:
01096                 log.info("non last partition dump %s csv is None : first dump " % pdir )
01097             else:
01098                 log.fatal("non last partition dump %s and is changed wrt DB : will fail " % pdir )
01099                 self.assert_valid_dump( pdir, csv, chk)
01100             pass
01101         return pdump, chk 
01102 
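
An illustration of the returned pair (the `db` instance, directory and where clause are hypothetical; the per-table dict mirrors the `range` result shown in the `_wc_csv` docstring)::

    pwhere = "SEQNO >= 1 and SEQNO <= 1000"                # assumed partition clause
    pdump, chk = db.partition_dumpcheck(pdir, pwhere, False, keycount=True)
    # pdump is True when the partition needs (re)dumping, False when the
    # prior CSV dump already matches the DB
    # chk['DqChannelVld'] -> {'count': 1000, 'keycount': 1000, 'min': 1, 'max': 1000}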
 
def DybPython::dbsrv::DB::partition_dumplocal___ (   self,
  args,
  kwa 
)

Definition at line 1103 of file dbsrv.py.

01104                                                   :
01105         """
01106         """
01107         pm = self.partitionmgr 
01108         pm.basedir = self.determine_basedir(*args)
01109         if not os.path.exists(pm.basedir):
01110             os.makedirs(pm.basedir)   
01111 
01112         svy = pm.survey(self)
01113         ptables = self.ptables()
01114         assert len(ptables), ptables
01115 
01116         # write all table schema into single schema dir rather than repeating for every partition
01117         pdir = pm.dir("_")
01118         if os.path.exists(pdir):
01119             log.info("schema dir %s exists already" % pdir )
01120         else:
01121             log.info("creating and populating schema dir %s " % pdir )
01122             self._dumplocal_mkdir(pdir)
01123             for table in ptables:
01124                 self._dumplocal_schema(pdir, table)
01125             pass
01126         pass
01127         if self.opts.archive or self.opts.archiveforce:
01128             self.archive(pdir)
01129  
01130         for p in pm.parts:
01131             pdir = pm.dir(p)
01132             pmin = pm.min(p)     
01133             pmax = pm.max(p)
01134             pwhere = pm.where(p)
01135             is_last = pm.is_last(p)   # last partition expected to be incomplete
01136             keycount = not os.path.exists(pdir)     # only keycount for new partitions
01137             pdump, chk = self.partition_dumpcheck( pdir, pwhere , is_last, keycount=keycount )
01138             if pdump:
01139                 log.info("dumplocal partition %s %s %s:%s --partitionsize %s --partitionrange %s " % (p,self.opts.partitionkey,pmin,pmax,self.opts.partitionsize, self.opts.partitionrange)) 
01140                 self._dumplocal_mkdir(pdir)
01141                 for table in ptables:
01142                     if is_last:
01143                         log.warn("skipping completeness checks for last partition %s into %s " % (p, pdir))
01144                         self._dumplocal_table(pdir, table, pwhere)
01145                     elif chk[table]['min'] == pmin  and chk[table]['max'] == pmax:
01146                         self._dumplocal_table(pdir, table, pwhere)
01147                     else:
01148                         log.warn(" table %s check min  %s pmin %s " % ( table, chk[table]['min'], pmin ))
01149                         log.warn(" table %s check max  %s pmax %s " % ( table, chk[table]['max'], pmax ))
01150                         log.warn("skipping dump as check ahead query indicates incomplete partition %s " % repr(chk)) 
01151                     pass
01152                 zdump, zchk  = self.partition_dumpcheck( pdir, pwhere, is_last, keycount=True )
01153                 assert zdump == False, ("post dump dumpcheck signals need to dump", pformat(zchk)) 
01154             pass
01155             if self.opts.archive: 
01156                 force = self.opts.archiveforce or pdump
01157                 self.archive(pdir, force=force)
01158         pass

def DybPython::dbsrv::DB::extract (   self,
  dir,
  base 
)
:param dir: directory to be created by extraction 
:param base: base directory beneath which `dir` sits; used as the container dir for extraction 

Definition at line 1159 of file dbsrv.py.

01160                                 :
01161         """
01162         :param dir: directory to be created by extraction 
01163         :param base: base directory beneath which `dir` sits; used as the container dir for extraction 
01164         """
01165         pass
01166         tgzp, rpath = self.archivepath(dir, base) 
01167         log.info("extract dir %s tgzp %s rpath %s " % (dir, tgzp, rpath))
01168         tgz = Tar(tgzp, toplevelname=rpath )
01169         tgz.examine()
01170         ls = tgz.list(tgz.members)
01171         tgz.digest(verify=True)
01172         tgz.extract( containerdir=base , toplevelname=rpath, dryrun=False )
01173 

def DybPython::dbsrv::DB::archivepath (   self,
  dir,
  base = None 
)
:param dir: directory to be archived or extracted into 
:return: path to archive tarball, dir path relative to base

Definition at line 1174 of file dbsrv.py.

01175                                           :
01176         """
01177         :param dir: directory to be archived or extracted into 
01178         :return: path to archive tarball, dir path relative to base
01179         """ 
01180         if base is None:
01181             base = self.backupdir  # eg /var/dbbackup/dbsrv/belle7.nuu.edu.tw/channelquality_db_belle7
01182         assert dir[0:len(base)] == base, (dir, base, "dir %s should be within base %s " % (dir,base))
01183         rpath = dir[len(base)+1:]
01184         elems = rpath.split("/")
01185         name = "_".join(elems)
01186         top = elems[0]         # typically this is chunk size
01187         tgzp = os.path.join(base,"archive", top, "%s.tar.gz" % name )   
01188         log.debug("archivepath,  dir %s base %s rpath %s name %s tgzp %s  " % ( dir, base, rpath, name, tgzp ) )
01189         return tgzp, rpath 
01190 
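
A worked sketch of the path mapping (paths are illustrative; the chunk-size directory follows the `1000/1` layout shown in the `_wc_csv` docstring)::

    import os
    base  = "/var/dbbackup/dbsrv/belle7.nuu.edu.tw/channelquality_db_belle7"
    dir_  = base + "/1000/1"                     # a partition directory under base
    rpath = dir_[len(base)+1:]                   # "1000/1"
    name  = "_".join(rpath.split("/"))           # "1000_1"
    top   = rpath.split("/")[0]                  # "1000"  (typically the chunk size)
    tgzp  = os.path.join(base, "archive", top, "%s.tar.gz" % name)
    # tgzp == base + "/archive/1000/1000_1.tar.gz"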

def DybPython::dbsrv::DB::archive (   self,
  dir,
  force = False 
)
:param dir: directory the contents of which should be archived 

As a partition corresponds to a certain SEQNO range, it never changes
so there is no need for a datestring in the path.

The configured backupfold needs to be created before using the archive `-a` option with::

    [blyth@belle7 DybPython]$ sudo mkdir /var/dbbackup/dbsrv
    [blyth@belle7 DybPython]$ sudo chown -R blyth.blyth /var/dbbackup/dbsrv/

Definition at line 1191 of file dbsrv.py.

01192                                         :
01193         """
01194         :param dir: directory the contents of which should be archived 
01195 
01196         As a partition corresponds to a certain SEQNO range, it never changes
01197         so there is no need for a datestring in the path.
01198 
01199         The configured backupfold needs to be created before using the archive `-a` option with::
01200 
01201             [blyth@belle7 DybPython]$ sudo mkdir /var/dbbackup/dbsrv
01202             [blyth@belle7 DybPython]$ sudo chown -R blyth.blyth /var/dbbackup/dbsrv/
01203 
01204         """
01205         base = self.backupdir  # eg /var/dbbackup/dbsrv/belle7.nuu.edu.tw/channelquality_db_belle7
01206         tgzp, rpath = self.archivepath(dir) 
01207         tgzd = os.path.dirname(tgzp)
01208         if not os.path.exists(tgzd):
01209             os.makedirs(tgzd)
01210         pass
01211 
01212         if self.opts.transfer:
01213             cfg = self.opts.transfercfg.split(":")
01214             if len(cfg) == 2:
01215                 kwa = dict(remotenode=cfg[0], remoteprefix=cfg[1])
01216             elif len(cfg) == 1:
01217                 kwa = dict(remotenode=cfg[0])
01218             else:
01219                 assert 0, "unexpected transfercfg %s " % repr(cfg)
01220             log.debug("using transfercfg : %s " % repr(kwa))
01221         else:
01222             kwa = {} 
01223        
01224         tgz = Tar(tgzp, toplevelname=rpath, **kwa)
01225         if os.path.exists(tgzp) and not force:
01226             log.info("archive already exists %s rerun with `-A/--archiveforce` option to recreate " % tgzp )
01227         else:
01228             log.info("creating archive %s for %s " % (tgzp,rpath) )
01229             tgz.archive( base )  # argument specifies the root directory of the archive
01230         pass
01231         tgz.examine()
01232         #log.info("\n".join(tgz.names))
01233         ls = tgz.list(tgz.members)
01234         log.debug("\n"+ls)
01235         du = os.popen("du -hs %(tgzp)s" % locals()).read().strip()
01236         log.info(du)
01237         tgz.digest()
01238 
01239         if self.opts.transfer:
01240             tgz.transfer()
01241 

def DybPython::dbsrv::DB::partition_loadlocal___ (   self,
  args,
  kwa 
)
#. look into putting the partitions back together again, in partitioned load local
#. read file system tealeaves wrt the partitioning

#. factor off the checking 
#. need to work out which partitions are new and just load those         

Definition at line 1242 of file dbsrv.py.

01243                                                    :
01244         """
01245         #. look into putting the partitions back together again, in partitioned load local
01246         #. read file system tealeaves wrt the partitioning
01247 
01248         #. factor off the checking 
01249         #. need to work out which partitions are new and just load those         
01250         """
01251         basedir = self.determine_basedir(*args)
01252         log.info("basedir %s " % basedir )
01253         pm = self.partitionmgr 
01254         pm.basedir = basedir
01255 
01256         if self.opts.extract:
01257             chunks = pm.archived_chunks()
01258             log.info("archived_chunks %s " % repr(chunks) )
01259             pm.assign_parts( chunks )
01260         else:
01261             chunks = pm.available_chunks()  # only works after extraction
01262             log.info("available_chunks %s " % repr(chunks) )
01263 
01264         _dir = pm.dir("_")
01265         if not os.path.exists(_dir):
01266             log.info("schema dir %s does not exist" % _dir )
01267             if self.opts.extract:
01268                 self.extract(_dir, basedir)
01269             pass
01270         pass
01271         assert os.path.exists(_dir), _dir 
01272         #_tables = map(lambda _:_[:-7],filter(lambda _:_[-7:] == '.schema',os.listdir(_dir)))
01273 
01274         for p in pm.parts:
01275             pdir = pm.dir(p)
01276             pnam = os.path.basename(pdir)
01277             pmin = pm.min(p) 
01278             pmax = pm.max(p) 
01279             pwhere = pm.where(p) 
01280             #log.debug("p %s pdir %s pmin %s pmax %s pwhere %s " % (p,pdir,pmin,pmax,pwhere))
01281             if not os.path.exists(pdir):
01282                 log.debug("partition_loadlocal___ partition dir %s does not exist " % pdir )
01283                 if self.opts.extract:
01284                     self.extract(pdir, basedir)
01285 
01286             if not os.path.exists(pdir):
01287                 log.debug("partition_loadlocal___ partition dir %s STILL does not exist " % pdir ) 
01288             else:
01289                 log.info("partition_loadlocal___ loading %s " % pdir ) 
01290                 ltables = self.loadlocal_dir( pdir )
01291                 log.debug("ltables %s " % str(ltables))
01292 
01293                 # check that the loaded partition yields the expected key range and count
01294                 if not self.opts.nocheck:
01295                     check = {}
01296                     for table in ltables:
01297                         check[table] = self.minmax(table, pwhere)
01298                         assert check[table]['min'] == pmin, (table,"min",check[table]['min'],pmin)
01299                         if pnam != 'last':
01300                             assert check[table]['max'] == pmax, (table,"max",check[table]['max'],pmax)
01301                         # keycount check is too slow
01302                         #assert check[table]['keycount'] == pm.size, (table,"keycount",check[table]['keycount'],pm.size)
01303                         pass 
01304                     log.debug(pformat(check))
01305                 pass
01306 

def DybPython::dbsrv::DB::loadlocal_dir (   self,
  dir 
)

Definition at line 1307 of file dbsrv.py.

01308                                  :
01309         """
01310         """
01311         # if a sibling dir named "_" exists use that as a source of schema files, otherwise get directly
01312         _dir = os.path.join(os.path.dirname(dir),"_")
01313         if not os.path.exists(_dir):
01314             _dir = dir 
01315 
01316         def _replace_ignore(table):
01317             """
01318             hmm DBI specificity slipping in
01319             """
01320             if table == "LOCALSEQNO": 
01321                 return "REPLACE" 
01322             else:
01323                 return "IGNORE"    
01324 
01325         utables = self.utables
01326 
01327         log.info("utables %s " % repr(utables))
01328 
01329         ltables = [] 
01330         for name in filter(lambda _:os.path.isfile(os.path.join(dir,_)) and _[-4:] == '.csv' , os.listdir(dir)):
01331             path = os.path.join(dir, name)
01332             table, ext = os.path.splitext(name)
01333             if not table in utables:
01334                 schema = os.path.join(_dir, "%s.schema" % table )
01335                 log.warn("creating table %s from schema file %s " % (table, schema))
01336                 assert os.path.exists(schema), schema
01337                 self(open(schema,"r").read())
01338             pass
01339             ltables.append(table)
01340 
01341             ttable, csvminkey, csvmaxkey = self.tabminmax_csv(path)
01342             assert csvminkey < csvmaxkey , (csvminkey,csvmaxkey)
01343             assert ttable == table, (ttable, table)
01344             mm = self.minmax(table)            
01345             log.debug("csvminkey %s csvmaxkey %s " % (csvminkey, csvmaxkey))
01346             log.debug(" mmmin %(min)s  mmmax %(max)s " % mm )
01347             if csvmaxkey <= mm['max']:
01348                 log.info("SKIP: as already loaded csv keys minmax %s %s from %s " % (csvminkey,csvmaxkey,path ))
01349             else:
01350                 ll = LoadDataLocalInfile(infile=path, table=table, ignorelines=0, replace_ignore=_replace_ignore(table) )
01351                 self(str(ll))        
01352                 log.info("loadlocal ingesting %s took %s seconds " % (path, "%5.2f" % self.lastseconds ) ) 
01353         pass 
01354         return ltables
01355  

def DybPython::dbsrv::DB::loadlocal___ (   self,
  args,
  kwa 
)
:param outdir:  specifies directory containing normal or partitioned dump of CSV files

Definition at line 1356 of file dbsrv.py.

01357                                          :
01358         """
01359         :param outdir:  specifies directory containing normal or partitioned dump of CSV files
01360         """
01361         odir = self.timestamped_dir(*args)
01362         return self.loadlocal_dir(odir)
01363   
01364 


Member Data Documentation

tuple DybPython::dbsrv::DB::docs = classmethod(docs) [static]

Definition at line 672 of file dbsrv.py.

Definition at line 584 of file dbsrv.py.

Definition at line 584 of file dbsrv.py.

Definition at line 584 of file dbsrv.py.

Definition at line 584 of file dbsrv.py.

Definition at line 584 of file dbsrv.py.

Definition at line 584 of file dbsrv.py.

Definition at line 584 of file dbsrv.py.

Definition at line 584 of file dbsrv.py.

Definition at line 584 of file dbsrv.py.

Definition at line 584 of file dbsrv.py.

Definition at line 584 of file dbsrv.py.

Definition at line 680 of file dbsrv.py.

result of select is returned to python and thence formatted directly into csv, works remotely

Definition at line 731 of file dbsrv.py.


Property Documentation

DybPython::dbsrv::DB::size = property(_get_size, doc="Size estimate of the DB in MB ") [static]

Definition at line 694 of file dbsrv.py.

DybPython::dbsrv::DB::databases = property(_get_databases, doc="List of database names obtained from information_schema.tables") [static]

Definition at line 703 of file dbsrv.py.

DybPython::dbsrv::DB::tables = property(_get_tables_infoschema, doc="List of table names obtained from information_schema.tables") [static]

Definition at line 711 of file dbsrv.py.

DybPython::dbsrv::DB::datadir = property(_get_datadir, doc="Query DB server to find the datadir, eg /var/lib/mysql/ OR /data/mysql/ ") [static]

Definition at line 729 of file dbsrv.py.

DybPython::dbsrv::DB::utables = property(_get_utables, doc="List of tables to use in operations, when --tables option is used this can be a subset of all tables.") [static]

Definition at line 804 of file dbsrv.py.
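
A minimal usage sketch of these properties (the `db` instance and option object are hypothetical)::

    db = DB("tmp_ligs_offline_db_0", opts=opts, home=home)   # hypothetical instance
    print(db.size)       # size estimate in MB, cached after the first query
    print(db.tables)     # table names from information_schema.tables
    print(db.datadir)    # e.g. /var/lib/mysql/
    print(db.utables)    # subset selected by --tables, otherwise all tables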


The documentation for this class was generated from the following file:
dbsrv.py

Generated on Fri May 16 2014 09:55:41 for DybPython by doxygen 1.7.4