00001 #!/usr/bin/env python
00002 """
00003 dbsrv : MySQL Server Utilities
00004 ================================
00005 
00006 A more admin-centric version of the sibling *db.py* with advanced features, including:
00007 
00008 * on-server optimizations such as `select ... into outfile`,
00009   taking advantage of the situation where the mysql client and
00010   server are on the same node
00011 
00012 * partitioned dump/load for dealing with very large tables and incremental backups
00013 
00014 * implicit DB addressing without a *~/.my.cnf* section, allowing multiple databases 
00015   on the same server to be handled via comma-delimited names or regular expressions
00016 
00017 * despite coming from NuWa it does not need the NuWa environment; a system python with MySQLdb is OK
00018 
00019 TODO
00020 -----
00021 
00022 #. checking the digests on the target and sending notification emails
00023 
00024 #. test dumplocal when partitionsize is an exact factor of table size
00025 #. warnings or asserts when using partitioned dumplocal with disparate table sizes 
00026 
00027 
00028 Usage
00029 ------
00030 
00031 ::
00032 
00033    ./dbsrv.py tmp_ligs_offline_db_0 databases
00034    ./dbsrv.py tmp_ligs_offline_db_0 tables
00035    ./dbsrv.py tmp_ligs_offline_db_0 dumplocal  --where "SEQNO < 100" 
00036  
00037 As with *db.py* the first argument can be a *~/.my.cnf* section name.
00038 Unlike *db.py* it can also simply be a database name which
00039 does not have a corresponding config section.  
00040 
00041 In this implicit case the other connection parameters are obtained 
00042 from the so-called *home* section.  Normally the *home* section is "loopback", 
00043 indicating an on-server connection.
00044 The *home* section must point to the `information_schema` database. 
00045 
00046 When the `--home` option is used databases on remote servers can
00047 be accessed without having config sections for them all.
00048 
00049 Comparing DB via partitioned dump
00050 -----------------------------------
00051 
00052 Three table dumps, skipping the crashed table, in order to compare:
00053 
00054 * `dybdb1_ligs.tmp_ligs_offline_db_dybdb1`  original on dybdb1
00055 * `dybdb2_ligs.channelquality_db_dybdb2`    recovered on dybdb2
00056 * `loopback.channelquality_db_belle7`   recovered onto belle7 from hotcopy created on belle1
00057 
00058 Invoked from cron for definiteness, and for the ability to leave it running for a long time::
00059 
00060     07 17 * * * ( $DYBPYTHON_DIR/dbsrv.py -t DqChannel,DqChannelVld,DqChannelStatusVld --home dybdb1_ligs tmp_ligs_offline_db_dybdb1 dumplocal /tmp/cq/tmp_ligs_offline_db_dybdb1  --partition --partitioncfg 10000,0,33 ) >  $CRONLOG_DIR/dbsrv_dump_tmp_ligs_offline_db_dybdb1.log 2>&1 
00061     52 18 * * * ( $DYBPYTHON_DIR/dbsrv.py -t DqChannel,DqChannelVld,DqChannelStatusVld --home dybdb2_ligs channelquality_db_dybdb2 dumplocal /tmp/cq/channelquality_db_dybdb2 --partition --partitioncfg 10000,0,33 ) > $CRONLOG_DIR/dbsrv_dump_channelquality_db_dybdb2.log 2>&1 
00062     28 20 * * * ( $DYBPYTHON_DIR/dbsrv.py -t DqChannel,DqChannelVld,DqChannelStatusVld --home loopback channelquality_db_belle7 dumplocal /tmp/cq/channelquality_db_belle7 --partition --partitioncfg 10000,0,33 ) > $CRONLOG_DIR/dbsrv_dump_channelquality_db_belle7.log 2>&1 
00063 
00064 .. warning:: `--partitioncfg` has now been split into `--partitionsize` and `--partitionrange`
00065 
00066 
00067 Dump speed:
00068 
00069 #. remote dumps from dybdb1/dybdb2 to belle7 take approx 165s for each chunk. Thus ~90min for all. 
00070 #. local dumps on belle7 take approx 20s for each chunk. Thus ~11min for all.
00071 
00072 diffing the dumped partitions
00073 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
00074 
00075 For the first two, all but the partial chunk match.  
00076 
00077 The range of partition dirs to diff is controlled by an envvar::
00078 
00079     [blyth@belle7 DybPython]$ RANGE=0,10 ./diff.py  /tmp/cq/tmp_ligs_offline_db_dybdb1/10000 /tmp/cq/channelquality_db_dybdb2/10000   
00080     [blyth@belle7 DybPython]$ RANGE=10,20 ./diff.py  /tmp/cq/tmp_ligs_offline_db_dybdb1/10000 /tmp/cq/channelquality_db_dybdb2/10000   
00081     [blyth@belle7 DybPython]$ RANGE=20,30 ./diff.py  /tmp/cq/tmp_ligs_offline_db_dybdb1/10000 /tmp/cq/channelquality_db_dybdb2/10000   
00082     [blyth@belle7 DybPython]$ RANGE=30,33 ./diff.py  /tmp/cq/tmp_ligs_offline_db_dybdb1/10000 /tmp/cq/channelquality_db_dybdb2/10000      ## see diff.py for the output from these
00083 
00084     [blyth@belle7 DybPython]$ RANGE=0,10 ./diff.py  /tmp/cq/channelquality_db_belle7/10000 /tmp/cq/channelquality_db_dybdb2/10000   
00085     [blyth@belle7 DybPython]$ RANGE=10,20 ./diff.py  /tmp/cq/channelquality_db_belle7/10000 /tmp/cq/channelquality_db_dybdb2/10000   
00086     [blyth@belle7 DybPython]$ RANGE=20,30 ./diff.py  /tmp/cq/channelquality_db_belle7/10000 /tmp/cq/channelquality_db_dybdb2/10000   
00087     [blyth@belle7 DybPython]$ RANGE=30,33 ./diff.py  /tmp/cq/channelquality_db_belle7/10000 /tmp/cq/channelquality_db_dybdb2/10000   
00088 
00089 oops a difference, but it's just different formatting of 0.0001 vs 1e-04
00090 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
00091 
00092 ::
00093 
00094     [blyth@belle7 DybPython]$  RANGE=10,20 ./diff.py  /tmp/cq/channelquality_db_belle7/10000 /tmp/cq/channelquality_db_dybdb2/10000   
00095     2013-06-07 17:58:06,933 __main__ INFO     rng ['10', '11', '12', '13', '14', '15', '16', '17', '18', '19'] 
00096     2013-06-07 17:58:26,526 __main__ INFO     diff -r --brief /tmp/cq/channelquality_db_belle7/10000/10 /tmp/cq/channelquality_db_dybdb2/10000/10 => 0 
00097     2013-06-07 17:58:44,896 __main__ INFO     diff -r --brief /tmp/cq/channelquality_db_belle7/10000/11 /tmp/cq/channelquality_db_dybdb2/10000/11 => 0 
00098     2013-06-07 17:59:04,360 __main__ INFO     diff -r --brief /tmp/cq/channelquality_db_belle7/10000/12 /tmp/cq/channelquality_db_dybdb2/10000/12 => 0 
00099     2013-06-07 17:59:22,531 __main__ INFO     diff -r --brief /tmp/cq/channelquality_db_belle7/10000/13 /tmp/cq/channelquality_db_dybdb2/10000/13 => 0 
00100     2013-06-07 17:59:42,205 __main__ INFO     diff -r --brief /tmp/cq/channelquality_db_belle7/10000/14 /tmp/cq/channelquality_db_dybdb2/10000/14 => 0 
00101     2013-06-07 18:00:00,385 __main__ INFO     diff -r --brief /tmp/cq/channelquality_db_belle7/10000/15 /tmp/cq/channelquality_db_dybdb2/10000/15 => 0 
00102     2013-06-07 18:00:20,000 __main__ INFO     diff -r --brief /tmp/cq/channelquality_db_belle7/10000/16 /tmp/cq/channelquality_db_dybdb2/10000/16 => 0 
00103     2013-06-07 18:00:38,198 __main__ INFO     diff -r --brief /tmp/cq/channelquality_db_belle7/10000/17 /tmp/cq/channelquality_db_dybdb2/10000/17 => 0 
00104     2013-06-07 18:00:38,704 __main__ INFO     diff -r --brief /tmp/cq/channelquality_db_belle7/10000/18 /tmp/cq/channelquality_db_dybdb2/10000/18 => 1
00105     Files /tmp/cq/channelquality_db_belle7/10000/18/DqChannel.csv and /tmp/cq/channelquality_db_dybdb2/10000/18/DqChannel.csv differ
00106      
00107     2013-06-07 18:00:56,602 __main__ INFO     diff -r --brief /tmp/cq/channelquality_db_belle7/10000/19 /tmp/cq/channelquality_db_dybdb2/10000/19 => 0 
00108     [blyth@belle7 DybPython]$ 
00109     [blyth@belle7 DybPython]$ 
00110     [blyth@belle7 DybPython]$ 
00111     [blyth@belle7 DybPython]$ diff /tmp/cq/channelquality_db_belle7/10000/18/DqChannel.csv  /tmp/cq/channelquality_db_dybdb2/10000/18/DqChannel.csv 
00112     1196930c1196930
00113     < 186235,2,28473,7,67175938,0.0001,7.35714,3.39868,-1,-1
00114     ---
00115     > 186235,2,28473,7,67175938,1e-04,7.35714,3.39868,-1,-1
00116     ...
00117 
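Such false positives could be screened out by comparing CSV fields numerically rather
than textually. A minimal sketch of that idea (illustrative only, not part of dbsrv.py)::

    def same_csv_line(a, b):
        """Compare two CSV lines, treating fields as floats where possible."""
        fa, fb = a.split(","), b.split(",")
        if len(fa) != len(fb):
            return False
        for x, y in zip(fa, fb):
            try:
                if float(x) != float(y):          # 0.0001 == 1e-04
                    return False
            except ValueError:
                if x != y:                        # non-numeric fields compared as strings
                    return False
        return True

    assert same_csv_line("186235,2,0.0001,-1", "186235,2,1e-04,-1")
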
00118 Commands
00119 -----------
00120 
00121 summary
00122 ~~~~~~~~~
00123 
00124 Provides a summary of table counts and update times in all selected databases.
00125 The DB names are selected by comma-delimited or regexp string arguments. 
00126 ::
00127 
00128     ./dbsrv.py               tmp_ligs_offline_db_\\d summary           
00129 
00130                # local home, requires "loopback" config section pointing to information_schema DB
00131 
00132     ./dbsrv.py --home dybdb1 tmp_\\S* summary                          
00133 
00134                # remote home,  requires "dybdb1" config section pointing to information_schema DB
00135 
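A sketch of the kind of name selection this implies (illustrative only, not the actual
dbsrv.py implementation)::

    import re

    def select_dbnames(arg, all_dbnames):
        """Treat arg as comma-delimited names if they all exist, else as a regexp."""
        names = arg.split(",")
        if all(n in all_dbnames for n in names):
            return names
        return [n for n in all_dbnames if re.match(arg + "$", n)]

    select_dbnames(r"tmp_ligs_offline_db_\d", ["tmp_ligs_offline_db_0", "offline_db"])
    # -> ['tmp_ligs_offline_db_0']
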
00136 TODO:
00137 
00138 Check handling of section names that are the same as DB names on different nodes, as the section config will trump the dbname ? 
00139 BUT *home* config host matching should trip asserts ? 
00140 
00141 
00142 dumplocal
00143 ~~~~~~~~~
00144 
00145 The DB tables are dumped as *.csv* files and separate *.schema* files containing table creation SQL.
00146 Without a directory argument the dumps are written beneath the `--backupfold` controlled directory, such as `/var/dbbackup/dbsrv`
00147 
00148 ::
00149 
00150     [blyth@belle7 DybPython]$ ./dbsrv.py  tmp_ligs_offline_db_0 dumplocal --where 'SEQNO <= 100'    
00151     2013-06-13 16:49:38,152 __main__ INFO     partition_dumplocal___ SEQNO <= 100 writing /var/dbbackup/dbsrv/belle7.nuu.edu.tw/tmp_ligs_offline_db_0/DqChannel.csv 
00152     2013-06-13 16:49:38,578 __main__ INFO     partition_dumplocal___ SEQNO <= 100 writing /var/dbbackup/dbsrv/belle7.nuu.edu.tw/tmp_ligs_offline_db_0/DqChannel.csv took  0.39 seconds 
00153     ...
00154 
00155     [blyth@belle7 DybPython]$ ./dbsrv.py  tmp_ligs_offline_db_0 dumplocal /tmp/check/tmp_ligs_offline_db_0 --where 'SEQNO <= 100' 
00156     2013-06-13 16:50:49,003 __main__ WARNING  using basedir /tmp/check/tmp_ligs_offline_db_0 different from standard /var/dbbackup/dbsrv/belle7.nuu.edu.tw/tmp_ligs_offline_db_0 
00157     2013-06-13 16:50:49,031 __main__ INFO     partition_dumplocal___ SEQNO <= 100 writing /tmp/check/tmp_ligs_offline_db_0/DqChannel.csv 
00158     2013-06-13 16:50:49,203 __main__ INFO     partition_dumplocal___ SEQNO <= 100 writing /tmp/check/tmp_ligs_offline_db_0/DqChannel.csv took  0.17 seconds 
00159     ...  
00160 
00161 .. warning:: When there are databases of the same name on multiple nodes it is useful to include the name of the node in the section name
00162 
00163 
00164 loadlocal
00165 ~~~~~~~~~~~
00166 
00167 When loading into a database that is yet to be created, use the `--DB_DROP_CREATE` option::
00168 
00169     [blyth@belle7 DybPython]$ ./dbsrv.py tmp_ligs_offline_db_5 loadlocal ~/tmp_ligs_offline_db_0  -l debug --DB_DROP_CREATE 
00170 
00171 Typically when loading, a database name change is needed; in this case the directory and new section name must be given::
00172 
00173     [blyth@belle7 DybPython]$ ./dbsrv.py tmp_ligs_offline_db_50 loadlocal /var/dbbackup/dbsrv/belle7.nuu.edu.tw/tmp_ligs_offline_db_0 --DB_DROP_CREATE  
00174     DROP and reCREATE database tmp_ligs_offline_db_50 loosing all tables contained ? Enter "YES" to proceed : YES
00175     2013-06-13 16:58:41,491 __main__ WARNING  using basedir /var/dbbackup/dbsrv/belle7.nuu.edu.tw/tmp_ligs_offline_db_0 different from standard /var/dbbackup/dbsrv/belle7.nuu.edu.tw/tmp_ligs_offline_db_50 
00176     2013-06-13 16:58:41,499 __main__ WARNING  creating table DqChannel from schema file /var/dbbackup/dbsrv/belle7.nuu.edu.tw/tmp_ligs_offline_db_0/DqChannel.schema 
00177     ...
00178 
00179 
00180 partitioned loadlocal
00181 ~~~~~~~~~~~~~~~~~~~~~~~~
00182 
00183 NB when restoring a name change is needed, so it is necessary to specify the source directory as an argument  
00184 ::
00185 
00186      [root@cms01 DybPython]# dbsrv channelquality_db_restored loadlocal /data/var/dbbackup/dbsrv/dybdb2.ihep.ac.cn/channelquality_db_dybdb2 --partition --extract -l debug --DB_DROP_CREATE  -C
00187                              ##  initial run, creating the DB  from 32 partitions took ~100 min
00188   
00189      [root@cms01 DybPython]# dbsrv channelquality_db_restored loadlocal /data/var/dbbackup/dbsrv/dybdb2.ihep.ac.cn/channelquality_db_dybdb2 --partition --extract  -K
00190                              ##  quick re-run, notices nothing to do and completes in a few seconds
00191 
00192      [blyth@cms01 ~]$ type dbsrv    # function to nab the NuWa python MySQLdb, as yum is being uncooperative on cms01
00193      dbsrv is a function
00194      dbsrv () 
00195      { 
00196         local python=/data/env/local/dyb/trunk/external/Python/2.7/i686-slc4-gcc34-dbg/bin/python;
00197         export PYTHONPATH=/data/env/local/dyb/trunk/NuWa-trunk/../external/mysql_python/1.2.3_mysql5.0.67_python2.7/i686-slc4-gcc34-dbg/lib/python2.7/site-packages;
00198         LD_LIBRARY_PATH=/data/env/local/dyb/trunk/NuWa-trunk/../external/mysql/5.0.67/i686-slc4-gcc34-dbg/lib/mysql:$LD_LIBRARY_PATH;
00199         LD_LIBRARY_PATH=/data/env/local/dyb/trunk/NuWa-trunk/../external/mysql_python/1.2.3_mysql5.0.67_python2.7/i686-slc4-gcc34-dbg/lib:$LD_LIBRARY_PATH;
00200         export LD_LIBRARY_PATH;
00201         $python -c "import MySQLdb";
00202         $python ~blyth/DybPython/dbsrv.py $*
00203      }
00204 
00205 Test run on cms01 chugs along at ~3 min per 10k partition; at 32 partitions the estimate is ~100 min to complete
00206 
00207 ::
00208 
00209       [blyth@belle7 DybPython]$ ./dbsrv.py  channelquality_db_restored loadlocal  /var/dbbackup/dbsrv/dybdb2.ihep.ac.cn/channelquality_db_dybdb2 --partition --extract -l debug --DB_DROP_CREATE  -C
00210 
00211 
00212 
00213 Partitioned Commands
00214 ----------------------
00215 
00216 The partitioning relies on these options:
00217 
00218 `--partition`
00219           switches on partitioning, default False
00220 `--partitionkey`
00221           default "SEQNO,0", corresponding to the key name and its position in CSV dumps
00222 `--partitioncfg`
00223           **NOW REPLACED WITH THE TWO OPTIONS BELOW**
00224           default "10000,0,33", the three integers specify the number of keys in each chunk `10000` 
00225           and the range of chunks `range(0,33)` ie 0 to 32
00226 `--partitionsize`
00227           default "10000", specify the number of keys in each chunk
00228 `--partitionrange`
00229           default of None, meaning all partitions. 
00230           If specified as eg "0,33" it restricts to a range of partition indices `range(0,33)`
00231 `--partitionlast`
00232           **NOW DEPRECATED** The last partition is now auto-determined, to allow daily cron running. 
00233           Default None; when set to an integer string eg "32" this is used to identify the index 
00234           of the last incomplete partition
00235 
00236 For dump and load to refer to the same partition set, the same chunk size is required 
00237 (and the same partition key, although this is not checked).
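
A sketch of how a partition index maps onto a key range under these options
(illustrative; the actual PartitionMgr logic may differ in detail)::

    def partition_where(p, size=10000, key="SEQNO"):
        """WHERE clause selecting the keys of partition index p (keys assumed 1-based)."""
        pmin = p * size + 1
        pmax = (p + 1) * size
        return "%s >= %d and %s <= %d" % (key, pmin, key, pmax)

    partition_where(0, size=10)    # -> 'SEQNO >= 1 and SEQNO <= 10'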
00238 
00239 partitioned loadlocal
00240 ~~~~~~~~~~~~~~~~~~~~~~~
00241 
00242 From cron::
00243 
00244     DYBPYTHON_DIR=/data1/env/local/dyb/NuWa-trunk/dybgaudi/DybPython/python/DybPython
00245     03 20 * * * ( $DYBPYTHON_DIR/dbsrv.py channelquality_db_0 loadlocal  /tmp/cq/channelquality_db --partition --DB_DROP_CREATE -C ) > $CRONLOG_DIR/dbsrv_load_.log 2>&1
00246 
00247 quick partitioning test
00248 ~~~~~~~~~~~~~~~~~~~~~~~~
00249 
00250 For fast dump/load testing use small chunks and range of partitions::
00251 
00252     ./dbsrv.py tmp_ligs_offline_db_0 dumplocal /tmp/pp/tmp_ligs_offline_db_0 --partition --partitionsize 10 --partitionrange 0,5
00253     ./dbsrv.py tmp_ligs_offline_db_5 loadlocal /tmp/pp/tmp_ligs_offline_db_0 --partition --partitionsize 10 --partitionrange 0,5 --DB_DROP_CREATE -C 
00254 
00255 
00256 Archiving and transfers to remote node
00257 -----------------------------------------
00258 
00259 Controlled via options:
00260 
00261 `-a/--archive`
00262           switch on archive creation 
00263 
00264 `-x/--extract`
00265           switch on archive extraction 
00266 
00267 `--backupfold`
00268           default /var/dbbackup/dbsrv, the location of backup dumps and tarballs
00269 
00270 `-T/--transfer`
00271           switch on remote transfer of archives, must be used together with `-a/--archive` and the *dumplocal* command to be effective
00272    
00273 `--transfercfg`
00274           configures the remote node and possibly a directory prefix that is prepended in front of the `backupfold`
00275 
00276 
00277 For example, the command below dumps partitions 0, 1 and 2, creates archive tarballs and transfers them to the configured remote node::
00278 
00279      ./dbsrv.py -t DqChannel,DqChannelVld,DqChannelStatusVld --home loopback channelquality_db_belle7 dumplocal /tmp/cq/channelquality_db_belle7 --partition --partitionrange 0,3 -aT
00280 
00281 The local and remote tarball paths are the same, with no transfercfg prefix specified, namely::
00282 
00283      /var/dbbackup/dbsrv/belle7.nuu.edu.tw/channelquality_db_belle7/10000_0.tar.gz 
00284 
00285 
00286 Transfer Optimization 
00287 ~~~~~~~~~~~~~~~~~~~~~~~
00288 
00289 A small .dna sidecar to the tarballs is used for tarball content identification.
00290 When a rerun of the transfer is made, the sidecar DNA is first checked to see
00291 if the remote node already holds the tarball.
00292 
00293 This means that only newly reached partitions are archived and transferred.  
00294 The `last` incomplete partition will typically be transferred every time, as it will
00295 have different content, causing a DNA mismatch that triggers a re-transfer.
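
A sketch of the sidecar idea (illustrative; the actual DNA format is whatever the Tar
class in tar.py writes and verifies)::

    import hashlib

    def write_dna(tgzp):
        """Write an md5 digest sidecar next to the tarball."""
        dna = hashlib.md5(open(tgzp, "rb").read()).hexdigest()
        open(tgzp + ".dna", "w").write(dna)
        return dna

    def need_transfer(local_tgzp, remote_dna):
        """Transfer only when the remote sidecar digest differs or is missing."""
        local_dna = open(local_tgzp + ".dna").read().strip()
        return remote_dna is None or remote_dna != local_dna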
00296 
00297 
00298 Full archive/transfer cron test from belle7 to belle1 
00299 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
00300 
00301 To prepare the remote node it is only necessary to create and set ownership of the `backupfold`, eg `/var/dbbackup/dbsrv`, 
00302 and to ensure that keyed ssh access is working
00303 
00304 ::
00305 
00306     DYBPYTHON_DIR=/data1/env/local/dyb/NuWa-trunk/dybgaudi/DybPython/python/DybPython
00307     DBSRV_REMOTE_NODE=N1
00308     35 18 * * * ( $DYBPYTHON_DIR/dbsrv.py -t DqChannel,DqChannelVld,DqChannelStatusVld --home loopback channelquality_db_belle7 dumplocal --partition --archive --transfer ) > $CRONLOG_DIR/dbsrv_pat_channelquality_db_belle7.log 2>&1  
00309 
00310 
00311 Installation on dybdb2
00312 ------------------------
00313 
00314 Prepare target node 
00315 ~~~~~~~~~~~~~~~~~~~~~
00316 
00317 The administrator of the target node needs to prepare a folder for the archives::
00318 
00319     [blyth@cms01 ~]$ sudo mkdir /data/var/dbbackup/dbsrv
00320     [blyth@cms01 ~]$ sudo chown -R dayabayscp.dayabayscp /data/var/dbbackup/dbsrv
00321 
00322 Setup mysql config at source
00323 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
00324 
00325 The config file :file:`~/.my.cnf` needs two sections "loopback" and "channelquality_db_dybdb2"::
00326 
00327     [loopback]
00328     host      = 127.0.0.1
00329     database  = information_schema
00330     user      = root
00331     password  = ***
00332 
00333     [channelquality_db_dybdb2]
00334     host      = 127.0.0.1
00335     database  = channelquality_db
00336     user      = root
00337     password  = ***
00338 
00339 SSH environment configuration
00340 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
00341 
00342 The script runs *scp* commands internally that require:
00343 
00344 * `ssh-agent` process to be running and authenticated 
00345 * public keys of source node to be appended to `.ssh/authorized_keys2` of target 
00346 * :envvar:`SSH_AUTH_SOCK` to be defined.  
00347 
00348 When run from cron the envvar is typically not present.  
00349 In order to define it, the file :file:`~/.ssh-agent-info-$NODE_TAG` 
00350 is parsed by `sshenv()` from common.py.
00351 
00352 This file is created by the **env** function `ssh--agent-start` which is used 
00353 following reboots to start and authenticate the ssh agent process. 
00354 
00355 * http://belle7.nuu.edu.tw/e/base/ssh/
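
A sketch of what that parsing amounts to (assumed file format: the standard ssh-agent
output lines saved by `ssh--agent-start`; the real implementation is `sshenv()` in common.py)::

    import os, re

    def sshenv(path="~/.ssh-agent-info-%s" % os.environ.get("NODE_TAG", "")):
        """Pull SSH_AUTH_SOCK (and SSH_AGENT_PID) out of the saved agent info file."""
        txt = open(os.path.expanduser(path)).read()
        for var, val in re.findall(r"(SSH_\w+)=([^;]+);", txt):
            os.environ[var] = val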
00356 
00357 Get DybPython from dybsvn
00358 ~~~~~~~~~~~~~~~~~~~~~~~~~~~
00359 
00360 ::
00361 
00362     cd 
00363     svn co http://dayabay.ihep.ac.cn/svn/dybsvn/dybgaudi/trunk/DybPython/python/DybPython
00364 
00365 Despite coming from dybsvn the *dbsrv.py* script does not need the NuWa environment. 
00366 A system python with the MySQLdb extension should be adequate.
00367 
00368 
00369 Quick Interactive Test
00370 ~~~~~~~~~~~~~~~~~~~~~~~
00371 
00372 Configuring 5 small 100 SEQNO partitions allows the machinery to be quickly tested::
00373 
00374     cd DybPython
00375     ./dbsrv.py channelquality_db_dybdb2 dumplocal --partition --partitionsize 100 --partitionrange 0,5 --archive --transfer  
00376 
00377 
00378 CRON commandline
00379 ~~~~~~~~~~~~~~~~~~
00380 
00381 ::
00382 
00383     ## NB no DBSRV_REMOTE_NODE is needed, the default of S:/data is appropriate
00384     DYBPYTHON_DIR=/root/DybPython
00385     CRONLOG_DIR=/root/cronlog
00386     NODE_TAG=D2
00387     #
00388     42 13 * * * ( $DYBPYTHON_DIR/dbsrv.py channelquality_db_dybdb2 dumplocal --partition --archive --transfer ) > $CRONLOG_DIR/dbsrv_channelquality_db_dybdb2.log 2>&1 
00389 
00390 
00391 A do-nothing run, when there are no new partitions to dump/archive/transfer, takes about 4 mins and uses few resources.
00392 When there are new completed partitions to archive and transfer, the default chunk size of 10000 SEQNO leads to tarballs 
00393 of only 35M (maybe 70M when all 4 tables are included) resulting in rapid transfers.
00394 
00395 Although new completed partitions might be reached only every ~10 days with the 10k chunks, a daily 
00396 transfer is still recommended in order to back up the last incomplete partition and also so that 
00397 issues with the transfer are rapidly identified and resolved.
00398 
00399 
00400 
00401 Transfer Monitoring
00402 ~~~~~~~~~~~~~~~~~~~
00403 
00404 Implemented using *valmon.py* with *digestpath.py*. Valmon needs to run as a daily 
00405 cronjob on the remote node. Configure with a **dbsrvmon** section::
00406 
00407     % ~/.env.cnf blyth@belle1.nuu.edu.tw 
00408     [dbsrvmon]
00409     tn = channelquality_db 
00410     chdir = /var/dbbackup/dbsrv/belle7.nuu.edu.tw/channelquality_db_belle7/archive/10000 
00411     return = dict 
00412     dbpath = ~/.env/dbsrvmon.sqlite 
00413     cmd = digestpath.py 
00414     note = stores the dict returned by the command as a string in the DB without interpretation 
00415     valmon_version = 0.2 
00416     constraints = ( tarball_count >= 34, dna_mismatch == 0, age < 86400 , age < 1000, ) 
00417 
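One way such constraints could be evaluated against the returned dict (illustrative
only; see valmon.py for the real behaviour)::

    def check_constraints(constraints, d):
        """Evaluate each constraint expression with the monitored dict as the namespace."""
        return [(c, bool(eval(c, {}, d))) for c in constraints]

    d = {'tarball_count': 34, 'dna_mismatch': 0, 'age': 9030.78}
    check_constraints(("tarball_count >= 34", "dna_mismatch == 0", "age < 86400"), d)
    # -> [('tarball_count >= 34', True), ('dna_mismatch == 0', True), ('age < 86400', True)]
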
00418 Tested on belle1::
00419 
00420     [blyth@belle1 e]$ valmon.py -s dbsrvmon ls
00421     2013-06-17 11:48:01,515 env.db.valmon INFO     /home/blyth/env/bin/valmon.py -s dbsrvmon ls
00422     2013-06-17 11:48:01,520 env.db.valmon WARNING  no email section configures and no MAILTO envvar, NOTIFICATION WILL FAIL
00423     2013-06-17 11:48:01,521 env.db.valmon INFO     arg ls
00424     ('2013-06-13T19:46:01', 5.5278148651123047, "{'dna_match': 34, 'lookstamp': 1371123961.7826331, 'dna_mismatch': 0, 'tarball_count': 34, 'age': 9030.7826330661774, 'lastchange': 1371114931, 'dna_missing': 0}", 0.0, 0)
00425     ('2013-06-13T19:54:06', 5.8677470684051514, "{'dna_match': 34, 'lookstamp': 1371124446.7869501, 'dna_mismatch': 0, 'tarball_count': 34, 'age': 9515.7869501113892, 'lastchange': 1371114931, 'dna_missing': 0}", 0.0, 0)
00426 
00427 
00428 
00429 Obtain the backup tarballs
00430 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
00431 
00432 As of Dec 24 2013 there are 54 tarballs of 43M each, corresponding to 2322M total. Copy them with scp using the
00433 scponly account on cms01. Qiumei/Simon can provide the password::
00434 
00435     dir=/data/var/dbbackup/dbsrv/dybdb2.ihep.ac.cn/channelquality_db_dybdb2/archive/
00436     mkdir -p $dir && cd $dir         
00437     scp -r dayabayscp@cms01.phys.ntu.edu.tw:/data/var/dbbackup/dbsrv/dybdb2.ihep.ac.cn/channelquality_db_dybdb2/archive/10000 .
00438 
00439 
00440 Partitioned dump usage
00441 -----------------------
00442 
00443 Full backups are impractical for 10G tables.
00444 
00445 Partitioned dumping is attractive for backups of such large tables, 
00446 as only new partitions need to be dumped on each invocation. 
00447 
00448 For scp transfers it would be necessary to create tarfiles for each partition 
00449 with dna sidecars, and to add a transfer subcommand with an option-controlled remote node. 
00450 DNA checking would then allow only new partitions to be transferred.
00451 
00452 
00453 
00454 System python API warning
00455 --------------------------
00456 
00457 Be careful regarding PYTHONPATH: mixing a NuWa PYTHONPATH with a system python gives an API RuntimeWarning::
00458 
00459     [blyth@belle7 DybPython]$ /usr/bin/python dbsrv.py -t DqChannel,DqChannelVld,DqChannelStatusVld --home dybdb2_ligs   channelquality_db_dybdb2 dumplocal /tmp/cq/channelquality_db_dybdb2  --partition --partitioncfg 10,0,1 
00460     /data1/env/local/dyb/external/mysql_python/1.2.3_mysql5.0.67_python2.7/i686-slc5-gcc41-dbg/lib/python2.7/site-packages/MySQLdb/__init__.py:19: RuntimeWarning: Python C API version mismatch for module _mysql: This Python has API version 1012, module _mysql has version 1013.
00461       import _mysql
00462     2013-06-06 18:03:08,963 __main__ INFO     schema dir /tmp/cq/channelquality_db_dybdb2/10/_ exists already
00463     2013-06-06 18:03:08,963 __main__ INFO     /* 10-partition 1  /1  */ SEQNO >= 1 and SEQNO <= 10 
00464     2013-06-06 18:03:09,165 __main__ INFO     checking prior csv dump /tmp/cq/channelquality_db_dybdb2/10/0  --partitioncfg 10,0,1 
00465     [blyth@belle7 DybPython]$ 
00466 
00467 Clearing the NuWa PYTHONPATH means the imports are entirely system ones, which avoids the RuntimeWarning::
00468 
00469     [blyth@belle7 DybPython]$ PYTHONPATH=  /usr/bin/python dbsrv.py -t DqChannel,DqChannelVld,DqChannelStatusVld --home dybdb2_ligs   channelquality_db_dybdb2 dumplocal /tmp/cq/channelquality_db_dybdb2  --partition --partitioncfg 10,0,1 
00470     2013-06-06 18:04:58,078 __main__ INFO     schema dir /tmp/cq/channelquality_db_dybdb2/10/_ exists already
00471     2013-06-06 18:04:58,078 __main__ INFO     /* 10-partition 1  /1  */ SEQNO >= 1 and SEQNO <= 10 
00472     2013-06-06 18:04:58,282 __main__ INFO     checking prior csv dump /tmp/cq/channelquality_db_dybdb2/10/0  --partitioncfg 10,0,1 
00473     [blyth@belle7 DybPython]$ 
00474 
00475 
00476 Import Notes
00477 -------------
00478 
00479 #. keep imports to a minimum for portability to the server situation, ie do not rely on the NuWa environment
00480 #. MySQLdb is imported optionally so that nodes without MySQL-python can still autodoc
00481 
00482 """
00483 import sys, os, logging, inspect, time, re, tarfile, platform
00484 from datetime import datetime
00485 log = logging.getLogger(__name__)
00486 from ConfigParser import ConfigParser, NoSectionError
00487 from pprint import pformat
00488 from tar import Tar
00489 from common import sshenv
00490 
00491 
00492 
00493 def _sorted(iterable, key=lambda _:_, reverse=False):
00494     """  
00495     sorted for py23, caution returns lists not tuples
00496     """ 
00497     temp = [(key(x), x) for x in iterable]
00498     temp.sort()
00499     if reverse:
00500         return [temp[i][1] for i in xrange(len(temp) - 1, -1, -1)] 
00501     return [t[1] for t in temp]
00502 
00503 try:
00504     sorted
00505 except NameError: 
00506     sorted = _sorted 
00507 
00508 
00509 
00510 try:
00511     import MySQLdb
00512 except ImportError:
00513     MySQLdb = None
00514 
00515 try:
00516     import _mysql
00517 except ImportError:
00518     _mysql = None
00519 
00520  
00521 def attrs_(mod, names):
00522     """Access module constants in a version independent way """  
00523     if mod:
00524         return map( lambda _:getattr(mod, _), filter( lambda _:hasattr(mod,_), names.split() ) ) 
00525     else:
00526         return None 
00527 string_types = attrs_( MySQLdb and MySQLdb.constants.FIELD_TYPE , 'VARCHAR DATETIME STRING VAR_STRING' )   # guarded so the module still imports when MySQLdb is absent
00528 
00529 
00530 seconds = {}
00531 def timing(func):
00532     def wrapper(*arg,**kw):
00533         '''source: http://www.daniweb.com/code/snippet368.html'''
00534         t1 = time.time()
00535         res = func(*arg,**kw)
00536         t2 = time.time()
00537         global seconds
00538         seconds[func.func_name] = (t2-t1)
00539         return res 
00540     return wrapper
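# Example usage of the timing decorator (illustrative; it records the elapsed
# wall-clock time of the last call in the module-level `seconds` dict, keyed by
# function name):
#
#     @timing
#     def dump():
#         ...
#     dump()
#     print seconds['dump']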
00541 
00542 
00543 class CSVFormat(list):
00544     """
00545     Provides the format string to create the CSV line
00546     appropriate for the field types of the low level query result description 
00547     It looks something like::
00548 
00549         %s,"%s","%s",%s,%s,%s,%s,%s,"%s","%s"
00550 
00551     Usage example::
00552 
00553         llc = _mysql.connection(...) 
00554         llc.query("select * from ...")
00555         result = llc.store_result()
00556         csvf = CSVFormat( result.describe() ) 
00557         for row in result.fetch_row(0):
00558             print str(csvf) % tuple(row)
00559 
00560     """
00561 
00562     def __str__(self):
00563         def field_fmt(fdesc):
00564             if fdesc[1] in string_types:
00565                 return "\"%s\""
00566             return "%s"
00567         return ",".join( map(field_fmt, self) )    
00568  
00569 class DB(object):
00570     def __init__(self, sect, opts=None, home=None):
00571         """
00572         :param sect: name of section in config file 
00573         :param opts: options
00574         :param home: DB instance
00575 
00576 
00577         Safety constraints on config to minimize accidents from config confusion.
00578 
00579         Initially required non-loopback section names and database names to be the same
00580 
00581         Loosening this to allow remote commands, by designating a "home" instance
00582         and requiring all other instances to match that one in everything but the 
00583         database name
00584 
00585         """
00586         pass 
00587         self.sect = sect
00588         self.opts = opts
00589         self.home = home
00590         self.backupdir = os.path.join(self.opts.backupfold, platform.node(), sect )   
00591 
00592         cnf = MyCnf("~/.my.cnf")
00593         dbc = cnf.mysqldb_pars(sect, home=home)   # NB sect is taken as the DB name if there is no such section in the config
00594 
00595         log.debug("sect %s home %s dbc %s " % (sect, home, repr(dbc)))
00596  
00597         if sect == "loopback":
00598             assert dbc['host'] in ('localhost','127.0.0.1')   # curious 127.0.0.1 not working on cms01 need localhost
00599         pass
00600         if home is None: 
00601             assert dbc['db'] in ("information_schema", "mysql")  # older mysql (eg on cms01 does not have information_schema DB ? OR it's protected)
00602             assert opts.home == sect, "only the home instance is allowed an undefined home parameter " 
00603         else:
00604             log.debug("     dbc %s " % repr(dbc))
00605             log.debug("home.dbc %s " % repr(home.dbc))
00606             #qwns = "host user passwd"
00607             qwns = "host"
00608             for qwn in qwns.split():
00609                 assert dbc.get(qwn,None) == home.dbc.get(qwn,None), "non \"home\" instances are constrained to match the \"home\" in everything but the db "
00610 
00611         self.is_local = dbc['host'] == '127.0.0.1'
00612         self.dbc = dbc
00613         self.cnf = cnf
00614         log.debug("connecting to %s " % dict(dbc, passwd="***"))
00615         try:  
00616             conn = MySQLdb.connect( **dbc )   # huh, version variation in accepted params
00617         except MySQLdb.Error, e: 
00618             raise Exception("Error %d: %s " % ( e.args[0], e.args[1] ) )
00619 
00620         llconn = _mysql.connect( **dbc )       
00621 
00622         self.conn = conn
00623         self.llconn = llconn
00624         self._size = None
00625         pass
00626 
00627         partitionmgr = PartitionMgr(opts, self)
00628         self.partitionmgr = partitionmgr
00629 
00630 
00631     def dispatch(self, *args, **kwa):
00632         if self.opts.partition:
00633             pfx = "partition_"
00634         else:
00635             pfx = ""
00636 
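        # the command name maps to a method named <cmd>___, or partition_<cmd>___ when --partition is used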
00637         cmd = pfx + args[0] + "___"    
00638         if hasattr(self, cmd ):
00639             getattr( self , cmd)( *args[1:], **kwa )   
00640         else:
00641             raise Exception("cmd %s not implemented " % cmd)
00642 
00643     def lscmd___(self, *args, **kwa ):
00644         for m in filter(lambda _:_[-3:] == '___', dir(self)):
00645             print m[:-3]  
00646 
00647     def database_drop_create(self, dbname):
00648         """
00649         :param dbname: name of the database to be dropped and recreated 
00650         """
00651         assert self.sect == "loopback" and self.sect != dbname , ( self.sect, dbname  )
00652         if self.opts.noconfirm:
00653             log.info("proceed with DB_DROP_CREATE of %(dbname)s without confirmation" % locals() )
00654         else:
00655             ans = raw_input("DROP and reCREATE database %(dbname)s loosing all tables contained ? Enter \"YES\" to proceed : " % locals()) 
00656             if ans != "YES":
00657                 log.warn("skipping DROP CREATE ")
00658                 return
00659        
00660         self("drop database if exists %(dbname)s " % locals() ) 
00661         self("create database %(dbname)s " % locals() ) 
00662 
00663 
00664     def docs( cls ):
00665         """
00666         collect the docstrings on command methods 
00667         identified by naming convention of ending with ___ 
00668         """
00669         mdoc = lambda m:getattr(m,'__doc__',None)
00670         mdocs  = [ dict(meth=k[:-3],doc=mdoc(v)) for k,v in [(k,v) for k,v in inspect.getmembers(cls) if k[-3:]=='___' and k[0] != '_' and mdoc(v)]]
00671         return "\n".join([ """ %(meth)s : %(doc)s """ % d for d in mdocs ])   
00672     docs = classmethod(docs)
00673 
00674 
00675     def execute_(self, cmd):
00676         cursor = self.conn.cursor(MySQLdb.cursors.DictCursor)
00677         cursor.execute( cmd )
00678         return cursor
00679 
00680     def fetchall(self, cmd ):
00681         cursor = self.execute_(cmd)
00682         rows = cursor.fetchall()
00683         self.count = cursor.rowcount
00684         cursor.close()
00685         return rows
00686 
00687     def _query_size(self):
00688         sql = "select round(sum((data_length+index_length-data_free)/1024/1024),2) as TOT_MB from information_schema.tables where table_schema = '%(db)s' " % self.dbc
00689         return float(self(sql)[0]['TOT_MB'])
00690     def _get_size(self):
00691         if self._size is None:
00692              self._size = self._query_size()
00693         return self._size 
00694     size = property(_get_size, doc="Size estimate of the DB in MB ") 
00695 
00696     def _get_databases(self):
00697         """
00698         This query gives fewer results than `show databases`, which demands skips to avoid errors in getting sizes 
00699         #skip = "hello hello2 other test_noname tmp_cascade_2 tmp_dbitest tmp_tmp_offline_db_2".split()  
00700         """
00701         sql = "select distinct(table_schema) from information_schema.tables"
00702         return map(lambda _:_['table_schema'],self(sql))
00703     databases = property(_get_databases, doc="List of database names obtained from information_schema.tables") 
00704 
00705 
00706     def _get_tables_infoschema(self):
00707         """
00708         """
00709         sql = "select distinct(table_name) from information_schema.tables where table_schema='%(db)s'" % self.dbc
00710         return map(lambda _:_['table_name'],self(sql))
00711     tables = property(_get_tables_infoschema, doc="List of table names obtained from information_schema.tables") 
00712 
00713 
00714     def _get_tables(self):
00715         """
00716         Older mysql does not have information_schema
00717         """
00718         sql = "show tables" 
00719         tables = []
00720         for d in self(sql):
00721             k,v = d.items()[0]
00722             tables.append(v)
00723         return tables
00724     tables = property(_get_tables, doc="List of table names obtained from show tables") 
00725 
00726 
00727     def _get_datadir(self):
00728         return self("select @@datadir as datadir")[0]['datadir']
00729     datadir = property(_get_datadir, doc="Query DB server to find the datadir, eg /var/lib/mysql/ OR /data/mysql/ ")
00730         
00731     def __call__(self, cmd):
00732         log.debug(cmd)
00733         t0 = time.time()
00734         ret = self.fetchall(cmd)
00735         t1 = time.time()
00736         self.lastseconds = t1 - t0 
00737         return ret
00738 
00739 
00740     def lsdatabases___(self, *args, **kwa ):
00741         """
00742         list databases
00743         """ 
00744         print "\n".join(self.databases)
00745 
00746     def lstables___(self, *args, **kwa ):
00747         """
00748         list tables
00749         """
00750         print "\n".join(self.tables)
00751 
00752     def summary___(self, *args, **kwa):
00753         """
00754         Present summary of tables in rst table format:
00755 
00756         ==============================  ==========  ==============================  ==============================
00757         TABLE_NAME                      TABLE_ROWS  CREATE_TIME                     CHECK_TIME                    
00758         ==============================  ==========  ==============================  ==============================
00759         DqChannel                       62126016    2013-05-30 18:52:51             2013-05-30 18:52:51           
00760         DqChannelStatus                 62126016    2013-05-30 18:17:42             2013-05-30 18:17:42           
00761         DqChannelStatusVld              323573      2013-05-30 18:52:44             None                          
00762         DqChannelVld                    323573      2013-05-30 19:34:55             None                          
00763         LOCALSEQNO                      3           2013-05-30 19:35:02             None                          
00764         ==============================  ==========  ==============================  ==============================
00765              
00766         """ 
00767         result = self("select TABLE_NAME, TABLE_ROWS, CREATE_TIME, UPDATE_TIME, CHECK_TIME  from information_schema.tables where table_schema = '%(db)s' " % self.dbc)
00768         kws = (('TABLE_NAME', 30), ('TABLE_ROWS', 10), ('CREATE_TIME', 30 ), ('CHECK_TIME',30),)
00769         print "\n".join(["",self.sect,self._rst_table( result, kws )])
00770    
00771     
00772     def _rst_table(self, result, kws, char="="):
00773         """
00774         :param result: interable providing result dicts 
00775         :param kws: sequence of 2-tuples providing result dict keys and presentation widths 
00776         :param char: 
00777         :return: multi line string rst table presentation 
00778         """ 
00779         fcol_ = lambda kw:"%s(%s)-%s%s" % ("%",kw[0],kw[1],"s")
00780         fmt = "  ".join(map(fcol_,kws) ) 
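        # eg kws (('TABLE_NAME',30),('TABLE_ROWS',10)) gives fmt "%(TABLE_NAME)-30s  %(TABLE_ROWS)-10s"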
00781 
00782         # spell these out for ancient python 
00783         #mkr = fmt % dict((kw[0],char * kw[1]) for kw in kws)
00784         #lbl = fmt % dict((kw[0],kw[0]) for kw in kws)
00785         dmkr = {}
00786         dlbl = {}
00787         for kw in kws:
00788             dmkr[kw[0]] = char * kw[1] 
00789             dlbl[kw[0]] = kw[0] 
00790         mkr = fmt % dmkr
00791         lbl = fmt % dlbl
00792         bdy = [fmt % d for d in result]
00793         return "\n".join([mkr, lbl, mkr] + bdy + [mkr])
00794 
00795 
00796     def _get_utables(self):
00797         """
00798         """ 
00799         alltables = self._get_tables()
00800         if self.opts.tables is None:
00801              return alltables
00802         else:
00803              return filter(lambda t:t in alltables, self.opts.tables.split(","))
00804     utables = property(_get_utables, doc="List of tables to use in operations, when --tables option is used this can be a subset of all tables.")
00805 
00806     def show_create( self, table):
00807         return self("show create table %(table)s" % locals())[0]['Create Table']
00808 
00809     def _dumplocal_mkdir(self, outdir ):
00810         """
00811         :param outdir:
00812         """
00813         if not os.path.exists(outdir):
00814             os.makedirs(outdir)     # umask gets in the way of the mode argument here 
00815             os.chmod(outdir, 0777)  # needs to be writable by other so mysql user can write into it 
00816         assert os.path.isdir(outdir), outdir
00817         pass
00818     def _dumplocal_schema(self, outdir, table ):
00819         """
00820         :param outdir:
00821         :param table:
00822         """
00823         schema=os.path.join(outdir, "%s.schema" % table )
00824         sf = open(schema, "w")
00825         sf.write( self.show_create(table) )
00826         sf.close()
00827         pass
00828     def _dumplocal_table(self, outdir, table, where ):
00829         """
00830         :param outdir:
00831         :param table:
00832         :param where: 
00833         """
00834         outfile=os.path.join(outdir, "%s.csv" % table )
00835         log.info("_dumplocal_table %s writing %s " % (where,outfile) ) 
00836         pass
00837 
00838         if table == 'LOCALSEQNO':
00839             where = "TRUE"
00840 
00841         if self.is_local:
00842             log.debug("IntoOutFile")
00843             if os.path.exists(outfile):
00844                 log.info("remove preexisting outfile %s " % outfile )
00845                 os.remove(outfile) 
00846             io = IntoOutfile(table=table,where=where, outfile=outfile)
00847             self(str(io))
00848         else:
00849             log.debug("RemoteIntoOutFile")
00850             t0 = time.time()
00851             tf = open(outfile,"w")
00852             rio = RemoteIntoOutfile(table=table, where=where)
00853             self._write_csvdirect(rio, tf )   ## result of select is returned to python and thence formatted directly into csv, works remotely 
00854             tf.close()
00855             t1 = time.time()
00856             self.lastseconds = t1 - t0
00857         pass
00858         log.info("partition_dumplocal___ %s writing %s took %s seconds " % (where,outfile, "%5.2f" % self.lastseconds ) ) 
00859         pass
00860 
00861     def _write_csvdirect(self, select , tf ):
00862         """
00863         Adopt low level approach to avoid unnecessary conversions into 
00864         python types then back to string and the associated difficulties of 
00865         then getting precisely the same as SELECT * INTO OUTFILE 
00866 
00867         Note that use of `store_result` rather than `use_result` means 
00868         that all rows are in memory at once.
00869 
00870         NB for consistency the CSV output by this command MUST MATCH that 
00871         by _write_outfile
00872 
00873         `_write_csvdirect` is used by **rdumpcat** , this mimics 
00874         the output from `_write_outfile` (used by **dumpcat**) with 
00875         the big advantage that it works remotely, with no strong permission 
00876         requirements
00877 
00878         TODO:
00879 
00880         #. when there is a pre-existing LOCALSEQNO redirect LOCALSEQNO to a temporary file 
00881            and do a merge...  easiest to instantiate them as AsciiCSV and then merge at that level 
00882 
00883         """
00884         q = str(select)
00885         log.debug("_write_csvdirect %s " % q) 
00886 
00887         llconn = self.llconn   
00888         llconn.query( q )
00889 
00890         lessmemory = True
00891         if lessmemory:
00892             log.debug("using `--LESSMEMORY` option : less memory expensive but more network expensive 'use_result'  ")
00893             result = llconn.use_result()
00894         else:
00895             log.debug("using more memory expensive but less network expensive 'store_result' ")
00896             result = llconn.store_result()
00897 
00898         csvf = CSVFormat( result.describe() )   
00899         for row in result.fetch_row(maxrows=0, how=0):   ## all rows as tuples
00900             tf.write( str(csvf) % tuple(row) +"\n" )
00901 
00902 
00903     def timestamped_dir(self, *args):
00904         """
00905         Timestamping is needed for non-partitioned case 
00906         """
00907         bdir = self.determine_basedir(*args)
00908         odir = os.path.join(bdir, self.opts.timestamp )
00909         return odir
00910 
00911     def dumplocal___(self, *args, **kwa ):
00912         """
00913         :param outdir:  specifies the output directory, which must be writable by the mysql user; it will be created if not existing 
00914 
00915         Rerunning this will do quick checks of the CSV files, looking at line counts 
00916         and the first and last line and comparing with expectations from DB queries. The quick
00917         checks are done via commands:
00918 
00919         * `wc` 
00920         * `head -1` 
00921         * `tail -1`
00922 
00923         This is not called in the partitioned case.
00924         """
00925         where = kwa.pop('where',None)
00926         if where is None:
00927             where = self.opts.where
00928 
00929         tables = kwa.pop('tables',None)
00930         if tables is None:
00931             tables = self.utables
00932 
00933         odir = self.timestamped_dir(*args)
00934         if os.path.exists(odir):
00935             log.info("dumplocal_ timestamped dir %s exists already, skipping dump" % odir ) 
00936         else:
00937             log.info("dumplocal_ into timestamped dir %s " % odir ) 
00938             self._dumplocal_mkdir(odir)
00939             for table in tables:
00940                 self._dumplocal_schema(odir, table)
00941                 self._dumplocal_table(odir, table, where)
00942             pass
00943         pass
00944         if self.opts.archive or self.opts.archiveforce:
00945             self.archive(odir)
00946 
00947 
00948     def fields(self, table, filter_=None):
00949         return filter(filter_,map(lambda _:_['Field'],self("describe %(table)s" % locals())))
00950 
00951     def range(self, table, where="1=1", keycount=True):
00952         sql = self.partitionmgr.qrange(table, where, keycount=keycount)
00953         return self(sql)[0]
00954 
00955     def minmax(self, table, where="1=1"):
00956         sql = self.partitionmgr.qminmax(table, where)
00957         return self(sql)[0]
00958 
00959     def ptables(self):
00960         """
00961         :return: list of tables with the key field
00962         """
00963         key = self.partitionmgr.key 
00964         return filter(lambda t:key in self.fields(t), self.utables)
00965 
00966 
00967     def tabminmax_csv(self, path):
00968         kpo = self.partitionmgr.kpo
00969         head = os.popen("head -1 %(path)s" % locals()).read().strip()
00970         tail = os.popen("tail -1 %(path)s" % locals()).read().strip()
00971         table = os.path.basename(path[:-4]) 
00972         minkey = int(head.split(",")[kpo])
00973         maxkey = int(tail.split(",")[kpo])
00974         return table, minkey, maxkey 
00975 
00976 
00977     def _wc_csv(self, outdir, keycount=False): 
00978         """
00979         :param outdir: partition directory 
00980         :return: dict keyed by table name providing csv info, line count,  
00981 
00982         ::
00983 
00984             {'count': 10, 'keycount': 10, 'max': 40, 'min': 31}
00985 
00986 
00987         #. getting the keycount in a general manner, 
00988            ie the number of distinct key values, would require 
00989            parsing the entire CSV, so fake it from head and tail
00990 
00991         ::
00992 
00993             [blyth@belle7 ~]$ wc -l /tmp/pp/tmp_ligs_offline_db_0/1000/1/*.csv
00994             192000 /tmp/pp/tmp_ligs_offline_db_0/1000/1/DqChannel.csv
00995             192000 /tmp/pp/tmp_ligs_offline_db_0/1000/1/DqChannelStatus.csv
00996               1000 /tmp/pp/tmp_ligs_offline_db_0/1000/1/DqChannelStatusVld.csv
00997               1000 /tmp/pp/tmp_ligs_offline_db_0/1000/1/DqChannelVld.csv
00998             386000 total
00999         """
01000         if not os.path.exists(outdir):
01001             return None
01002 
01003         pipe = os.popen("wc -l %(outdir)s/*.csv" % locals())
01004         lines = pipe.readlines()
01005         rc = pipe.close()
01006         if rc is None:
01007             rc = 0 
01008         rc = os.WEXITSTATUS(rc) 
01009         log.debug("wc return code %(rc)s for %(outdir)s " % locals() ) 
01010 
01011         if rc != 0:
01012             log.warn("Problem with csv files in %(outdir)s wc rc %(rc)s " % locals() ) 
01013             return None
01014 
01015         wc = {}
01016         for n, path in map(lambda _:_.lstrip().rstrip().split(), lines):
01017             if path[-4:] != '.csv':continue
01018             pass
01019             table, minkey, maxkey = self.tabminmax_csv(path)
01020             wc[table] = dict(count=int(n),min=minkey,max=maxkey)
01021             if keycount:
01022                 wc[table]['keycount'] = maxkey - minkey + 1      # this is quick and dirty 
01023         return wc
01024 
01025     def determine_basedir(self, *args):
01026         """
01027         """
01028         if len(args)>0:
01029             basedir = args[0]
01030         else:
01031             basedir = self.backupdir  # eg /var/dbbackup/dbsrv/belle7.nuu.edu.tw/channelquality_db_belle7
01032         pass
01033         if basedir != self.backupdir:
01034             log.warn("using basedir %s different from standard %s " % (basedir,self.backupdir) ) 
01035         else:
01036             log.debug("using basedir %s " % basedir ) 
01037 
01038         return basedir
01039 
01040 
01041     def assert_valid_dump(self, pdir, csv, chk ):
01042          assert csv == chk , ("prior dump %s check fail" % pdir, pformat(csv),pformat(chk))
01043          assert sorted(csv.keys()) == sorted(chk.keys()), ("tables mismatch", sorted(csv.keys()),  sorted(chk.keys()))
01044          for table in csv.keys():
01045              if chk[table].has_key('keycount'):
01046                  assert chk[table]['keycount'] == csv[table]['keycount'], (table, chk[table]['keycount'], csv[table]['keycount'] )
01047              assert chk[table]['count'] == csv[table]['count'], (table, chk[table]['count'], csv[table]['count'] )
01048              assert chk[table]['min'] == csv[table]['min'], (table, chk[table]['min'], csv[table]['min'] )
01049              assert chk[table]['max'] == csv[table]['max'], (table, chk[table]['max'], csv[table]['max'] )
01050 
01051 
01052     def partition_dumpcheck(self, pdir, pwhere, is_last, keycount=False ):
01053         """
01054         Checks a partition dump, returning a flag signalling whether a dump is needed.
01055 
01056         :param pdir:
01057         :param pwhere:
01058         :param is_last:
01059         :param keycount: doing distinct keycount is quite slow, so can skip for pre-existing
01060         :return: pdump, chk
01061         """
01062         log.info("_")
01063         log.info("partition_dumpcheck partition loop %s " % pwhere)
01064         if is_last:
01065             assert pdir[-5:] == "/last"  , "unexpected pdir %s for last partition " % (pdir) 
01066 
01067         ptables = self.ptables()
01068         pdump = None 
01069 
01070         # DB look at SEQNO ranges
01071         chk = {}
01072         for table in ptables:
01073             chk[table] = self.range(table, pwhere, keycount=keycount)  
01074 
01075         # file system look at csv files 
01076         log.info("checking prior csv dump %s  --partitionsize %s --partitionrange %s  " % (pdir,self.opts.partitionsize, self.opts.partitionrange))
01077         csv = self._wc_csv(pdir, keycount=keycount)
01078         if csv is None:
01079             if os.path.exists(pdir):
01080                 msg = "Partition directory \"%(pdir)s\" exists but it contains no .csv files, delete the empty directory and rerun to clear this error" % locals()
01081                 log.fatal(msg)
01082                 raise Exception(msg) 
01083 
01084         # compare DB expectations to CSV dumps 
01085         if csv == chk:
01086             log.info("partition dump %s unchanged wrt DB check " % pdir )
01087             pdump = False
01088         else:
01089             pdump = True
01090             log.info("chk %s " % repr(chk))
01091             log.info("csv %s " % repr(csv))
01092             if is_last:
01093                 log.info("last partition dump %s and is changed wrt DB : will redump " % pdir )
01094             elif csv is None:
01095                 log.info("non last partition dump %s csv is None : first dump " % pdir )
01096             else:
01097                 log.fatal("non last partition dump %s and is changed wrt DB : will fail " % pdir )
01098                 self.assert_valid_dump( pdir, csv, chk)
01099             pass
01100         return pdump, chk 
01101 
01102  
01103     def partition_dumplocal___(self, *args, **kwa):
01104         """
01105         """
01106         pm = self.partitionmgr 
01107         pm.basedir = self.determine_basedir(*args)
01108         if not os.path.exists(pm.basedir):
01109             os.makedirs(pm.basedir)   
01110 
01111         svy = pm.survey(self)
01112         ptables = self.ptables()
01113         assert len(ptables), ptables
01114 
01115         # write all table schema into single schema dir rather than repeating for every partition
01116         pdir = pm.dir("_")
01117         if os.path.exists(pdir):
01118             log.info("schema dir %s exists already" % pdir )
01119         else:
01120             log.info("creating and populating schema dir %s " % pdir )
01121             self._dumplocal_mkdir(pdir)
01122             for table in ptables:
01123                 self._dumplocal_schema(pdir, table)
01124             pass
01125         pass
01126         if self.opts.archive or self.opts.archiveforce:
01127             self.archive(pdir)
01128  
01129         for p in pm.parts:
01130             pdir = pm.dir(p)
01131             pmin = pm.min(p)     
01132             pmax = pm.max(p)
01133             pwhere = pm.where(p)
01134             is_last = pm.is_last(p)   # last partition expected to be incomplete
01135             keycount = not os.path.exists(pdir)     # only keycount for new partitions
01136             pdump, chk = self.partition_dumpcheck( pdir, pwhere , is_last, keycount=keycount )
01137             if pdump:
01138                 log.info("dumplocal partition %s %s %s:%s --partitionsize %s --partitionrange %s " % (p,self.opts.partitionkey,pmin,pmax,self.opts.partitionsize, self.opts.partitionrange)) 
01139                 self._dumplocal_mkdir(pdir)
01140                 for table in ptables:
01141                     if is_last:
01142                         log.warn("skipping completeness checks for last partition %s into %s " % (p, pdir))
01143                         self._dumplocal_table(pdir, table, pwhere)
01144                     elif chk[table]['min'] == pmin  and chk[table]['max'] == pmax:
01145                         self._dumplocal_table(pdir, table, pwhere)
01146                     else:
01147                         log.warn(" table %s check min  %s pmin %s " % ( table, chk[table]['min'], pmin ))
01148                         log.warn(" table %s check max  %s pmax %s " % ( table, chk[table]['max'], pmax ))
01149                         log.warn("skipping dump as check ahead query indicates incomplete partition %s " % repr(chk)) 
01150                     pass
01151                 zdump, zchk  = self.partition_dumpcheck( pdir, pwhere, is_last, keycount=True )
01152                 assert zdump == False, ("post dump dumpcheck signals need to dump", pformat(zchk)) 
01153             pass
01154             if self.opts.archive: 
01155                 force = self.opts.archiveforce or pdump
01156                 self.archive(pdir, force=force)
01157         pass
01158 
01159     def extract(self, dir, base):
01160         """
01161         :param dir: directory to be created by extraction 
01162         :param base: 
01163         """
01164         pass
01165         tgzp, rpath = self.archivepath(dir, base) 
01166         log.info("extract dir %s tgzp %s rpath %s " % (dir, tgzp, rpath))
01167         tgz = Tar(tgzp, toplevelname=rpath )
01168         tgz.examine()
01169         ls = tgz.list(tgz.members)
01170         tgz.digest(verify=True)
01171         tgz.extract( containerdir=base , toplevelname=rpath, dryrun=False )
01172 
01173 
01174     def archivepath(self, dir , base=None):
01175         """
01176         :param dir: directory to be archived or extracted into 
01177         :return: path to archive tarball, dir path relative to base
01178         """ 
01179         if base is None:
01180             base = self.backupdir  # eg /var/dbbackup/dbsrv/belle7.nuu.edu.tw/channelquality_db_belle7
01181         assert dir[0:len(base)] == base, (dir, base, "dir %s should be within base %s " % (dir,base))
01182         rpath = dir[len(base)+1:]
01183         elems = rpath.split("/")
01184         name = "_".join(elems)
01185         top = elems[0]         # typically this is chunk size
01186         tgzp = os.path.join(base,"archive", top, "%s.tar.gz" % name )   
01187         log.debug("archivepath,  dir %s base %s rpath %s name %s tgzp %s  " % ( dir, base, rpath, name, tgzp ) )
01188         return tgzp, rpath 
01189 
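    # Illustrative sketch, not part of the original dbsrv.py: archivepath maps a
    # partition directory under the backup base to a tarball path under
    # <base>/archive, keeping the chunk size as the first path element, eg
    #
    #     dir   = /var/dbbackup/dbsrv/belle7.nuu.edu.tw/channelquality_db_belle7/10000/3
    #     base  = /var/dbbackup/dbsrv/belle7.nuu.edu.tw/channelquality_db_belle7
    #     rpath = "10000/3"    name = "10000_3"
    #     tgzp  = <base>/archive/10000/10000_3.tar.gz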
01190 
01191     def archive(self, dir , force=False):
01192         """
01193         :param dir: directory the contents of which should be archived 
01194 
01195         As a partition corresponds to a certain SEQNO range, it never changes
01196         so there is no need for a datestring in the path.
01197 
01198         The configured backupfold needs to be created before using the archive `-a` option with::
01199 
01200             [blyth@belle7 DybPython]$ sudo mkdir /var/dbbackup/dbsrv
01201             [blyth@belle7 DybPython]$ sudo chown -R blyth.blyth /var/dbbackup/dbsrv/
01202 
01203         """
01204         base = self.backupdir  # eg /var/dbbackup/dbsrv/belle7.nuu.edu.tw/channelquality_db_belle7
01205         tgzp, rpath = self.archivepath(dir) 
01206         tgzd = os.path.dirname(tgzp)
01207         if not os.path.exists(tgzd):
01208             os.makedirs(tgzd)
01209         pass
01210 
01211         if self.opts.transfer:
01212             cfg = self.opts.transfercfg.split(":")
01213             if len(cfg) == 2:
01214                 kwa = dict(remotenode=cfg[0], remoteprefix=cfg[1])
01215             elif len(cfg) == 1:
01216                 kwa = dict(remotenode=cfg[0])
01217             else:
01218                 assert 0, "unexpected transfercfg %s " % repr(cfg)
01219             log.debug("using transfercfg : %s " % repr(kwa))
01220         else:
01221             kwa = {} 
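        # Illustrative note on the parsing above, not part of the original dbsrv.py:
        # a transfercfg of "S:/data" splits into remotenode="S" and
        # remoteprefix="/data", while a bare "S" sets only remotenode; the
        # resulting dict is passed through to Tar as keyword arguments.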
01222        
01223         tgz = Tar(tgzp, toplevelname=rpath, **kwa)
01224         if os.path.exists(tgzp) and not force:
01225             log.info("archive already exists %s rerun with `-A/--archiveforce` option to recreate " % tgzp )
01226         else:
01227             log.info("creating archive %s for %s " % (tgzp,rpath) )
01228             tgz.archive( base )  # argument specifies the root directory of the archive
01229         pass
01230         tgz.examine()
01231         #log.info("\n".join(tgz.names))
01232         ls = tgz.list(tgz.members)
01233         log.debug("\n"+ls)
01234         du = os.popen("du -hs %(tgzp)s" % locals()).read().strip()
01235         log.info(du)
01236         tgz.digest()
01237 
01238         if self.opts.transfer:
01239             tgz.transfer()
01240 
01241 
01242     def partition_loadlocal___(self, *args, **kwa ):
01243         """
01244         #. look into putting the partitions back together again, in partitioned load local
01245         #. read file system tealeaves wrt the partitioning
01246 
01247         #. factor off the checking 
01248         #. need to work out which partitions are new and just load those         
01249         """
01250         basedir = self.determine_basedir(*args)
01251         log.info("basedir %s " % basedir )
01252         pm = self.partitionmgr 
01253         pm.basedir = basedir
01254 
01255         if self.opts.extract:
01256             chunks = pm.archived_chunks()
01257             log.info("archived_chunks %s " % repr(chunks) )
01258             pm.assign_parts( chunks )
01259         else:
01260             chunks = pm.available_chunks()  # only works after extraction
01261             log.info("available_chunks %s " % repr(chunks) )
01262 
01263         _dir = pm.dir("_")
01264         if not os.path.exists(_dir):
01265             log.info("schema dir %s does not exist" % _dir )
01266             if self.opts.extract:
01267                 self.extract(_dir, basedir)
01268             pass
01269         pass
01270         assert os.path.exists(_dir), _dir 
01271         #_tables = map(lambda _:_[:-7],filter(lambda _:_[-7:] == '.schema',os.listdir(_dir)))
01272 
01273         for p in pm.parts:
01274             pdir = pm.dir(p)
01275             pnam = os.path.basename(pdir)
01276             pmin = pm.min(p) 
01277             pmax = pm.max(p) 
01278             pwhere = pm.where(p) 
01279             #log.debug("p %s pdir %s pmin %s pmax %s pwhere %s " % (p,pdir,pmin,pmax,pwhere))
01280             if not os.path.exists(pdir):
01281                 log.debug("partition_loadlocal___ partition dir %s does not exist " % pdir )
01282                 if self.opts.extract:
01283                     self.extract(pdir, basedir)
01284 
01285             if not os.path.exists(pdir):
01286                 log.debug("partition_loadlocal___ partition dir %s STILL does not exist " % pdir ) 
01287             else:
01288                 log.info("partition_loadlocal___ loading %s " % pdir ) 
01289                 ltables = self.loadlocal_dir( pdir )
01290                 log.debug("ltables %s " % str(ltables))
01291 
01292                 # check that the loaded partition yields the expected key range and count
01293                 if not self.opts.nocheck:
01294                     check = {}
01295                     for table in ltables:
01296                         check[table] = self.minmax(table, pwhere)
01297                         assert check[table]['min'] == pmin, (table,"min",check[table]['min'],pmin)
01298                         if pnam != 'last':
01299                             assert check[table]['max'] == pmax, (table,"max",check[table]['max'],pmax)
01300                         # keycount check is too slow
01301                         #assert check[table]['keycount'] == pm.size, (table,"keycount",check[table]['keycount'],pm.size)
01302                         pass 
01303                     log.debug(pformat(check))
01304                 pass
01305 
01306 
01307     def loadlocal_dir(self, dir ):
01308         """
01309         :param dir: directory containing the table CSV dumps to load, returns the list of loaded table names """
01310         # if a sibling dir named "_" exists use that as a source of schema files, otherwise get directly
01311         _dir = os.path.join(os.path.dirname(dir),"_")
01312         if not os.path.exists(_dir):
01313             _dir = dir 
01314 
01315         def _replace_ignore(table):
01316             """
01317             hmm DBI specificity slipping in
01318             """
01319             if table == "LOCALSEQNO": 
01320                 return "REPLACE" 
01321             else:
01322                 return "IGNORE"    
01323 
01324         utables = self.utables
01325 
01326         log.info("utables %s " % repr(utables))
01327 
01328         ltables = [] 
01329         for name in filter(lambda _:os.path.isfile(os.path.join(dir,_)) and _[-4:] == '.csv' , os.listdir(dir)):
01330             path = os.path.join(dir, name)
01331             table, ext = os.path.splitext(name)
01332             if not table in utables:
01333                 schema = os.path.join(_dir, "%s.schema" % table )
01334                 log.warn("creating table %s from schema file %s " % (table, schema))
01335                 assert os.path.exists(schema), schema
01336                 self(open(schema,"r").read())
01337             pass
01338             ltables.append(table)
01339 
01340             ttable, csvminkey, csvmaxkey = self.tabminmax_csv(path)
01341             assert csvminkey < csvmaxkey , (csvminkey,csvmaxkey)
01342             assert ttable == table, (ttable, table)
01343             mm = self.minmax(table)            
01344             log.debug("csvminkey %s csvmaxkey %s " % (csvminkey, csvmaxkey))
01345             log.debug(" mmmin %(min)s  mmmax %(max)s " % mm )
01346             if csvmaxkey <= mm['max']:
01347                 log.info("SKIP: as already loaded csv keys minmax %s %s from %s " % (csvminkey,csvmaxkey,path ))
01348             else:
01349                 ll = LoadDataLocalInfile(infile=path, table=table, ignorelines=0, replace_ignore=_replace_ignore(table) )
01350                 self(str(ll))        
01351                 log.info("loadlocal ingesting %s took %s seconds " % (path, "%5.2f" % self.lastseconds ) ) 
01352         pass 
01353         return ltables
01354  
01355 
01356     def loadlocal___(self, *args, **kwa ):
01357         """
01358         :param outdir:  specifies directory containing normal or partitioned dump of CSV files
01359         """
01360         odir = self.timestamped_dir(*args)
01361         return self.loadlocal_dir(odir)
01362   
01363 
01364 
01365 class tdict(dict):
01366     __str__ = lambda self:self.tmpl % self
01367 
01368 class IntoOutfile(tdict):
01369     tmpl = "select * from %(table)s where %(where)s into outfile '%(outfile)s' fields terminated by ',' optionally enclosed by '\"' "
01370 
01371 class RemoteIntoOutfile(tdict):
01372     tmpl = "select * from %(table)s where %(where)s "
01373 
01374 class LoadDataLocalInfile(tdict):
01375     tmpl = "LOAD DATA LOCAL INFILE '%(infile)s' %(replace_ignore)s INTO TABLE %(table)s FIELDS TERMINATED BY ',' OPTIONALLY ENCLOSED BY '\"' IGNORE %(ignorelines)s LINES " 
01376 
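# Illustrative sketch, not part of the original dbsrv.py: the tdict subclasses
# render themselves into SQL via "tmpl % self", so the keys passed to the
# constructor fill the template fields, eg
#
#     ioof = IntoOutfile(table="DqChannel", where="SEQNO >= 1 and SEQNO <= 10000", outfile="/tmp/DqChannel.csv")
#     str(ioof)
#     # select * from DqChannel where SEQNO >= 1 and SEQNO <= 10000 into outfile '/tmp/DqChannel.csv'
#     #  fields terminated by ',' optionally enclosed by '"'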
01377 
01378 
01379 class PartitionMgr(object):
01380     int_ptn=re.compile("^\d*$")
01381     def __init__(self, opts, basedir=None):
01382         """
01383         #. for example size 1000 with partitionrange 0,3 gives three chunks of 1000 keys starting at key 1
01384         """
01385         self.basedir = basedir  
01386         pass
01387         size = int(opts.partitionsize)  
01388         pass
01389         if opts.partitionrange is None:
01390             parts = None
01391         else:
01392             a, n = map(int,opts.partitionrange.split(","))  
01393             parts = range(a,n)
01394         pass  
01395         key, kpo = opts.partitionkey.split(",")
01396         and_where = opts.where  
01397         pass
01398         self.and_where = and_where
01399 
01400         if opts.partitionlast is None:
01401             plast = None
01402         else:             
01403             plast = int(opts.partitionlast) 
01404         pass
01405         self.plast = plast
01406         pass
01407         self.key = key       # key string eg SEQNO
01408         self.kpo = int(kpo)  # key position in csv
01409         pass
01410         self.size = size
01411         self.parts = parts
01412 
01413 
01414     def survey(self, db ):
01415         """
01416         Need to identify the last partition after which not 
01417         all of the tables will have full partitions.
01418 
01419         For sensible partitioning this should be restricted to tables 
01420         whose SEQNO values stride along roughly together. 
01421 
01422         """
01423         size = self.size
01424         log.info("PartitionMgr survey size %s " % size  )
01425         svy = {}
01426         mlp = 0
01427         
01428         for table in db.ptables():
01429             mm = db(self.qminmax(table,where="TRUE"))[0]
01430             last = mm['max']/size
01431             svy[table] = dict(mm, last=last )
01432         pass
01433         svy['COMMON'] = dict(
01434             min=max(map(lambda _:_['min'],svy.values())),  
01435             max=min(map(lambda _:_['max'],svy.values())),
01436             last=min(map(lambda _:_['last'],svy.values()))
01437           )  
01438 
01439         log.info("\n"+pformat(svy)) 
01440         autoplast = svy['COMMON']['last']
01441         log.info("auto survey determines last partition index as %s " % autoplast )
01442         if self.plast is None:
01443             self.plast = autoplast  
01444         else:
01445             if self.plast != autoplast:
01446                 log.warn("`--partitionlast` option gives %s BUT auto-survey gives %s " % (self.plast, autoplast))  
01447             pass
01448         pass
01449         if self.parts is None:
01450             parts = range(0,self.plast+1)  # plast is the 0-based index of the last partition, hence the need for  +1
01451             log.info("auto definition of partition range %s " % repr(parts))
01452             self.parts = parts 
01453         else:
01454             log.info("using partition range from options %s " % repr(self.parts))
01455         return svy
01456  
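    # Illustrative sketch, not part of the original dbsrv.py: with size 10000
    # and a table whose max SEQNO is 235861, integer division gives last = 23
    # for that table; COMMON takes the smallest such value over all the
    # ptables, so plast = 23 and the auto partition range becomes range(0,24),
    # ie partitions 0..22 are full and partition 23 is the incomplete "last"
    # (assuming contiguous SEQNO).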
01457     def assign_parts( self, chunks ):
01458         """
01459         :param chunks: ascending list of partition indices, eg as returned by archived_chunks()
01460         """
01461         parts = sorted(chunks)
01462         plast = parts[-1] + 1
01463         parts += [plast]
01464 
01465         self.parts = parts
01466         self.plast = plast
01467         log.info("assign_parts %s plast %s " % ( self.parts, self.plast ))
01468 
01469     def dir(self, p): 
01470         """
01471         :param p: partition index, returns the partition directory path ("last" for the final incomplete partition) """ 
01472         if self.is_last(p):
01473             leaf = "last"
01474         else:
01475             leaf = p
01476         pass
01477         return os.path.join(self.basedir,str(self.size),str(leaf)) 
01478 
01479     def is_last(self, p):
01480         return p == self.plast
01481 
01482     def qminmax(self, table, where="1=1"):
01483         key = self.key
01484         return "select max(%(key)s) as max, min(%(key)s) as min, count(*) as count from %(table)s where %(where)s " % locals()
01485 
01486     def qrange(self, table, where="1=1", keycount=True):
01487         key = self.key
01488         if keycount:
01489             keycount = ", count(distinct(%(key)s)) as keycount " % locals()
01490         else:
01491             keycount = ""
01492         pass
01493         return "select max(%(key)s) as max, min(%(key)s) as min, count(*) as count  %(keycount)s from %(table)s where %(where)s " % locals()
01494 
01495     def offset(self, p): 
01496         return self.size*p 
01497     def min(self, p): 
01498         return self.size*p + 1
01499     def max(self, p): 
01500         return self.size*(p+1)
01501     def where(self, p):
01502         n = len(self.parts) 
01503         size = self.size 
01504         pmin = self.min(p) 
01505         pmax = self.max(p) 
01506         pkey = self.key
01507         p1 = p + 1
01508         if self.and_where is None:
01509             and_where = ""
01510         else:
01511             and_where = " and %s " % self.and_where 
01512 
01513         return "/* [%(p)s] %(size)s-partition %(p1)-3s/%(n)s  */ %(pkey)s >= %(pmin)s and %(pkey)s <= %(pmax)s %(and_where)s " % locals()
01514 
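    # Illustrative sketch, not part of the original dbsrv.py: the 1-based key
    # ranges implied by the arithmetic above, for the default size 10000 and
    # key SEQNO:
    #
    #     p = 0  ->  where(0) selects  SEQNO >= 1     and SEQNO <= 10000
    #     p = 3  ->  where(3) selects  SEQNO >= 30001 and SEQNO <= 40000
    #
    # plus any extra " and ..." clause supplied via the --where option.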
01515     def int_dirname(self, dir):
01516         """
01517         :param dir:
01518         :return: list of integers corresponding to subfolders of *dir* with integer names, in ascending order 
01519         """ 
01520         return sorted(map(int,filter(lambda _:self.int_ptn.match(_), os.listdir(dir))))
01521 
01522     def has_partitionset(self):
01523         """
01524         Checks if there is a partitionset folder for the configured partition size.
01525         ie looks for the 10000 folder in the default case.
01526         """
01527         psizes = self.int_dirname(self.basedir)
01528         if not self.size in psizes:
01529             msg = "partition dump at %s does not have partitions of the configured size %s : %s " % (self.basedir, self.size, psizes)
01530             log.warn(msg)
01531             return False 
01532         return True 
01533 
01534     def archived_chunks(self):
01535         """
01536         :return: list of integers in ascending order corresponding to partition archives found
01537         """
01538         zdir  = os.path.join(self.basedir, "archive", str(self.size))
01539         ptn = re.compile("^(?P<size>\d*)_(?P<part>\d*).tar.gz.dna$")
01540         chunks = []
01541         for path in os.listdir(zdir):
01542             name = os.path.basename(path)
01543             match = ptn.match(name)
01544             if match:
01545                 d = match.groupdict()
01546                 chunks.append(int(d['part']))
01547             pass 
01548         return sorted(chunks)
01549 
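    # Illustrative sketch, not part of the original dbsrv.py: archived_chunks
    # keys off the ".dna" digest files expected next to each tarball, eg a file
    # named "10000_3.tar.gz.dna" under <basedir>/archive/10000/ yields chunk
    # index 3.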
01550     def available_chunks(self): 
01551         """
01552         :return: list of integers in ascending order corresponding to the partition folders found
01553 
01554         Looks for the partition folder, named according to the partition size eg "basedir/10000" 
01555         and determines the chunks by looking at the integer named partition subfolders "basedir/10000/0" 
01556         """
01557         if not self.has_partitionset():
01558             log.warn("no partitionset for partition size %s" % self.size)
01559             return []  
01560         pfold = os.path.join( self.basedir, str(self.size) ) 
01561         chunks = self.int_dirname(pfold)
01562         log.info("partition folder %s for configured size %s has chunks %s " % (pfold, self.size, repr(chunks)) )
01563         return chunks
01564 
01565 
01566 class MyCnf(dict):
01567     def __init__(self, path = "~/.my.cnf", prime={}): 
01568         """
01569         :param path: to config file
01570         :param prime: initial dict to prime the config, eg for current date
01571         """
01572         cfp = ConfigParser(prime)
01573         paths = cfp.read( [os.path.expandvars(os.path.expanduser(p)) for p in path.split(":")] )
01574         log.debug("MyCnf read %s " % repr(paths) )
01575         self.cfp = cfp 
01576         self.sections = cfp.sections() 
01577         self.path = path
01578         self.paths  = paths
01579 
01580     def section(self, sect, home=None):
01581         """
01582         :param sect: name of section in config file
01583         :return: dict of config parameters from specified section of config file
01584 
01585         When the section is present in the config file simply return
01586         the parameters. If the section is not present in the file then 
01587         adopt the \"home\" instance section and swap in the database
01588         from the sect name.
01589 
01590         This allows access to all databases on a server without having 
01591         corresponding config file sections for all of them.
01592         """
01593         if sect in self.sections:
01594             it = dict(self.cfp.items(sect))
01595         else:
01596             if not home is None:
01597                 assert home.sect in self.sections, self.sections
01598                 it = dict(self.cfp.items(home.sect))
01599                 it['database'] = sect
01600                 log.debug("no section %s : inferring config from home section %s , assuming section name = dbname " % (sect, home.sect) )
01601             else:
01602                 it = None
01603         pass
01604         return it
01605 
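    # Illustrative sketch, not part of the original dbsrv.py, assuming a
    # ~/.my.cnf containing only a [loopback] section that points at
    # information_schema:
    #
    #     cnf = MyCnf()
    #     cnf.section("loopback")                          # returns the [loopback] items as-is
    #     cnf.section("tmp_ligs_offline_db_0", home=home)  # no such section, so the loopback items
    #                                                      # are reused with database = "tmp_ligs_offline_db_0"
    #
    # where "home" is the DB instance for the home/loopback section.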
01606     def mysqldb_pars(self, sect, home=None):
01607         """
01608         :param sect: name of section in config file
01609         :param home: DB instance 
01610         :return: dict of mysql-python style connection parameters
01611 
01612         Annoyingly mysql-python needs these keys
01613 
01614         `host` host to connect
01615         `user` user to connect as
01616         `passwd` password to use
01617         `db` database to use
01618 
01619         whereas mysql uses slightly different ones
01620 
01621         `host`   
01622         `user`
01623         `password`
01624         `database` 
01625 
01626         Normally this annoyance can be avoided using::
01627 
01628             conn = MySQLdb.connect( read_default_group=sect )   
01629 
01630         but when the `database/db` setting needs to be overridden this is not possible.
01631         """
01632         my2mp = dict(host="host",user="user",password="passwd",database="db", socket="unix_socket")
01633         my = self.section(sect, home=home)
01634 
01635         if my is None:
01636            msg = "missing required section \"%s\" in config file \"%s\" " % ( sect, self.path )
01637            log.fatal(msg)
01638            raise Exception(msg)
01639 
01640         mp = {}
01641         for k in filter(lambda k:k in my2mp,my.keys()):  # key translation, mysql to mysql-python
01642             mp[my2mp[k]] =  my[k]
01643         log.debug("translate mysql config %s into mysql-python config %s " % ( dict(my,password="***") , dict(mp,passwd="***") ))
01644         return mp 
01645 
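    # Illustrative sketch, not part of the original dbsrv.py: the my2mp mapping
    # above translates a mysql style section such as
    #
    #     {'host':'127.0.0.1', 'user':'root', 'password':'***', 'database':'information_schema'}
    #
    # into the mysql-python connection keywords
    #
    #     {'host':'127.0.0.1', 'user':'root', 'passwd':'***', 'db':'information_schema'}
    #
    # with any "socket" entry passed through as "unix_socket"; unrecognised keys are dropped.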
01646 
01647 
01648 
01649 
01650 def main():
01651     """
01652     Need to support logging on py2.3
01653 
01654     http://docs.python.org/release/2.3.5/lib/module-logging.html
01655     """
01656     from optparse import OptionParser
01657     op = OptionParser(usage=__doc__ + "\n" + DB.docs() )
01658     now = datetime.now().strftime("%Y%m%d_%H%M")
01659 
01660     tables = "DqChannel,DqChannelVld,DqChannelStatus,DqChannelStatusVld"   # default tables to backup/propagate
01661 
01662     op.add_option("-l", "--loglevel", default="INFO", help="Choose logging level case insensitively eg INFO,DEBUG,WARN. Default %default " )
01663     op.add_option(      "--logformat", default="%(asctime)s %(name)s %(levelname)-8s %(message)s" , help="Logging format. Default %default " )
01664     op.add_option("-t", "--tables",   default=tables,   help="Comma delimited list of table names to be included in operations. Default %default " )
01665     op.add_option(      "--DB_DROP_CREATE", action="store_true",   help="CAUTION: DROPs and CREATES the database specified in the first argument. Default %default " )
01666     op.add_option("-C", "--noconfirm", action="store_true",   help="Skip interactive confirmation of dangerous operations. Default %default " )
01667     op.add_option("-K", "--nocheck", action="store_true",   help="Skip some slow checking. Default %default " )
01668     op.add_option("-w", "--where",   default=None,   help="Where clause applied to selects on all used tables. Default %default " )
01669     op.add_option(      "--partition",    action="store_true",     help="Use partitioned dumping OR loading, used as an optimization for dealing with very large tables. Default %default " )
01670     op.add_option(      "--partitionrange",  default=None,    help="Partition range in partition indices corresponding to python `range(a,b)` or None for all. Default %default " )
01671     op.add_option(      "--partitionsize",   default="10000",   help="Partition size. Default %default " )
01672     op.add_option(      "--partitionkey",   default="SEQNO,0",     help="Fieldname, field position in CSV on which partition chunking is based. Default %default " )
01673     op.add_option(      "--partitionlast",   default=None,         help="(DEPRECATED now determined automatically) Used to identify the index of the last incomplete partition, in order to skip completeness checks and save into a \"last\" directory. Default %default " )
01674     op.add_option(      "--timestamp",      default=now,  help="Non-partitioned dumps and archives are placed inside date stamped folders, to use a prior one the stamp must be specified. Default %default " )
01675     op.add_option( "-A","--archiveforce",   action="store_true",     help="Proceed with archiving even if a preexisting archive exists. Default %default " )
01676     op.add_option( "-a","--archive",     action="store_true",     help="Create archives of the backups ready for offbox transfer. Default %default " )
01677     op.add_option( "-x","--extract",     action="store_true",     help="Extracts backup folders from archives. Default %default " )
01678     op.add_option(      "--backupfold",  default="/var/dbbackup/dbsrv",   help="Folder under which backups and archives are kept. Default %default " )
01679     op.add_option( "-T","--transfer",   action="store_true",     help="Must be used together with the `--archive` option to transfer tarballs to the remote node configured with `--transfercfg`. Default %default " )
01680     op.add_option(      "--transfercfg",   default=os.environ.get('DBSRV_REMOTE_NODE','S:/data'), help="Configure archive transfers with the ssh node name and destination prefix eg \"S:/data\" that comes ahead of the backupfold. Default %default " )
01681     op.add_option( "-s","--home",           default="loopback",     help="Name of .my.cnf section of information_schema DB on server regarded as home. Should normally be \"loopback\". Default %default " )
01682     opts, args = op.parse_args()
01683     level = getattr(logging, opts.loglevel.upper())
01684     try: 
01685         logging.basicConfig(format=opts.logformat,level=level)
01686     except TypeError:
01687         hdlr = logging.StreamHandler()
01688         formatter = logging.Formatter(opts.logformat)
01689         hdlr.setFormatter(formatter)
01690         log.addHandler(hdlr)
01691         log.setLevel(level)
01692     pass
01693     assert len(args)>1, "need 2 or more arguments %s " % repr(args)
01694 
01695     log.info("sys.version_info %s " % repr(sys.version_info)) 
01696     log.info("got %s args : %s " % (len(args), repr(args))) 
01697 
01698     if opts.transfer:
01699         sshenv()   # pick up ssh environment for use from spartan environments such as the cron commandline  
01700 
01701     home = DB(opts.home, opts=opts)
01702  
01703     arg = args[0]
01704     if arg.find(",") > -1:
01705         log.debug("comma delimited argument %s " % arg )
01706         names = arg.split(",")
01707     elif arg.find("\\") > -1 or arg.find("\*") > -1:
01708         log.debug("regexp argument %s " % arg )
01709         ptn = re.compile(arg)
01710         names = filter(lambda dbname:ptn.match(dbname), home.databases)
01711     else:
01712         log.debug("plain single DB argument %s " % arg )
01713         names = [arg]
01714     pass
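    # Illustrative note, not part of the original dbsrv.py: a first argument of
    # "tmp_ligs_offline_db_0,tmp_ligs_offline_db_1" yields two names, an
    # argument containing a backslash such as "tmp_ligs_offline_db_\d" is
    # treated as a regexp matched against home.databases, and anything else is
    # taken as a single database name.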
01715     log.debug("arg %s names %s " % ( arg, repr(names)))
01716     for name in names: 
01717         if opts.DB_DROP_CREATE:    
01718             home.database_drop_create(name)
01719         pass
01720         db = DB(name, opts=opts, home=home)
01721         db.dispatch( *args[1:] ) 
01722 
01723 if __name__=='__main__':
01724     db = main()
01725      