#!/usr/bin/env python
"""
dbsrv : MySQL Server Utilities
================================

A more admin-centric version of its sibling *db.py*, with advanced features including:

* on-server optimizations such as `select ... into outfile`,
  taking advantage of the situation when the mysql client and
  server are on the same node

* partitioned dump/load for dealing with very large tables and incremental backups

* implicit DB addressing without a *~/.my.cnf* section, allowing multiple databases
  all from the same server to be handled via comma-delimited names or regular expressions

* despite coming from NuWa it does not need the NuWa environment, a system python with MySQLdb is OK

TODO
-----

#. checking the digests on the target and sending notification emails

#. test dumplocal when partitionsize is an exact factor of table size
#. warnings or asserts when using partitioned dumplocal with disparate table sizes


Usage
------

::

    ./dbsrv.py tmp_ligs_offline_db_0 databases
    ./dbsrv.py tmp_ligs_offline_db_0 tables
    ./dbsrv.py tmp_ligs_offline_db_0 dumplocal --where "SEQNO < 100"

Similar to *db.py* the first argument can be a *~/.my.cnf* section name.
Unlike *db.py* it can also simply be a database name which
does not have a corresponding config section.

In this implicit case the other connection parameters are obtained
from the so-called *home* section. Normally the *home* section is "loopback",
indicating an on-server connection.
The *home* section must point to the `information_schema` database.

When the `--home` option is used, databases on remote servers can
be accessed without having config sections for them all.

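The implicit addressing can be pictured as taking the connection parameters of the
*home* section and swapping in the requested database name. A minimal sketch of that
fallback, with an illustrative helper name (the real resolution is done by the `MyCnf`
class further down this file), assuming the *~/.my.cnf* layout shown in the
Installation section::

    # hedged illustration only : resolve connection pars for a bare DB name
    from ConfigParser import ConfigParser
    import os

    def implicit_pars(sect, home="loopback", path="~/.my.cnf"):
        cfp = ConfigParser()
        cfp.read(os.path.expanduser(path))
        if cfp.has_section(sect):
            pars = dict(cfp.items(sect))        # explicit config section wins
        else:
            pars = dict(cfp.items(home))        # fall back to the "home" section ...
            pars['database'] = sect             # ... swapping in the DB name
        return dict(host=pars['host'], user=pars['user'],
                    passwd=pars['password'], db=pars['database'])

    print implicit_pars("tmp_ligs_offline_db_0")
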
Comparing DB via partitioned dump
-----------------------------------

Three table dumps skipping the crashed table in order to compare:

* `dybdb1_ligs.tmp_ligs_offline_db_dybdb1` original on dybdb1
* `dybdb2_ligs.channelquality_db_dybdb2` recovered on dybdb2
* `loopback.channelquality_db_belle7` recovered onto belle7 from a hotcopy created on belle1

Invoked from cron for definiteness, and for the ability to leave it running for a long time::

    07 17 * * * ( $DYBPYTHON_DIR/dbsrv.py -t DqChannel,DqChannelVld,DqChannelStatusVld --home dybdb1_ligs tmp_ligs_offline_db_dybdb1 dumplocal /tmp/cq/tmp_ligs_offline_db_dybdb1 --partition --partitioncfg 10000,0,33 ) > $CRONLOG_DIR/dbsrv_dump_tmp_ligs_offline_db_dybdb1.log 2>&1
    52 18 * * * ( $DYBPYTHON_DIR/dbsrv.py -t DqChannel,DqChannelVld,DqChannelStatusVld --home dybdb2_ligs channelquality_db_dybdb2 dumplocal /tmp/cq/channelquality_db_dybdb2 --partition --partitioncfg 10000,0,33 ) > $CRONLOG_DIR/dbsrv_dump_channelquality_db_dybdb2.log 2>&1
    28 20 * * * ( $DYBPYTHON_DIR/dbsrv.py -t DqChannel,DqChannelVld,DqChannelStatusVld --home loopback channelquality_db_belle7 dumplocal /tmp/cq/channelquality_db_belle7 --partition --partitioncfg 10000,0,33 ) > $CRONLOG_DIR/dbsrv_dump_channelquality_db_belle7.log 2>&1

.. warning:: `--partitioncfg` has now been split into `--partitionsize` and `--partitionrange`


Dump speed:

#. remote dumps from dybdb1/dybdb2 to belle7 take approx 165s for each chunk, thus ~90 min for all
#. local dumps on belle7 take approx 20s for each chunk, thus ~11 min for all

diffing the dumped partitions
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

For the first two, all but the partial chunk match.

The range of partition dirs to diff is controlled by an envvar::

    [blyth@belle7 DybPython]$ RANGE=0,10 ./diff.py /tmp/cq/tmp_ligs_offline_db_dybdb1/10000 /tmp/cq/channelquality_db_dybdb2/10000
    [blyth@belle7 DybPython]$ RANGE=10,20 ./diff.py /tmp/cq/tmp_ligs_offline_db_dybdb1/10000 /tmp/cq/channelquality_db_dybdb2/10000
    [blyth@belle7 DybPython]$ RANGE=20,30 ./diff.py /tmp/cq/tmp_ligs_offline_db_dybdb1/10000 /tmp/cq/channelquality_db_dybdb2/10000
    [blyth@belle7 DybPython]$ RANGE=30,33 ./diff.py /tmp/cq/tmp_ligs_offline_db_dybdb1/10000 /tmp/cq/channelquality_db_dybdb2/10000    ## see diff.py for the output from these

    [blyth@belle7 DybPython]$ RANGE=0,10 ./diff.py /tmp/cq/channelquality_db_belle7/10000 /tmp/cq/channelquality_db_dybdb2/10000
    [blyth@belle7 DybPython]$ RANGE=10,20 ./diff.py /tmp/cq/channelquality_db_belle7/10000 /tmp/cq/channelquality_db_dybdb2/10000
    [blyth@belle7 DybPython]$ RANGE=20,30 ./diff.py /tmp/cq/channelquality_db_belle7/10000 /tmp/cq/channelquality_db_dybdb2/10000
    [blyth@belle7 DybPython]$ RANGE=30,33 ./diff.py /tmp/cq/channelquality_db_belle7/10000 /tmp/cq/channelquality_db_dybdb2/10000

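*diff.py* is a separate sibling script; the loop it performs can be approximated by the
sketch below, which reads the RANGE envvar and runs `diff -r --brief` over the selected
pair of partition directories (illustration only, the names are hypothetical, see diff.py
for the real thing)::

    # hedged sketch of a RANGE controlled partition diff, eg RANGE=10,20
    import os, sys

    def diff_partitions(adir, bdir):
        a, n = map(int, os.environ.get('RANGE', '0,33').split(","))
        for p in range(a, n):
            cmd = "diff -r --brief %s/%s %s/%s" % (adir, p, bdir, p)
            rc = os.system(cmd)
            print "%s => %s" % (cmd, os.WEXITSTATUS(rc))

    if __name__ == '__main__':
        diff_partitions(sys.argv[1], sys.argv[2])
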
oops a difference, but it's just different formatting of 0.0001 or 1e-04
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

::

    [blyth@belle7 DybPython]$ RANGE=10,20 ./diff.py /tmp/cq/channelquality_db_belle7/10000 /tmp/cq/channelquality_db_dybdb2/10000
    2013-06-07 17:58:06,933 __main__ INFO rng ['10', '11', '12', '13', '14', '15', '16', '17', '18', '19']
    2013-06-07 17:58:26,526 __main__ INFO diff -r --brief /tmp/cq/channelquality_db_belle7/10000/10 /tmp/cq/channelquality_db_dybdb2/10000/10 => 0
    2013-06-07 17:58:44,896 __main__ INFO diff -r --brief /tmp/cq/channelquality_db_belle7/10000/11 /tmp/cq/channelquality_db_dybdb2/10000/11 => 0
    2013-06-07 17:59:04,360 __main__ INFO diff -r --brief /tmp/cq/channelquality_db_belle7/10000/12 /tmp/cq/channelquality_db_dybdb2/10000/12 => 0
    2013-06-07 17:59:22,531 __main__ INFO diff -r --brief /tmp/cq/channelquality_db_belle7/10000/13 /tmp/cq/channelquality_db_dybdb2/10000/13 => 0
    2013-06-07 17:59:42,205 __main__ INFO diff -r --brief /tmp/cq/channelquality_db_belle7/10000/14 /tmp/cq/channelquality_db_dybdb2/10000/14 => 0
    2013-06-07 18:00:00,385 __main__ INFO diff -r --brief /tmp/cq/channelquality_db_belle7/10000/15 /tmp/cq/channelquality_db_dybdb2/10000/15 => 0
    2013-06-07 18:00:20,000 __main__ INFO diff -r --brief /tmp/cq/channelquality_db_belle7/10000/16 /tmp/cq/channelquality_db_dybdb2/10000/16 => 0
    2013-06-07 18:00:38,198 __main__ INFO diff -r --brief /tmp/cq/channelquality_db_belle7/10000/17 /tmp/cq/channelquality_db_dybdb2/10000/17 => 0
    2013-06-07 18:00:38,704 __main__ INFO diff -r --brief /tmp/cq/channelquality_db_belle7/10000/18 /tmp/cq/channelquality_db_dybdb2/10000/18 => 1
    Files /tmp/cq/channelquality_db_belle7/10000/18/DqChannel.csv and /tmp/cq/channelquality_db_dybdb2/10000/18/DqChannel.csv differ

    2013-06-07 18:00:56,602 __main__ INFO diff -r --brief /tmp/cq/channelquality_db_belle7/10000/19 /tmp/cq/channelquality_db_dybdb2/10000/19 => 0
    [blyth@belle7 DybPython]$
    [blyth@belle7 DybPython]$ diff /tmp/cq/channelquality_db_belle7/10000/18/DqChannel.csv /tmp/cq/channelquality_db_dybdb2/10000/18/DqChannel.csv
    1196930c1196930
    < 186235,2,28473,7,67175938,0.0001,7.35714,3.39868,-1,-1
    ---
    > 186235,2,28473,7,67175938,1e-04,7.35714,3.39868,-1,-1
    ...

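Such differences are benign: both dumps hold the same value, only the CSV text
formatting differs. A comparison that parses numeric fields rather than comparing text
would treat the two lines as equal, as in this small sketch (not part of dbsrv.py)::

    # treat fields numerically where possible, so "0.0001" compares equal to "1e-04"
    def fields_equal(a, b):
        try:
            return float(a) == float(b)
        except ValueError:
            return a == b

    def lines_equal(la, lb):
        fa = la.strip().split(",")
        fb = lb.strip().split(",")
        if len(fa) != len(fb):
            return False
        for a, b in zip(fa, fb):
            if not fields_equal(a, b):
                return False
        return True

    print lines_equal("186235,2,28473,7,67175938,0.0001,7.35714,3.39868,-1,-1",
                      "186235,2,28473,7,67175938,1e-04,7.35714,3.39868,-1,-1")   # True
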
Commands
-----------

summary
~~~~~~~~~

Provides a summary of table counts and update times in all selected databases.
The DB names are selected by comma-delimited or regexp string arguments::

    ./dbsrv.py tmp_ligs_offline_db_\\d summary
         # local home, requires "loopback" config section pointing to information_schema DB

    ./dbsrv.py --home dybdb1 tmp_\\S* summary
         # remote home, requires "dybdb1" config section pointing to information_schema DB

TODO:

Check handling of section names that are the same as DB names on different nodes,
as the section config will trump the dbname ?  BUT *home* config host matching should trip asserts ?


dumplocal
~~~~~~~~~

The DB tables are dumped as *.csv* files together with separate *.schema* files containing the table creation SQL.
Without a directory argument the dumps are written beneath the directory controlled by `--backupfold`,
such as `/var/dbbackup/dbsrv`::

    [blyth@belle7 DybPython]$ ./dbsrv.py tmp_ligs_offline_db_0 dumplocal --where 'SEQNO <= 100'
    2013-06-13 16:49:38,152 __main__ INFO partition_dumplocal___ SEQNO <= 100 writing /var/dbbackup/dbsrv/belle7.nuu.edu.tw/tmp_ligs_offline_db_0/DqChannel.csv
    2013-06-13 16:49:38,578 __main__ INFO partition_dumplocal___ SEQNO <= 100 writing /var/dbbackup/dbsrv/belle7.nuu.edu.tw/tmp_ligs_offline_db_0/DqChannel.csv took 0.39 seconds
    ...

    [blyth@belle7 DybPython]$ ./dbsrv.py tmp_ligs_offline_db_0 dumplocal /tmp/check/tmp_ligs_offline_db_0 --where 'SEQNO <= 100'
    2013-06-13 16:50:49,003 __main__ WARNING using basedir /tmp/check/tmp_ligs_offline_db_0 different from standard /var/dbbackup/dbsrv/belle7.nuu.edu.tw/tmp_ligs_offline_db_0
    2013-06-13 16:50:49,031 __main__ INFO partition_dumplocal___ SEQNO <= 100 writing /tmp/check/tmp_ligs_offline_db_0/DqChannel.csv
    2013-06-13 16:50:49,203 __main__ INFO partition_dumplocal___ SEQNO <= 100 writing /tmp/check/tmp_ligs_offline_db_0/DqChannel.csv took 0.17 seconds
    ...

.. warning:: When there are databases of the same name on multiple nodes it is useful to include the node name in the section name


loadlocal
~~~~~~~~~~~

When loading into a database that is yet to be created, use the `--DB_DROP_CREATE` option::

    [blyth@belle7 DybPython]$ ./dbsrv.py tmp_ligs_offline_db_5 loadlocal ~/tmp_ligs_offline_db_0 -l debug --DB_DROP_CREATE

Typically a database name change is needed when loading; in this case the source directory and the new section name must be given::

    [blyth@belle7 DybPython]$ ./dbsrv.py tmp_ligs_offline_db_50 loadlocal /var/dbbackup/dbsrv/belle7.nuu.edu.tw/tmp_ligs_offline_db_0 --DB_DROP_CREATE
    DROP and reCREATE database tmp_ligs_offline_db_50 loosing all tables contained ? Enter "YES" to proceed : YES
    2013-06-13 16:58:41,491 __main__ WARNING using basedir /var/dbbackup/dbsrv/belle7.nuu.edu.tw/tmp_ligs_offline_db_0 different from standard /var/dbbackup/dbsrv/belle7.nuu.edu.tw/tmp_ligs_offline_db_50
    2013-06-13 16:58:41,499 __main__ WARNING creating table DqChannel from schema file /var/dbbackup/dbsrv/belle7.nuu.edu.tw/tmp_ligs_offline_db_0/DqChannel.schema
    ...


partitioned loadlocal
~~~~~~~~~~~~~~~~~~~~~~~~

NB when restoring, a name change is needed, so it is necessary to specify the source directory as an argument::

    [root@cms01 DybPython]# dbsrv channelquality_db_restored loadlocal /data/var/dbbackup/dbsrv/dybdb2.ihep.ac.cn/channelquality_db_dybdb2 --partition --extract -l debug --DB_DROP_CREATE -C
    ## initial run, creating the DB from 32 partitions took ~100 min

    [root@cms01 DybPython]# dbsrv channelquality_db_restored loadlocal /data/var/dbbackup/dbsrv/dybdb2.ihep.ac.cn/channelquality_db_dybdb2 --partition --extract -K
    ## quick re-run, notices nothing to do and completes in a few seconds

    [blyth@cms01 ~]$ type dbsrv     # function to nab the NuWa python MySQLdb, as yum is being uncooperative on cms01
    dbsrv is a function
    dbsrv ()
    {
        local python=/data/env/local/dyb/trunk/external/Python/2.7/i686-slc4-gcc34-dbg/bin/python;
        export PYTHONPATH=/data/env/local/dyb/trunk/NuWa-trunk/../external/mysql_python/1.2.3_mysql5.0.67_python2.7/i686-slc4-gcc34-dbg/lib/python2.7/site-packages;
        LD_LIBRARY_PATH=/data/env/local/dyb/trunk/NuWa-trunk/../external/mysql/5.0.67/i686-slc4-gcc34-dbg/lib/mysql:$LD_LIBRARY_PATH;
        LD_LIBRARY_PATH=/data/env/local/dyb/trunk/NuWa-trunk/../external/mysql_python/1.2.3_mysql5.0.67_python2.7/i686-slc4-gcc34-dbg/lib:$LD_LIBRARY_PATH;
        export LD_LIBRARY_PATH;
        $python -c "import MySQLdb";
        $python ~blyth/DybPython/dbsrv.py $*
    }

Test run on cms01 chugging along at ~3 min per 10k partition; at 32 partitions estimate ~100 min to complete::

    [blyth@belle7 DybPython]$ ./dbsrv.py channelquality_db_restored loadlocal /var/dbbackup/dbsrv/dybdb2.ihep.ac.cn/channelquality_db_dybdb2 --partition --extract -l debug --DB_DROP_CREATE -C


Partitioned Commands
----------------------

The partitioning relies on these options:

`--partition`
    switches on partitioning, default False
`--partitionkey`
    default "SEQNO,0", corresponding to the key name and its position in CSV dumps
`--partitioncfg`
    **NOW REPLACED BY THE TWO OPTIONS BELOW**
    default "10000,0,33", the three integers specify the number of keys in each chunk `10000`
    and the range of chunks `range(0,33)` ie 0 to 32
`--partitionsize`
    default "10000", specifies the number of keys in each chunk
`--partitionrange`
    default None, meaning all partitions.
    If specified as eg "0,33" it restricts to a range of partition indices `range(0,33)`
`--partitionlast`
    **NOW DEPRECATED** The last partition is now auto-determined, to allow daily cron running.
    Default None; when set to an integer string eg "32" this is used to identify the index
    of the last incomplete partition

For dump and load to refer to the same partition set, the same chunk size is required
(and the same partition key, although this is not checked).

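For a given `--partitionsize` the mapping from partition index to key range is fixed:
partition *p* covers keys `size*p + 1` through `size*(p+1)`. A minimal illustration of
the arithmetic (the same convention as used by the `PartitionMgr` class below)::

    size = 10000                        # --partitionsize
    def pmin(p): return size*p + 1      # first key in partition p
    def pmax(p): return size*(p+1)      # last key in partition p

    for p in range(0, 3):
        print "partition %s covers SEQNO %s..%s" % (p, pmin(p), pmax(p))
    ## partition 0 covers SEQNO 1..10000
    ## partition 1 covers SEQNO 10001..20000
    ## partition 2 covers SEQNO 20001..30000
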
partitioned loadlocal
~~~~~~~~~~~~~~~~~~~~~~~

From cron::

    DYBPYTHON_DIR=/data1/env/local/dyb/NuWa-trunk/dybgaudi/DybPython/python/DybPython
    03 20 * * * ( $DYBPYTHON_DIR/dbsrv.py channelquality_db_0 loadlocal /tmp/cq/channelquality_db --partition --DB_DROP_CREATE -C ) > $CRONLOG_DIR/dbsrv_load_.log 2>&1

quick partitioning test
~~~~~~~~~~~~~~~~~~~~~~~~

For fast dump/load testing use small chunks and a small range of partitions::

    ./dbsrv.py tmp_ligs_offline_db_0 dumplocal /tmp/pp/tmp_ligs_offline_db_0 --partition --partitionsize 10 --partitionrange 0,5
    ./dbsrv.py tmp_ligs_offline_db_5 loadlocal /tmp/pp/tmp_ligs_offline_db_0 --partition --partitionsize 10 --partitionrange 0,5 --DB_DROP_CREATE -C


Archiving and transfers to remote node
-----------------------------------------

Controlled via options:

`-a/--archive`
    switch on archive creation

`-x/--extract`
    switch on archive extraction

`--backupfold`
    default /var/dbbackup/dbsrv, the location of backup dumps and tarballs

`-T/--transfer`
    switch on remote transfer of archives, must be used together with `-a/--archive` and the *dumplocal* command to be effective

`--transfercfg`
    configures the remote node and possibly a directory prefix that is prepended in front of the `backupfold`


For example the below command dumps partitions 0, 1 and 2, creates archive tarballs and transfers them to the configured remote node::

    ./dbsrv.py -t DqChannel,DqChannelVld,DqChannelStatusVld --home loopback channelquality_db_belle7 dumplocal /tmp/cq/channelquality_db_belle7 --partition --partitionrange 0,3 -aT

With no transfercfg prefix specified the local and remote tarball paths are the same, namely::

    /var/dbbackup/dbsrv/belle7.nuu.edu.tw/channelquality_db_belle7/10000_0.tar.gz


Transfer Optimization
~~~~~~~~~~~~~~~~~~~~~~~

A small .dna sidecar to the tarballs is used for tarball content identification.
When a rerun of the transfer is made, the sidecar DNA is first checked to see
if the remote node already holds the tarball.

This means that only newly reached partitions are archived and transferred.
The `last` incomplete partition will typically be transferred every time, as its
content changes and the resulting DNA mismatch triggers a re-transfer.

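The sidecar format and the comparison are handled by the `Tar` class in *tar.py*;
conceptually the decision reduces to comparing a digest of the local tarball with the
one recorded remotely, roughly as in this hedged sketch (the md5-of-tarball digest and
the helper names are illustrative only, not the actual .dna format)::

    import md5

    def local_digest(tgzp, blocksize=1024*1024):
        """digest of the tarball content, read in chunks"""
        dig = md5.new()
        fp = open(tgzp, "rb")
        while True:
            block = fp.read(blocksize)
            if not block:
                break
            dig.update(block)
        fp.close()
        return dig.hexdigest()

    def need_transfer(tgzp, remote_dna):
        """remote_dna : digest recorded on the remote node, or None if never transferred"""
        if remote_dna is None:
            return True
        return remote_dna != local_digest(tgzp)   # re-transfer only when content changed
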
Full archive/transfer cron test from belle7 to belle1
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

To prepare the remote node just create and set ownership of the `backupfold` eg `/var/dbbackup/dbsrv`
and ensure keyed ssh access is working::

    DYBPYTHON_DIR=/data1/env/local/dyb/NuWa-trunk/dybgaudi/DybPython/python/DybPython
    DBSRV_REMOTE_NODE=N1
    35 18 * * * ( $DYBPYTHON_DIR/dbsrv.py -t DqChannel,DqChannelVld,DqChannelStatusVld --home loopback channelquality_db_belle7 dumplocal --partition --archive --transfer ) > $CRONLOG_DIR/dbsrv_pat_channelquality_db_belle7.log 2>&1


Installation on dybdb2
------------------------

Prepare target node
~~~~~~~~~~~~~~~~~~~~~

The administrator of the target node needs to prepare a folder for the archives::

    [blyth@cms01 ~]$ sudo mkdir /data/var/dbbackup/dbsrv
    [blyth@cms01 ~]$ sudo chown -R dayabayscp.dayabayscp /data/var/dbbackup/dbsrv

Setup mysql config at source
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

The config file :file:`~/.my.cnf` needs two sections, "loopback" and "channelquality_db_dybdb2"::

    [loopback]
    host = 127.0.0.1
    database = information_schema
    user = root
    password = ***

    [channelquality_db_dybdb2]
    host = 127.0.0.1
    database = channelquality_db
    user = root
    password = ***

SSH environment configuration
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

The script runs *scp* commands internally, which require:

* an `ssh-agent` process to be running and authenticated
* the public keys of the source node to be appended to `.ssh/authorized_keys2` on the target
* :envvar:`SSH_AUTH_SOCK` to be defined.

When run from cron the envvar is typically not present.
In order to define it the :file:`~/.ssh-agent-info-$NODE_TAG` file
is parsed by `sshenv()` from common.py.

This file is created by the **env** function `ssh--agent-start` which is used
following reboots to start and authenticate the ssh-agent process.

* http://belle7.nuu.edu.tw/e/base/ssh/

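A rough picture of what `sshenv()` does, assuming the agent info file contains the
usual `SSH_AUTH_SOCK=...; export SSH_AUTH_SOCK;` lines written at agent startup
(see common.py for the real implementation)::

    import os

    def sshenv(node_tag=None):
        """pull SSH_* variables out of the agent info file into the environment"""
        node_tag = node_tag or os.environ.get('NODE_TAG', '')
        path = os.path.expanduser("~/.ssh-agent-info-%s" % node_tag)
        if not os.path.exists(path):
            return
        for line in open(path).readlines():
            line = line.split(";")[0].strip()            # drop "; export VAR;" tails
            if line.startswith("SSH_") and "=" in line:
                name, value = line.split("=", 1)
                os.environ[name] = value
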
Get DybPython from dybsvn
~~~~~~~~~~~~~~~~~~~~~~~~~~~

::

    cd
    svn co http://dayabay.ihep.ac.cn/svn/dybsvn/dybgaudi/trunk/DybPython/python/DybPython

Despite coming from dybsvn the *dbsrv.py* script does not need the NuWa environment;
the MySQLdb extension in the system python should be adequate.


Quick Interactive Test
~~~~~~~~~~~~~~~~~~~~~~~

Configuring 5 small 100-SEQNO partitions allows the machinery to be tested quickly::

    cd DybPython
    ./dbsrv.py channelquality_db_dybdb2 dumplocal --partition --partitionsize 100 --partitionrange 0,5 --archive --transfer


CRON commandline
~~~~~~~~~~~~~~~~~~

::

    ## NB no DBSRV_REMOTE_NODE is needed, the default of S:/data is appropriate
    DYBPYTHON_DIR=/root/DybPython
    CRONLOG_DIR=/root/cronlog
    NODE_TAG=D2
    #
    42 13 * * * ( $DYBPYTHON_DIR/dbsrv.py channelquality_db_dybdb2 dumplocal --partition --archive --transfer ) > $CRONLOG_DIR/dbsrv_channelquality_db_dybdb2.log 2>&1


A do-nothing run, when there are no new partitions to dump/archive/transfer, takes about 4 mins and uses few resources.
When there are new completed partitions to archive and transfer, the default chunk size of 10000 SEQNO leads to tarballs
of only 35M (perhaps 70M when all 4 tables are included), resulting in rapid transfers.

Although new completed partitions might be reached only every ~10 days with the 10k chunks, a daily
transfer is still recommended in order to back up the last incomplete partition and also so that
issues with the transfer are rapidly identified and resolved.



Transfer Monitoring
~~~~~~~~~~~~~~~~~~~

Implemented using *valmon.py* with *digestpath.py*. Valmon needs to run as a daily
cronjob on the remote node. Configure with a **dbsrvmon** section::

    % ~/.env.cnf blyth@belle1.nuu.edu.tw
    [dbsrvmon]
    tn = channelquality_db
    chdir = /var/dbbackup/dbsrv/belle7.nuu.edu.tw/channelquality_db_belle7/archive/10000
    return = dict
    dbpath = ~/.env/dbsrvmon.sqlite
    cmd = digestpath.py
    note = stores the dict returned by the command as a string in the DB without interpretation
    valmon_version = 0.2
    constraints = ( tarball_count >= 34, dna_mismatch == 0, age < 86400 , age < 1000, )

Tested on belle1::

    [blyth@belle1 e]$ valmon.py -s dbsrvmon ls
    2013-06-17 11:48:01,515 env.db.valmon INFO /home/blyth/env/bin/valmon.py -s dbsrvmon ls
    2013-06-17 11:48:01,520 env.db.valmon WARNING no email section configures and no MAILTO envvar, NOTIFICATION WILL FAIL
    2013-06-17 11:48:01,521 env.db.valmon INFO arg ls
    ('2013-06-13T19:46:01', 5.5278148651123047, "{'dna_match': 34, 'lookstamp': 1371123961.7826331, 'dna_mismatch': 0, 'tarball_count': 34, 'age': 9030.7826330661774, 'lastchange': 1371114931, 'dna_missing': 0}", 0.0, 0)
    ('2013-06-13T19:54:06', 5.8677470684051514, "{'dna_match': 34, 'lookstamp': 1371124446.7869501, 'dna_mismatch': 0, 'tarball_count': 34, 'age': 9515.7869501113892, 'lastchange': 1371114931, 'dna_missing': 0}", 0.0, 0)

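The constraints are expressions over the keys of the dict returned by *digestpath.py*.
How they might be applied is sketched below; this is an assumption about the mechanism,
*valmon.py* defines the actual behaviour::

    # hedged sketch : evaluate configured constraint strings against the measurement dict
    meas = {'dna_match': 34, 'dna_mismatch': 0, 'dna_missing': 0,
            'tarball_count': 34, 'age': 9030.78, 'lastchange': 1371114931}

    constraints = ("tarball_count >= 34", "dna_mismatch == 0", "age < 86400")
    for c in constraints:
        ok = eval(c, {}, meas)        # names resolve to keys of the measurement dict
        print "%-20s : %s" % (c, ok and "OK" or "VIOLATED")
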
Obtain the backup tarballs
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

As of Dec 24 2013 there are 54 tarballs of 43M each, corresponding to 2322M in total. scp them using the
scponly account on cms01; Qiumei/Simon can provide the password::

    dir=/data/var/dbbackup/dbsrv/dybdb2.ihep.ac.cn/channelquality_db_dybdb2/archive/
    mkdir -p $dir && cd $dir
    scp -r dayabayscp@cms01.phys.ntu.edu.tw:/data/var/dbbackup/dbsrv/dybdb2.ihep.ac.cn/channelquality_db_dybdb2/archive/10000 .


Partitioned dump usage
------------------------

Full backups are impractical for 10G tables.

Partitioned dumping is attractive for backups of such large tables,
as only new partitions need to be dumped on each invocation.

For scp transfers it would be necessary to create tarfiles for each partition
with dna sidecars, and to add a transfer subcommand with an option-controlled remote node.
DNA checking would then allow only new partitions to be transferred.



System python API warning
--------------------------

Be careful regarding PYTHONPATH; mixing a NuWa PYTHONPATH with a system python gives an API RuntimeWarning::

    [blyth@belle7 DybPython]$ /usr/bin/python dbsrv.py -t DqChannel,DqChannelVld,DqChannelStatusVld --home dybdb2_ligs channelquality_db_dybdb2 dumplocal /tmp/cq/channelquality_db_dybdb2 --partition --partitioncfg 10,0,1
    /data1/env/local/dyb/external/mysql_python/1.2.3_mysql5.0.67_python2.7/i686-slc5-gcc41-dbg/lib/python2.7/site-packages/MySQLdb/__init__.py:19: RuntimeWarning: Python C API version mismatch for module _mysql: This Python has API version 1012, module _mysql has version 1013.
      import _mysql
    2013-06-06 18:03:08,963 __main__ INFO schema dir /tmp/cq/channelquality_db_dybdb2/10/_ exists already
    2013-06-06 18:03:08,963 __main__ INFO /* 10-partition 1 /1 */ SEQNO >= 1 and SEQNO <= 10
    2013-06-06 18:03:09,165 __main__ INFO checking prior csv dump /tmp/cq/channelquality_db_dybdb2/10/0 --partitioncfg 10,0,1
    [blyth@belle7 DybPython]$

Avoiding the NuWa PYTHONPATH means the imports are entirely from the system python and the RuntimeWarning is avoided::

    [blyth@belle7 DybPython]$ PYTHONPATH= /usr/bin/python dbsrv.py -t DqChannel,DqChannelVld,DqChannelStatusVld --home dybdb2_ligs channelquality_db_dybdb2 dumplocal /tmp/cq/channelquality_db_dybdb2 --partition --partitioncfg 10,0,1
    2013-06-06 18:04:58,078 __main__ INFO schema dir /tmp/cq/channelquality_db_dybdb2/10/_ exists already
    2013-06-06 18:04:58,078 __main__ INFO /* 10-partition 1 /1 */ SEQNO >= 1 and SEQNO <= 10
    2013-06-06 18:04:58,282 __main__ INFO checking prior csv dump /tmp/cq/channelquality_db_dybdb2/10/0 --partitioncfg 10,0,1
    [blyth@belle7 DybPython]$

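A quick check of which MySQLdb build is actually being imported (and hence whether the
NuWa or the system extension is on the path) is to print its origin, assuming the usual
MySQLdb 1.2.x attributes::

    PYTHONPATH= /usr/bin/python -c "import MySQLdb, _mysql ; print MySQLdb.__file__, MySQLdb.version_info ; print _mysql.__file__"
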
Import Notes
-------------

#. keep imports to a minimum for portability to the server situation, ie do not rely on the NuWa environment
#. MySQLdb is kept optional so that nodes without MySQL-python can still autodoc

"""
import sys, os, logging, inspect, time, re, tarfile, platform
from datetime import datetime
log = logging.getLogger(__name__)
from ConfigParser import ConfigParser, NoSectionError
from pprint import pformat
from tar import Tar
from common import sshenv



def _sorted(iterable, key=lambda _:_, reverse=False):
    """
    sorted for py23, caution returns lists not tuples
    """
    temp = [(key(x), x) for x in iterable]
    temp.sort()
    if reverse:
        return [temp[i][1] for i in xrange(len(temp) - 1, -1, -1)]
    return [t[1] for t in temp]

try:
    sorted                 # builtin sorted missing in ancient python
except NameError:
    sorted = _sorted



try:
    import MySQLdb
except ImportError:
    MySQLdb = None

try:
    import _mysql
except ImportError:
    _mysql = None


def attrs_(mod, names):
    """Access module constants in a version independent way """
    if mod:
        return map( lambda _:getattr(mod, _), filter( lambda _:hasattr(mod,_), names.split() ) )
    else:
        return None
string_types = attrs_( MySQLdb and MySQLdb.constants.FIELD_TYPE , 'VARCHAR DATETIME STRING VAR_STRING' )  # guarded so import succeeds on nodes without MySQLdb


seconds = {}
def timing(func):
    def wrapper(*arg,**kw):
        '''source: http://www.daniweb.com/code/snippet368.html'''
        t1 = time.time()
        res = func(*arg,**kw)
        t2 = time.time()
        global seconds
        seconds[func.func_name] = (t2-t1)
        return res
    return wrapper


class CSVFormat(list):
    """
    Provides the format string to create the CSV line
    appropriate for the field types of the low level query result description.
    It looks something like::

        %s,"%s","%s",%s,%s,%s,%s,%s,"%s","%s"

    Usage example::

        llc = _mysql.connection(...)
        llc.query("select * from ...")
        result = llc.store_result()
        csvf = CSVFormat( result.describe() )
        for row in result.fetch_row(0):
            print str(csvf) % tuple(row)

    """

    def __str__(self):
        def field_fmt(fdesc):
            if fdesc[1] in string_types:
                return "\"%s\""
            return "%s"
        return ",".join( map(field_fmt, self) )

class DB(object):
    def __init__(self, sect, opts=None, home=None):
        """
        :param sect: name of section in config file
        :param opts: options
        :param home: DB instance


        Safety constraints on config to minimize accidents from config confusion.
00578 00579 Initially required non-loopback section names and database names to be the same 00580 00581 Loosening this to allow remote commands, by designating a "home" instance 00582 and requiring all other instances to match that one in everything but the 00583 database name 00584 00585 """ 00586 pass 00587 self.sect = sect 00588 self.opts = opts 00589 self.home = home 00590 self.backupdir = os.path.join(self.opts.backupfold, platform.node(), sect ) 00591 00592 cnf = MyCnf("~/.my.cnf") 00593 dbc = cnf.mysqldb_pars(sect, home=home) # NB sect is taken as the DB name if there is no such section in the config 00594 00595 log.debug("sect %s home %s dbc %s " % (sect, home, repr(dbc))) 00596 00597 if sect == "loopback": 00598 assert dbc['host'] in ('localhost','127.0.0.1') # curious 127.0.0.1 not working on cms01 need localhost 00599 pass 00600 if home is None: 00601 assert dbc['db'] in ("information_schema", "mysql") # older mysql (eg on cms01 does not have information_schema DB ? OR its protected) 00602 assert opts.home == sect, "only the home instance is allowed an undefined home parameter " 00603 else: 00604 log.debug(" dbc %s " % repr(dbc)) 00605 log.debug("home.dbc %s " % repr(home.dbc)) 00606 #qwns = "host user passwd" 00607 qwns = "host" 00608 for qwn in qwns.split(): 00609 assert dbc.get(qwn,None) == home.dbc.get(qwn,None), "non \"home\" instances are constrained to match the \"home\" in everything but the db " 00610 00611 self.is_local = dbc['host'] == '127.0.0.1' 00612 self.dbc = dbc 00613 self.cnf = cnf 00614 log.debug("connecting to %s " % dict(dbc, passwd="***")) 00615 try: 00616 conn = MySQLdb.connect( **dbc ) # huh, version variation in accepted params 00617 except MySQLdb.Error, e: 00618 raise Exception("Error %d: %s " % ( e.args[0], e.args[1] ) ) 00619 00620 llconn = _mysql.connect( **dbc ) 00621 00622 self.conn = conn 00623 self.llconn = llconn 00624 self._size = None 00625 pass 00626 00627 partitionmgr = PartitionMgr(opts, self) 00628 self.partitionmgr = partitionmgr 00629 00630 00631 def dispatch(self, *args, **kwa): 00632 if self.opts.partition: 00633 pfx = "partition_" 00634 else: 00635 pfx = "" 00636 00637 cmd = pfx + args[0] + "___" 00638 if hasattr(self, cmd ): 00639 getattr( self , cmd)( *args[1:], **kwa ) 00640 else: 00641 raise Exception("cmd %s not implemented " % cmd) 00642 00643 def lscmd___(self, *args, **kwa ): 00644 for m in filter(lambda _:_[-3:] == '___', dir(self)): 00645 print m[:-3] 00646 00647 def database_drop_create(self, dbname): 00648 """ 00649 :param dbname: name of the database to be dropped and recreated 00650 """ 00651 assert self.sect == "loopback" and self.sect != dbname , ( self.sect, dbname ) 00652 if self.opts.noconfirm: 00653 log.info("proceed with DB_DROP_CREATE of %(dbname)s without confirmation" % locals() ) 00654 else: 00655 ans = raw_input("DROP and reCREATE database %(dbname)s loosing all tables contained ? 
Enter \"YES\" to proceed : " % locals()) 00656 if ans != "YES": 00657 log.warn("skipping DROP CREATE ") 00658 return 00659 00660 self("drop database if exists %(dbname)s " % locals() ) 00661 self("create database %(dbname)s " % locals() ) 00662 00663 00664 def docs( cls ): 00665 """ 00666 collect the docstrings on command methods 00667 identified by naming convention of ending with ___ 00668 """ 00669 mdoc = lambda m:getattr(m,'__doc__',None) 00670 mdocs = [ dict(meth=k[:-3],doc=mdoc(v)) for k,v in [(k,v) for k,v in inspect.getmembers(cls) if k[-3:]=='___' and k[0] != '_' and mdoc(v)]] 00671 return "\n".join([ """ %(meth)s : %(doc)s """ % d for d in mdocs ]) 00672 docs = classmethod(docs) 00673 00674 00675 def execute_(self, cmd): 00676 cursor = self.conn.cursor(MySQLdb.cursors.DictCursor) 00677 cursor.execute( cmd ) 00678 return cursor 00679 00680 def fetchall(self, cmd ): 00681 cursor = self.execute_(cmd) 00682 rows = cursor.fetchall() 00683 self.count = cursor.rowcount 00684 cursor.close() 00685 return rows 00686 00687 def _query_size(self): 00688 sql = "select round(sum((data_length+index_length-data_free)/1024/1024),2) as TOT_MB from information_schema.tables where table_schema = '%(db)s' " % self.dbc 00689 return float(self(sql)[0]['TOT_MB']) 00690 def _get_size(self): 00691 if self._size is None: 00692 self._size = self._query_size() 00693 return self._size 00694 size = property(_get_size, doc="Size estimate of the DB in MB ") 00695 00696 def _get_databases(self): 00697 """ 00698 This query gives fewer results than `show databases`, which demands skips to avoid errors in getting sizes 00699 #skip = "hello hello2 other test_noname tmp_cascade_2 tmp_dbitest tmp_tmp_offline_db_2".split() 00700 """ 00701 sql = "select distinct(table_schema) from information_schema.tables" 00702 return map(lambda _:_['table_schema'],self(sql)) 00703 databases = property(_get_databases, doc="List of database names obtained from information_schema.tables") 00704 00705 00706 def _get_tables_infoschema(self): 00707 """ 00708 """ 00709 sql = "select distinct(table_name) from information_schema.tables where table_schema='%(db)s'" % self.dbc 00710 return map(lambda _:_['table_name'],self(sql)) 00711 tables = property(_get_tables_infoschema, doc="List of table names obtained from information_schema.tables") 00712 00713 00714 def _get_tables(self): 00715 """ 00716 Older mysql does not have information_schema 00717 """ 00718 sql = "show tables" 00719 tables = [] 00720 for d in self(sql): 00721 k,v = d.items()[0] 00722 tables.append(v) 00723 return tables 00724 tables = property(_get_tables, doc="List of table names obtained from show tables") 00725 00726 00727 def _get_datadir(self): 00728 return self("select @@datadir as datadir")[0]['datadir'] 00729 datadir = property(_get_datadir, doc="Query DB server to find the datadir, eg /var/lib/mysql/ OR /data/mysql/ ") 00730 00731 def __call__(self, cmd): 00732 log.debug(cmd) 00733 t0 = time.time() 00734 ret = self.fetchall(cmd) 00735 t1 = time.time() 00736 self.lastseconds = t1 - t0 00737 return ret 00738 00739 00740 def lsdatabases___(self, *args, **kwa ): 00741 """ 00742 list databases 00743 """ 00744 print "\n".join(self.databases) 00745 00746 def lstables___(self, *args, **kwa ): 00747 """ 00748 list tables 00749 """ 00750 print "\n".join(self.tables) 00751 00752 def summary___(self, *args, **kwa): 00753 """ 00754 Present summary of tables in rst table format: 00755 00756 ============================== ========== ============================== 
============================== 00757 TABLE_NAME TABLE_ROWS CREATE_TIME CHECK_TIME 00758 ============================== ========== ============================== ============================== 00759 DqChannel 62126016 2013-05-30 18:52:51 2013-05-30 18:52:51 00760 DqChannelStatus 62126016 2013-05-30 18:17:42 2013-05-30 18:17:42 00761 DqChannelStatusVld 323573 2013-05-30 18:52:44 None 00762 DqChannelVld 323573 2013-05-30 19:34:55 None 00763 LOCALSEQNO 3 2013-05-30 19:35:02 None 00764 ============================== ========== ============================== ============================== 00765 00766 """ 00767 result = self("select TABLE_NAME, TABLE_ROWS, CREATE_TIME, UPDATE_TIME, CHECK_TIME from information_schema.tables where table_schema = '%(db)s' " % self.dbc) 00768 kws = (('TABLE_NAME', 30), ('TABLE_ROWS', 10), ('CREATE_TIME', 30 ), ('CHECK_TIME',30),) 00769 print "\n".join(["",self.sect,self._rst_table( result, kws )]) 00770 00771 00772 def _rst_table(self, result, kws, char="="): 00773 """ 00774 :param result: interable providing result dicts 00775 :param kws: sequence of 2-tuples providing result dict keys and presentation widths 00776 :param char: 00777 :return: multi line string rst table presentation 00778 """ 00779 fcol_ = lambda kw:"%s(%s)-%s%s" % ("%",kw[0],kw[1],"s") 00780 fmt = " ".join(map(fcol_,kws) ) 00781 00782 # spell these out for ancient python 00783 #mkr = fmt % dict((kw[0],char * kw[1]) for kw in kws) 00784 #lbl = fmt % dict((kw[0],kw[0]) for kw in kws) 00785 dmkr = {} 00786 dlbl = {} 00787 for kw in kws: 00788 dmkr[kw[0]] = char * kw[1] 00789 dlbl[kw[0]] = kw[0] 00790 mkr = fmt % dmkr 00791 lbl = fmt % dlbl 00792 bdy = [fmt % d for d in result] 00793 return "\n".join([mkr, lbl, mkr] + bdy + [mkr]) 00794 00795 00796 def _get_utables(self): 00797 """ 00798 """ 00799 alltables = self._get_tables() 00800 if self.opts.tables is None: 00801 return alltables 00802 else: 00803 return filter(lambda t:t in alltables, self.opts.tables.split(",")) 00804 utables = property(_get_utables, doc="List of tables to use in operations, when --tables option is used this can be a subset of all tables.") 00805 00806 def show_create( self, table): 00807 return self("show create table %(table)s" % locals())[0]['Create Table'] 00808 00809 def _dumplocal_mkdir(self, outdir ): 00810 """ 00811 :param outdir: 00812 """ 00813 if not os.path.exists(outdir): 00814 os.makedirs(outdir) # umask gets in the way of the mode argument here 00815 os.chmod(outdir, 0777) # needs to be writable by other so mysql user can write into it 00816 assert os.path.isdir(outdir), outdir 00817 pass 00818 def _dumplocal_schema(self, outdir, table ): 00819 """ 00820 :param outdir: 00821 :param table: 00822 """ 00823 schema=os.path.join(outdir, "%s.schema" % table ) 00824 sf = open(schema, "w") 00825 sf.write( self.show_create(table) ) 00826 sf.close() 00827 pass 00828 def _dumplocal_table(self, outdir, table, where ): 00829 """ 00830 :param outdir: 00831 :param table: 00832 :param where: 00833 """ 00834 outfile=os.path.join(outdir, "%s.csv" % table ) 00835 log.info("_dumplocal_table %s writing %s " % (where,outfile) ) 00836 pass 00837 00838 if table == 'LOCALSEQNO': 00839 where = "TRUE" 00840 00841 if self.is_local: 00842 log.debug("IntoOutFile") 00843 if os.path.exists(outfile): 00844 log.info("remove preexisting outfile %s " % outfile ) 00845 os.remove(outfile) 00846 io = IntoOutfile(table=table,where=where, outfile=outfile) 00847 self(str(io)) 00848 else: 00849 log.debug("RemoteIntoOutFile") 00850 t0 = time.time() 00851 tf 
= open(outfile,"w") 00852 rio = RemoteIntoOutfile(table=table, where=where) 00853 self._write_csvdirect(rio, tf ) ## result of select is returned to python and thence formatted directly into csv, works remotely 00854 tf.close() 00855 t1 = time.time() 00856 self.lastseconds = t1 - t0 00857 pass 00858 log.info("partition_dumplocal___ %s writing %s took %s seconds " % (where,outfile, "%5.2f" % self.lastseconds ) ) 00859 pass 00860 00861 def _write_csvdirect(self, select , tf ): 00862 """ 00863 Adopt low level approach to avoid unnecessary conversions into 00864 python types then back to string and the associated difficulties of 00865 then getting precisely the same as SELECT * INTO OUTFILE 00866 00867 Note that use of `store_result` rather than `use_result` means 00868 that all rows are in memory at once. 00869 00870 NB for consistency the CSV ouput by this command MUST MATCH that 00871 by _write_outfile 00872 00873 `_write_csvdirect` is used by **rdumpcat** , this mimics 00874 the output from `_write_outfile` (used by **dumpcat**) with 00875 the big advantage that it works remotely, with no strong permission 00876 requirements 00877 00878 TODO: 00879 00880 #. when there is a pre-existing LOCALSEQNO redirect LOCALSEQNO to a temporay file 00881 and do a merge... easiest to instanciate them as AsciiCSV and then merge at that level 00882 00883 """ 00884 q = str(select) 00885 log.debug("_write_csvdirect %s " % q) 00886 00887 llconn = self.llconn 00888 llconn.query( q ) 00889 00890 lessmemory = True 00891 if lessmemory: 00892 log.debug("using `--LESSMEMORY` option : less memory expensive but more network expensive 'use_result' ") 00893 result = llconn.use_result() 00894 else: 00895 log.debug("using more memory expensive but less network expensive 'store_result' ") 00896 result = llconn.store_result() 00897 00898 csvf = CSVFormat( result.describe() ) 00899 for row in result.fetch_row(maxrows=0, how=0): ## all rows as tuples 00900 tf.write( str(csvf) % tuple(row) +"\n" ) 00901 00902 00903 def timestamped_dir(self, *args): 00904 """ 00905 Timestamping is needed for non-partitioned case 00906 """ 00907 bdir = self.determine_basedir(*args) 00908 odir = os.path.join(bdir, self.opts.timestamp ) 00909 return odir 00910 00911 def dumplocal___(self, *args, **kwa ): 00912 """ 00913 :param outdir: specifies output directory which must be writable by mysql user, it will be created if not existing 00914 00915 Rerunning this will do quick checks of the CSV files, looking at line counts 00916 and the first and last line and comparing with expections from DB queries. The quick 00917 checks are done via commands: 00918 00919 * `wc` 00920 * `head -1` 00921 * `tail -1` 00922 00923 This is not called in the partitioned case. 
00924 """ 00925 where = kwa.pop('where',None) 00926 if where is None: 00927 where = self.opts.where 00928 00929 tables = kwa.pop('tables',None) 00930 if tables is None: 00931 tables = self.utables 00932 00933 odir = self.timestamped_dir(*args) 00934 if os.path.exists(odir): 00935 log.info("dumplocal_ timestamped dir %s exists already, skipping dump" % odir ) 00936 else: 00937 log.info("dumplocal_ into timestamped dir %s " % odir ) 00938 self._dumplocal_mkdir(odir) 00939 for table in tables: 00940 self._dumplocal_schema(odir, table) 00941 self._dumplocal_table(odir, table, where) 00942 pass 00943 pass 00944 if self.opts.archive or self.opts.archiveforce: 00945 self.archive(odir) 00946 00947 00948 def fields(self, table, filter_=None): 00949 return filter(filter_,map(lambda _:_['Field'],self("describe %(table)s" % locals()))) 00950 00951 def range(self, table, where="1=1", keycount=True): 00952 sql = self.partitionmgr.qrange(table, where, keycount=keycount) 00953 return self(sql)[0] 00954 00955 def minmax(self, table, where="1=1"): 00956 sql = self.partitionmgr.qminmax(table, where) 00957 return self(sql)[0] 00958 00959 def ptables(self): 00960 """ 00961 :return: list of tables with the key field 00962 """ 00963 key = self.partitionmgr.key 00964 return filter(lambda t:key in self.fields(t), self.utables) 00965 00966 00967 def tabminmax_csv(self, path): 00968 kpo = self.partitionmgr.kpo 00969 head = os.popen("head -1 %(path)s" % locals()).read().strip() 00970 tail = os.popen("tail -1 %(path)s" % locals()).read().strip() 00971 table = os.path.basename(path[:-4]) 00972 minkey = int(head.split(",")[kpo]) 00973 maxkey = int(tail.split(",")[kpo]) 00974 return table, minkey, maxkey 00975 00976 00977 def _wc_csv(self, outdir, keycount=False): 00978 """ 00979 :param outdir: partition directory 00980 :return: dict of keyed by table name providing csv info, line count, 00981 00982 :: 00983 00984 {'count': 10, 'keycount': 10, 'max': 40, 'min': 31} 00985 00986 00987 #. 
getting the keycount in a general manner, 00988 ie number of distinct key values would requiring 00989 parsing the entire CSV so fake it from head and tail 00990 00991 :: 00992 00993 [blyth@belle7 ~]$ wc -l /tmp/pp/tmp_ligs_offline_db_0/1000/1/*.csv 00994 192000 /tmp/pp/tmp_ligs_offline_db_0/1000/1/DqChannel.csv 00995 192000 /tmp/pp/tmp_ligs_offline_db_0/1000/1/DqChannelStatus.csv 00996 1000 /tmp/pp/tmp_ligs_offline_db_0/1000/1/DqChannelStatusVld.csv 00997 1000 /tmp/pp/tmp_ligs_offline_db_0/1000/1/DqChannelVld.csv 00998 386000 total 00999 """ 01000 if not os.path.exists(outdir): 01001 return None 01002 01003 pipe = os.popen("wc -l %(outdir)s/*.csv" % locals()) 01004 lines = pipe.readlines() 01005 rc = pipe.close() 01006 if rc is None: 01007 rc = 0 01008 rc = os.WEXITSTATUS(rc) 01009 log.debug("wc return code %(rc)s for %(outdir)s " % locals() ) 01010 01011 if rc != 0: 01012 log.warn("Problem with csv files in %(outdir)s wc rc %(rc)s " % locals() ) 01013 return None 01014 01015 wc = {} 01016 for n, path in map(lambda _:_.lstrip().rstrip().split(), lines): 01017 if path[-4:] != '.csv':continue 01018 pass 01019 table, minkey, maxkey = self.tabminmax_csv(path) 01020 wc[table] = dict(count=int(n),min=minkey,max=maxkey) 01021 if keycount: 01022 wc[table]['keycount'] = maxkey - minkey + 1 # this is quick and dirty 01023 return wc 01024 01025 def determine_basedir(self, *args): 01026 """ 01027 """ 01028 if len(args)>0: 01029 basedir = args[0] 01030 else: 01031 basedir = self.backupdir # eg /var/dbbackup/dbsrv/belle7.nuu.edu.tw/channelquality_db_belle7 01032 pass 01033 if basedir != self.backupdir: 01034 log.warn("using basedir %s different from standard %s " % (basedir,self.backupdir) ) 01035 else: 01036 log.debug("using basedir %s " % basedir ) 01037 01038 return basedir 01039 01040 01041 def assert_valid_dump(self, pdir, csv, chk ): 01042 assert csv == chk , ("prior dump %s check fail" % pdir, pformat(csv),pformat(chk)) 01043 assert sorted(csv.keys()) == sorted(chk.keys()), ("tables mismatch", sorted(csv.keys()), sorted(chk.keys())) 01044 for table in csv.keys(): 01045 if chk[table].has_key('keycount'): 01046 assert chk[table]['keycount'] == csv[table]['keycount'], (table, chk[table]['keycount'], csv[table]['keycount'] ) 01047 assert chk[table]['count'] == csv[table]['count'], (table, chk[table]['count'], csv[table]['count'] ) 01048 assert chk[table]['min'] == csv[table]['min'], (table, chk[table]['min'], csv[table]['min'] ) 01049 assert chk[table]['max'] == csv[table]['max'], (table, chk[table]['min'], csv[table]['max'] ) 01050 01051 01052 def partition_dumpcheck(self, pdir, pwhere, is_last, keycount=False ): 01053 """ 01054 Checks a partition dump returning flag to signal a dump or not. 
01055 01056 :param pdir: 01057 :param pwhere: 01058 :param is_last: 01059 :param keycount: doing distinct keycount is quite slow, so can skip for pre-existing 01060 :return: pdump, chk 01061 """ 01062 log.info("_") 01063 log.info("partition_dumpcheck partition loop %s " % pwhere) 01064 if is_last: 01065 assert pdir[-5:] == "/last" , "unexpected pdir %s for last partition " % (pdir) 01066 01067 ptables = self.ptables() 01068 pdump = None 01069 01070 # DB look at SEQNO ranges 01071 chk = {} 01072 for table in ptables: 01073 chk[table] = self.range(table, pwhere, keycount=keycount) 01074 01075 # file system look at csv files 01076 log.info("checking prior csv dump %s --partitionsize %s --partitionrange %s " % (pdir,self.opts.partitionsize, self.opts.partitionrange)) 01077 csv = self._wc_csv(pdir, keycount=keycount) 01078 if csv is None: 01079 if os.path.exists(pdir): 01080 msg = "Partition directory \"%(pdir)s\" exists but it contains no .csv files, delete the empty directory and rerun to clear this error" % locals() 01081 log.fatal(msg) 01082 raise Exception(msg) 01083 01084 # compare DB expectations to CSV dumps 01085 if csv == chk: 01086 log.info("partition dump %s unchanged wrt DB check " % pdir ) 01087 pdump = False 01088 else: 01089 pdump = True 01090 log.info("chk %s " % repr(chk)) 01091 log.info("csv %s " % repr(csv)) 01092 if is_last: 01093 log.info("last partition dump %s and is changed wrt DB : will redump " % pdir ) 01094 elif csv is None: 01095 log.info("non last partition dump %s csv is None : first dump " % pdir ) 01096 else: 01097 log.fatal("non last partition dump %s and is changed wrt DB : will fail " % pdir ) 01098 self.assert_valid_dump( pdir, csv, chk) 01099 pass 01100 return pdump, chk 01101 01102 01103 def partition_dumplocal___(self, *args, **kwa): 01104 """ 01105 """ 01106 pm = self.partitionmgr 01107 pm.basedir = self.determine_basedir(*args) 01108 if not os.path.exists(pm.basedir): 01109 os.makedirs(pm.basedir) 01110 01111 svy = pm.survey(self) 01112 ptables = self.ptables() 01113 assert len(ptables), ptables 01114 01115 # write all table schema into single schema dir rather than repeating for every partition 01116 pdir = pm.dir("_") 01117 if os.path.exists(pdir): 01118 log.info("schema dir %s exists already" % pdir ) 01119 else: 01120 log.info("creating and populating schema dir %s " % pdir ) 01121 self._dumplocal_mkdir(pdir) 01122 for table in ptables: 01123 self._dumplocal_schema(pdir, table) 01124 pass 01125 pass 01126 if self.opts.archive or self.opts.archiveforce: 01127 self.archive(pdir) 01128 01129 for p in pm.parts: 01130 pdir = pm.dir(p) 01131 pmin = pm.min(p) 01132 pmax = pm.max(p) 01133 pwhere = pm.where(p) 01134 is_last = pm.is_last(p) # last partition expected to be incomplete 01135 keycount = not os.path.exists(pdir) # only keycount for new partitions 01136 pdump, chk = self.partition_dumpcheck( pdir, pwhere , is_last, keycount=keycount ) 01137 if pdump: 01138 log.info("dumplocal partition %s %s %s:%s --partitionsize %s --partitionrange %s " % (p,self.opts.partitionkey,pmin,pmax,self.opts.partitionsize, self.opts.partitionrange)) 01139 self._dumplocal_mkdir(pdir) 01140 for table in ptables: 01141 if is_last: 01142 log.warn("skipping completeness checks for last partition %s into %s " % (p, pdir)) 01143 self._dumplocal_table(pdir, table, pwhere) 01144 elif chk[table]['min'] == pmin and chk[table]['max'] == pmax: 01145 self._dumplocal_table(pdir, table, pwhere) 01146 else: 01147 log.warn(" table %s check min %s pmin %s " % ( table, chk[table]['min'], 
pmin )) 01148 log.warn(" table %s check max %s pmax %s " % ( table, chk[table]['max'], pmax )) 01149 log.warn("skipping dump as check ahead query indicates incomplete partition %s " % repr(check)) 01150 pass 01151 zdump, zchk = self.partition_dumpcheck( pdir, pwhere, is_last, keycount=True ) 01152 assert zdump == False, ("post dump dumpcheck signals need to dump", pformat(zchk)) 01153 pass 01154 if self.opts.archive: 01155 force = self.opts.archiveforce or pdump 01156 self.archive(pdir, force=force) 01157 pass 01158 01159 def extract(self, dir, base): 01160 """ 01161 :param dir: directory to be created by extraction 01162 :param base: 01163 """ 01164 pass 01165 tgzp, rpath = self.archivepath(dir, base) 01166 log.info("extract dir %s tgzp %s rpath %s " % (dir, tgzp, rpath)) 01167 tgz = Tar(tgzp, toplevelname=rpath ) 01168 tgz.examine() 01169 ls = tgz.list(tgz.members) 01170 tgz.digest(verify=True) 01171 tgz.extract( containerdir=base , toplevelname=rpath, dryrun=False ) 01172 01173 01174 def archivepath(self, dir , base=None): 01175 """ 01176 :param dir: directory to be archived or extracted into 01177 :return: path to archive tarball, dir path relative to base 01178 """ 01179 if base is None: 01180 base = self.backupdir # eg /var/dbbackup/dbsrv/belle7.nuu.edu.tw/channelquality_db_belle7 01181 assert dir[0:len(base)] == base, (dir, base, "dir %s should be within base %s " % (dir,base)) 01182 rpath = dir[len(base)+1:] 01183 elems = rpath.split("/") 01184 name = "_".join(elems) 01185 top = elems[0] # typically this is chunk size 01186 tgzp = os.path.join(base,"archive", top, "%s.tar.gz" % name ) 01187 log.debug("archivepath, dir %s base %s rpath %s name %s tgzp %s " % ( dir, base, rpath, name, tgzp ) ) 01188 return tgzp, rpath 01189 01190 01191 def archive(self, dir , force=False): 01192 """ 01193 :param dir: directory the contents of which should be archived 01194 01195 As a partition corresponds to a certain SEQNO range, it never changes 01196 so there is no need for a datestring in the path. 
01197 01198 The configured backupfold needs to be created before using the archive `-a` option with:: 01199 01200 [blyth@belle7 DybPython]$ sudo mkdir /var/dbbackup/dbsrv 01201 [blyth@belle7 DybPython]$ sudo chown -R blyth.blyth /var/dbbackup/dbsrv/ 01202 01203 """ 01204 base = self.backupdir # eg /var/dbbackup/dbsrv/belle7.nuu.edu.tw/channelquality_db_belle7 01205 tgzp, rpath = self.archivepath(dir) 01206 tgzd = os.path.dirname(tgzp) 01207 if not os.path.exists(tgzd): 01208 os.makedirs(tgzd) 01209 pass 01210 01211 if self.opts.transfer: 01212 cfg = self.opts.transfercfg.split(":") 01213 if len(cfg) == 2: 01214 kwa = dict(remotenode=cfg[0], remoteprefix=cfg[1]) 01215 elif len(cfg) == 1: 01216 kwa = dict(remotenode=cfg[0]) 01217 else: 01218 assert 0, "unexpected transfercfg %s " % repr(cfg) 01219 log.debug("using transfercfg : %s " % repr(kwa)) 01220 else: 01221 kwa = {} 01222 01223 tgz = Tar(tgzp, toplevelname=rpath, **kwa) 01224 if os.path.exists(tgzp) and not force: 01225 log.info("archive already exists %s rerun with `-A/--archiveforce` option to recreate " % tgzp ) 01226 else: 01227 log.info("creating archive %s for %s " % (tgzp,rpath) ) 01228 tgz.archive( base ) # argument specifies the root directory of the archive 01229 pass 01230 tgz.examine() 01231 #log.info("\n".join(tgz.names)) 01232 ls = tgz.list(tgz.members) 01233 log.debug("\n"+ls) 01234 du = os.popen("du -hs %(tgzp)s" % locals()).read().strip() 01235 log.info(du) 01236 tgz.digest() 01237 01238 if self.opts.transfer: 01239 tgz.transfer() 01240 01241 01242 def partition_loadlocal___(self, *args, **kwa ): 01243 """ 01244 #. look into putting the partitions back togther again, in partitioned load local 01245 #. read file system tealeaves wrt the partitioning 01246 01247 #. factor off the checking 01248 #. 
need to work out which partitions are new and just load those 01249 """ 01250 basedir = self.determine_basedir(*args) 01251 log.info("basedir %s " % basedir ) 01252 pm = self.partitionmgr 01253 pm.basedir = basedir 01254 01255 if self.opts.extract: 01256 chunks = pm.archived_chunks() 01257 log.info("archived_chunks %s " % repr(chunks) ) 01258 pm.assign_parts( chunks ) 01259 else: 01260 chunks = pm.available_chunks() # only works after extraction 01261 log.info("available_chunks %s " % repr(chunks) ) 01262 01263 _dir = pm.dir("_") 01264 if not os.path.exists(_dir): 01265 log.info("schema dir %s does not exists" % _dir ) 01266 if self.opts.extract: 01267 self.extract(_dir, basedir) 01268 pass 01269 pass 01270 assert os.path.exists(_dir), _dir 01271 #_tables = map(lambda _:_[:-7],filter(lambda _:_[-7:] == '.schema',os.listdir(_dir))) 01272 01273 for p in pm.parts: 01274 pdir = pm.dir(p) 01275 pnam = os.path.basename(pdir) 01276 pmin = pm.min(p) 01277 pmax = pm.max(p) 01278 pwhere = pm.where(p) 01279 #log.debug("p %s pdir %s pmin %s pmax %s pwhere %s " % (p,pdir,pmin,pmax,pwhere)) 01280 if not os.path.exists(pdir): 01281 log.debug("partition_loadlocal___ partition dir %s does not exist " % pdir ) 01282 if self.opts.extract: 01283 self.extract(pdir, basedir) 01284 01285 if not os.path.exists(pdir): 01286 log.debug("partition_loadlocal___ partition dir %s STILL does not exist " % pdir ) 01287 else: 01288 log.info("partition_loadlocal___ loading %s " % pdir ) 01289 ltables = self.loadlocal_dir( pdir ) 01290 log.debug("ltables %s " % str(ltables)) 01291 01292 # check that the loaded partition yields the expected key range and count 01293 if not self.opts.nocheck: 01294 check = {} 01295 for table in ltables: 01296 check[table] = self.minmax(table, pwhere) 01297 assert check[table]['min'] == pmin, (table,"min",check[table]['min'],pmin) 01298 if pnam != 'last': 01299 assert check[table]['max'] == pmax, (table,"max",check[table]['max'],pmax) 01300 # keycount check is too slow 01301 #assert check[table]['keycount'] == pm.size, (table,"keycount",check[table]['keycount'],pm.size) 01302 pass 01303 log.debug(pformat(check)) 01304 pass 01305 01306 01307 def loadlocal_dir(self, dir ): 01308 """ 01309 """ 01310 # if a sibling dir named "_" exists use that as a source of schema files, otherwise get directly 01311 _dir = os.path.join(os.path.dirname(dir),"_") 01312 if not os.path.exists(_dir): 01313 _dir = dir 01314 01315 def _replace_ignore(table): 01316 """ 01317 hmm DBI specificity slipping in 01318 """ 01319 if table == "LOCALSEQNO": 01320 return "REPLACE" 01321 else: 01322 return "IGNORE" 01323 01324 utables = self.utables 01325 01326 log.info("utables %s " % repr(utables)) 01327 01328 ltables = [] 01329 for name in filter(lambda _:os.path.isfile(os.path.join(dir,_)) and _[-4:] == '.csv' , os.listdir(dir)): 01330 path = os.path.join(dir, name) 01331 table, ext = os.path.splitext(name) 01332 if not table in utables: 01333 schema = os.path.join(_dir, "%s.schema" % table ) 01334 log.warn("creating table %s from schema file %s " % (table, schema)) 01335 assert os.path.exists(schema), schema 01336 self(open(schema,"r").read()) 01337 pass 01338 ltables.append(table) 01339 01340 ttable, csvminkey, csvmaxkey = self.tabminmax_csv(path) 01341 assert csvminkey < csvmaxkey , (csvminkey,csvmaxkey) 01342 assert ttable == table, (ttable, table) 01343 mm = self.minmax(table) 01344 log.debug("csvminkey %s csvmaxkey %s " % (csvminkey, csvmaxkey)) 01345 log.debug(" mmmin %(min)s mmmax %(max)s " % mm ) 01346 if csvmaxkey <= 
mm['max']: 01347 log.info("SKIP: as already loaded csv keys minmax %s %s from %s " % (csvminkey,csvmaxkey,path )) 01348 else: 01349 ll = LoadDataLocalInfile(infile=path, table=table, ignorelines=0, replace_ignore=_replace_ignore(table) ) 01350 self(str(ll)) 01351 log.info("loadlocal ingesting %s took %s seconds " % (path, "%5.2f" % self.lastseconds ) ) 01352 pass 01353 return ltables 01354 01355 01356 def loadlocal___(self, *args, **kwa ): 01357 """ 01358 :param outdir: specifies directory containing normal or partitioned dump of CSV files 01359 """ 01360 odir = self.timestamped_dir(*args) 01361 return self.loadlocal_dir(odir) 01362 01363 01364 01365 class tdict(dict): 01366 __str__ = lambda self:self.tmpl % self 01367 01368 class IntoOutfile(tdict): 01369 tmpl = "select * from %(table)s where %(where)s into outfile '%(outfile)s' fields terminated by ',' optionally enclosed by '\"' " 01370 01371 class RemoteIntoOutfile(tdict): 01372 tmpl = "select * from %(table)s where %(where)s " 01373 01374 class LoadDataLocalInfile(tdict): 01375 tmpl = "LOAD DATA LOCAL INFILE '%(infile)s' %(replace_ignore)s INTO TABLE %(table)s FIELDS TERMINATED BY ',' OPTIONALLY ENCLOSED BY '\"' IGNORE %(ignorelines)s LINES " 01376 01377 01378 01379 class PartitionMgr(object): 01380 int_ptn=re.compile("^\d*$") 01381 def __init__(self, opts, basedir=None): 01382 """ 01383 #. for example 1000,0,3 for three chunks of 1000 starting at 1 01384 """ 01385 self.basedir = basedir 01386 pass 01387 size = int(opts.partitionsize) 01388 pass 01389 if opts.partitionrange is None: 01390 parts = None 01391 else: 01392 a, n = map(int,opts.partitionrange.split(",")) 01393 parts = range(a,n) 01394 pass 01395 key, kpo = opts.partitionkey.split(",") 01396 and_where = opts.where 01397 pass 01398 self.and_where = and_where 01399 01400 if opts.partitionlast is None: 01401 plast = None 01402 else: 01403 plast = int(opts.partitionlast) 01404 pass 01405 self.plast = plast 01406 pass 01407 self.key = key # key string eg SEQNO 01408 self.kpo = int(kpo) # key position in csv 01409 pass 01410 self.size = size 01411 self.parts = parts 01412 01413 01414 def survey(self, db ): 01415 """ 01416 Need to identify the last partition after which not 01417 all of the tables will have full partitions. 01418 01419 For sensible partitioning should restrict to tables 01420 whose SEQNO stride along roughly together. 
class PartitionMgr(object):
    int_ptn=re.compile("^\d*$")
    def __init__(self, opts, basedir=None):
        """
        #. for example `--partitionsize 1000 --partitionrange 0,3` for three chunks of 1000 starting from key 1
        """
        self.basedir = basedir
        pass
        size = int(opts.partitionsize)
        pass
        if opts.partitionrange is None:
            parts = None
        else:
            a, n = map(int,opts.partitionrange.split(","))
            parts = range(a,n)
        pass
        key, kpo = opts.partitionkey.split(",")
        and_where = opts.where
        pass
        self.and_where = and_where

        if opts.partitionlast is None:
            plast = None
        else:
            plast = int(opts.partitionlast)
        pass
        self.plast = plast
        pass
        self.key = key        # key string eg SEQNO
        self.kpo = int(kpo)   # key position in csv
        pass
        self.size = size
        self.parts = parts


    def survey(self, db ):
        """
        Need to identify the last partition after which not
        all of the tables will have full partitions.

        For sensible partitioning should restrict to tables
        whose SEQNO stride along roughly together.

        """
        size = self.size
        log.info("PartitionMgr survey size %s " % size )
        svy = {}
        mlp = 0

        for table in db.ptables():
            mm = db(self.qminmax(table,where="TRUE"))[0]
            last = mm['max']/size
            svy[table] = dict(mm, last=last )
            pass
        svy['COMMON'] = dict(
            min=max(map(lambda _:_['min'],svy.values())),
            max=min(map(lambda _:_['max'],svy.values())),
            last=min(map(lambda _:_['last'],svy.values()))
            )

        log.info("\n"+pformat(svy))
        autoplast = svy['COMMON']['last']
        log.info("auto survey determines last partition index as %s " % autoplast )
        if self.plast is None:
            self.plast = autoplast
        else:
            if self.plast != autoplast:
                log.warn("`--partitionlast` option gives %s BUT auto-survey gives %s " % (self.plast, autoplast))
            pass
        pass
        if self.parts is None:
            parts = range(0,self.plast+1)   # plast is the 0-based index of the last partition, hence the need for +1
            log.info("auto definition of partition range %s " % repr(parts))
            self.parts = parts
        else:
            log.info("using partition range from options %s " % repr(self.parts))
        return svy

    def assign_parts( self, chunks ):
        """
        :param chunks: list of partition indices found on disk or in archives
        """
        parts = sorted(chunks)
        plast = parts[-1] + 1
        parts += [plast]

        self.parts = parts
        self.plast = plast
        log.info("assign_parts %s plast %s " % ( self.parts, self.plast ))

    def dir(self, p):
        """
        """
        if self.is_last(p):
            leaf = "last"
        else:
            leaf = p
        pass
        return os.path.join(self.basedir,str(self.size),str(leaf))

    def is_last(self, p):
        return p == self.plast

    def qminmax(self, table, where="1=1"):
        key = self.key
        return "select max(%(key)s) as max, min(%(key)s) as min, count(*) as count from %(table)s where %(where)s " % locals()

    def qrange(self, table, where="1=1", keycount=True):
        key = self.key
        if keycount:
            keycount = ", count(distinct(%(key)s)) as keycount " % locals()
        else:
            keycount = ""
        pass
        return "select max(%(key)s) as max, min(%(key)s) as min, count(*) as count %(keycount)s from %(table)s where %(where)s " % locals()

    def offset(self, p):
        return self.size*p
    def min(self, p):
        return self.size*p + 1
    def max(self, p):
        return self.size*(p+1)
    def where(self, p):
        n = len(self.parts)
        size = self.size
        pmin = self.min(p)
        pmax = self.max(p)
        pkey = self.key
        p1 = p + 1
        if self.and_where is None:
            and_where = ""
        else:
            and_where = " and %s " % self.and_where

        return "/* [%(p)s] %(size)s-partition %(p1)-3s/%(n)s */ %(pkey)s >= %(pmin)s and %(pkey)s <= %(pmax)s %(and_where)s " % locals()

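    # Illustrative sketch (comments only, not executed): partition arithmetic for the
    # default --partitionsize of 10000 and --partitionkey SEQNO,0, taking partition p=3
    # and assuming 34 configured partitions and no extra --where clause.
    #
    #   offset(3) -> 30000
    #   min(3)    -> 30001
    #   max(3)    -> 40000
    #   where(3)  -> "/* [3] 10000-partition 4  /34 */ SEQNO >= 30001 and SEQNO <= 40000 "
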
    def int_dirname(self, dir):
        """
        :param dir: directory to list
        :return: list of integers corresponding to subfolders of *dir* with integer names, in ascending order
        """
        return sorted(map(int,filter(lambda _:self.int_ptn.match(_), os.listdir(dir))))

    def has_partitionset(self):
        """
        Checks if there is a partitionset folder for the configured partition size,
        ie looks for the "10000" folder in the default case.
        """
        psizes = self.int_dirname(self.basedir)
        if not self.size in psizes:
            msg = "partition dump at %s does not have partitions of the configured size %s : %s " % (self.basedir, self.size, psizes)
            log.warn(msg)
            return False
        return True

    def archived_chunks(self):
        """
        :return: list of integers in ascending order corresponding to partition archives found
        """
        zdir = os.path.join(self.basedir, "archive", str(self.size))
        ptn = re.compile("^(?P<size>\d*)_(?P<part>\d*).tar.gz.dna$")
        chunks = []
        for path in os.listdir(zdir):
            name = os.path.basename(path)
            match = ptn.match(name)
            if match:
                d = match.groupdict()
                chunks.append(int(d['part']))
            pass
        return sorted(chunks)

    def available_chunks(self):
        """
        :return: list of integers in ascending order corresponding to the partition folders found

        Looks for the partition folder, named according to the partition size eg "basedir/10000",
        and determines the chunks by looking at the integer named partition subfolders eg "basedir/10000/0"
        """
        if not self.has_partitionset():
            log.warn("no partitionset for partition size %s" % self.size)
            return []
        pfold = os.path.join( self.basedir, str(self.size) )
        chunks = self.int_dirname(pfold)
        log.info("partition folder %s for configured size %s has chunks %s " % (pfold, self.size, repr(chunks)) )
        return chunks


class MyCnf(dict):
    def __init__(self, path = "~/.my.cnf", prime={}):
        """
        :param path: colon delimited list of config file paths
        :param prime: initial dict to prime the config, eg for current date
        """
        cfp = ConfigParser(prime)
        paths = cfp.read( [os.path.expandvars(os.path.expanduser(p)) for p in path.split(":")] )
        log.debug("MyCnf read %s " % repr(paths) )
        self.cfp = cfp
        self.sections = cfp.sections()
        self.path = path
        self.paths = paths

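    # Illustrative sketch (comments only, not executed): a hypothetical ~/.my.cnf with a
    # "loopback" home section pointing at information_schema. Bare database names need no
    # section of their own; section() below reuses the home section and swaps in the
    # database name. Host and credentials here are placeholders.
    #
    #   [loopback]
    #   host      = 127.0.0.1
    #   user      = dbuser
    #   password  = dbpass
    #   database  = information_schema
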
    def section(self, sect, home=None):
        """
        :param sect: name of section in config file
        :return: dict of config parameters from specified section of config file

        When the section is present in the config file simply return
        the parameters. If the section is not present in the file then
        adopt the \"home\" instance section and swap in the database
        from the sect name.

        This allows access to all databases on a server without having
        corresponding config file sections for all of them.
        """
        if sect in self.sections:
            it = dict(self.cfp.items(sect))
        else:
            if not home is None:
                assert home.sect in self.sections, self.sections
                it = dict(self.cfp.items(home.sect))
                it['database'] = sect
                log.debug("no section %s : infer config from home section %s assuming section name = dbname " % (sect, home.sect) )
            else:
                it = None
            pass
        return it

    def mysqldb_pars(self, sect, home=None):
        """
        :param sect: name of section in config file
        :param home: DB instance
        :return: dict of mysql-python style connection parameters

        Annoyingly mysql-python needs these keys

            `host`      host to connect
            `user`      user to connect as
            `passwd`    password to use
            `db`        database to use

        whereas mysql uses slightly different ones

            `host`
            `user`
            `password`
            `database`

        Normally this annoyance can be avoided using::

            conn = MySQLdb.connect( read_default_group=sect )

        but when the `database/db` setting needs to be overridden this is not possible.
        """
        my2mp = dict(host="host",user="user",password="passwd",database="db", socket="unix_socket")
        my = self.section(sect, home=home)

        if my is None:
            msg = "missing required section \"%s\" in config file \"%s\" " % ( sect, self.path )
            log.fatal(msg)
            raise Exception(msg)

        mp = {}
        for k in filter(lambda k:k in my2mp,my.keys()):   # key translation, mysql to mysql-python
            mp[my2mp[k]] = my[k]
        log.debug("translate mysql config %s into mysql-python config %s " % ( dict(my,password="***") , dict(mp,passwd="***") ))
        return mp

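# Illustrative sketch (comments only, not executed): the key translation performed by
# MyCnf.mysqldb_pars for a hypothetical section named "mydb" present in ~/.my.cnf;
# values are placeholders.
#
#   mysql style section items      {'host': '127.0.0.1', 'user': 'dbuser', 'password': '***', 'database': 'mydb'}
#   become MySQLdb connect kwargs  {'host': '127.0.0.1', 'user': 'dbuser', 'passwd': '***', 'db': 'mydb'}
#
#   conn = MySQLdb.connect( **MyCnf().mysqldb_pars("mydb") )
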
def main():
    """
    Need to support logging on py2.3

    http://docs.python.org/release/2.3.5/lib/module-logging.html
    """
    from optparse import OptionParser
    op = OptionParser(usage=__doc__ + "\n" + DB.docs() )
    now = datetime.now().strftime("%Y%m%d_%H%M")

    tables = "DqChannel,DqChannelVld,DqChannelStatus,DqChannelStatusVld"    # default tables to backup/propagate

    op.add_option("-l", "--loglevel", default="INFO", help="Choose logging level case insensitively eg INFO,DEBUG,WARN. Default %default " )
    op.add_option( "--logformat", default="%(asctime)s %(name)s %(levelname)-8s %(message)s" , help="Logging format. Default %default " )
    op.add_option("-t", "--tables", default=tables, help="Comma delimited list of table names to be included in operations. Default %default " )
    op.add_option( "--DB_DROP_CREATE", action="store_true", help="CAUTION: DROPs and CREATES the database specified in the first argument. Default %default " )
    op.add_option("-C", "--noconfirm", action="store_true", help="Skip interactive confirmation of dangerous operations. Default %default " )
    op.add_option("-K", "--nocheck", action="store_true", help="Skip some slow checking. Default %default " )
    op.add_option("-w", "--where", default=None, help="Where clause applied to selects on all used tables. Default %default " )
    op.add_option( "--partition", action="store_true", help="Use partitioned dumping OR loading, as an optimization for dealing with very large tables. Default %default " )
    op.add_option( "--partitionrange", default=None, help="Partition range in partition indices corresponding to python `range(a,b)`, or None for all. Default %default " )
    op.add_option( "--partitionsize", default="10000", help="Partition size. Default %default " )
    op.add_option( "--partitionkey", default="SEQNO,0", help="Fieldname and field position in the CSV on which partition chunking is based. Default %default " )
    op.add_option( "--partitionlast", default=None, help="(DEPRECATED, now determined automatically) Used to identify the index of the last incomplete partition, in order to skip completeness checks and save into a \"last\" directory. Default %default " )
    op.add_option( "--timestamp", default=now, help="Non-partitioned dumps and archives are placed inside date stamped folders; to use a prior one the stamp must be specified. Default %default " )
    op.add_option( "-A","--archiveforce", action="store_true", help="Proceed with archiving even if a preexisting archive exists. Default %default " )
    op.add_option( "-a","--archive", action="store_true", help="Create archives of the backups ready for offbox transfer. Default %default " )
    op.add_option( "-x","--extract", action="store_true", help="Extracts backup folders from archives. Default %default " )
    op.add_option( "--backupfold", default="/var/dbbackup/dbsrv", help="Folder under which backups and archives are kept. Default %default " )
    op.add_option( "-T","--transfer", action="store_true", help="Must be used together with the `--archive` option to transfer tarballs to the remote node configured with `--transfercfg`. Default %default " )
    op.add_option( "--transfercfg", default=os.environ.get('DBSRV_REMOTE_NODE','S:/data'), help="Configure archive transfers with the ssh node name and destination prefix eg \"S:/data\" that comes ahead of the backupfold. Default %default " )
    op.add_option( "-s","--home", default="loopback", help="Name of .my.cnf section of information_schema DB on server regarded as home. Should normally be \"loopback\". Default %default " )
    opts, args = op.parse_args()
    level = getattr(logging, opts.loglevel.upper())
    try:
        logging.basicConfig(format=opts.logformat,level=level)
    except TypeError:
        hdlr = logging.StreamHandler()
        formatter = logging.Formatter(opts.logformat)
        hdlr.setFormatter(formatter)
        log.addHandler(hdlr)
        log.setLevel(level)
        pass
    assert len(args) > 1, "need 2 or more arguments %s " % repr(args)

    log.info("sys.version_info %s " % repr(sys.version_info))
    log.info("got %s args : %s " % (len(args), repr(args)))

    if opts.transfer:
        sshenv()    # pick up ssh environment for use from spartan environments such as the cron commandline

    home = DB(opts.home, opts=opts)

    arg = args[0]
    if arg.find(",") > -1:
        log.debug("comma delimited argument %s " % arg )
        names = arg.split(",")
    elif arg.find("\\") > -1 or arg.find("\*") > -1:
        log.debug("regexp argument %s " % arg )
        ptn = re.compile(arg)
        names = filter(lambda dbname:ptn.match(dbname), home.databases)
    else:
        log.debug("plain single DB argument %s " % arg )
        names = [arg]
    pass
    log.debug("arg %s names %s " % ( arg, repr(names)))
    db = None
    for name in names:
        if opts.DB_DROP_CREATE:
            home.database_drop_create(name)
            pass
        db = DB(name, opts=opts, home=home)
        db.dispatch( *args[1:] )
    return db    # return the last handled DB so that the module level "db = main()" gets a handle rather than None

if __name__=='__main__':
    db = main()
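
# Illustrative sketch (comments only, not executed): how main() resolves the first argument
# into database names, assuming the home server hosts hypothetical databases named
# mydb_0, mydb_1 and mydb_2.
#
#   "mydb_0,mydb_2"  -> ["mydb_0", "mydb_2"]            comma delimited list
#   "mydb_\d"        -> ["mydb_0", "mydb_1", "mydb_2"]  regexp (contains a backslash) matched against home.databases
#   "mydb_1"         -> ["mydb_1"]                      plain single database name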