/search.css" rel="stylesheet" type="text/css"/> /search.js">
| Classes | Job Modules | Data Objects | Services | Algorithms | Tools | Packages | Directories | Tracs |

In This Package:

Public Member Functions | Public Attributes
job::runPQM Class Reference

List of all members.

Public Member Functions

def __init__
def mkThreads
def mkThrPath
def mkAdinfoFile
def mkHistoryPath
def mkHistoPath
def mkLinkLog
def mkLogPath
def mkPngPath
def mkAvaRunNum
def getRunFileAll
def getfile
def getfiletime
def runEndOrNot
def listfile
def runNumberSort
def transferFile
def setAvaRun
def setFileName
def setRunInfo
def getsite
def updateRealtimeLink
def movePng
def setAvaPlots
def writeLog
def readMissSeq
def missSeqSort
def writeMissing
def delJob
def checkexit
def getFileDir
def checkFile
def addSeqNum
def updateMaxSeq
def subJob
def checkSkip
def writeJob
def readSeqList
def writeCurSeq
def seqNumCount
def run

Public Attributes

 debug
 NUMOFJOB
 seqnum_start
 seqnum
 maxseq
 waitnum
 lastSeqNum
 missfile
 missSeqList
 missSeqMap
 seqFileDirMap
 jobflag
 filepath
 ISOTIMEFORMAT
 ISOTIMEFORMAT1
 pngpath
 logpath
 logfile
 histopath
 historypath
 adinfofile
 threadspath
 THREADS
 printflag

Detailed Description

Definition at line 41 of file job.py.


Constructor & Destructor Documentation

def job::runPQM::__init__ (   self)

Definition at line 42 of file job.py.

00043                       :
00044         #self.debug = True 
00045         self.debug = False
00046         self.NUMOFJOB = 32
00047         if self.debug:
00048             self.NUMOFJOB = 7
00049         self.seqnum_start = getSeqStart()
00050         self.seqnum = self.seqnum_start
00051         self.maxseq = getmaxseq()
00052         self.waitnum = 0
00053         self.lastSeqNum = self.seqnum
00054         self.missfile = 'missingfiles'
00055         self.missSeqList = []
00056         self.missSeqMap = {}
00057         # This map is to store the file info of missing files
00058         self.seqFileDirMap = {}
00059 
00060         # Define a flag to indicate to which job queue jobs will be submitted
00061         self.jobflag = 0
00062 
00063         # Raw data path
00064         self.filepath = '/dyb/rawdata_user/spade/'
00065         #self.filepath = '/dybdata/rawdata_user/spade/'
00066 
00067         # Set time format
00068         self.ISOTIMEFORMAT='%Y-%m-%d %X'
00069         self.ISOTIMEFORMAT1='%Y%m%d%H%M%S'
00070 
00071         # Folder for png files
00072         #self.pngpath = '../HistLog/'
00073         self.pngpath = '/dyb/pqmdata/HistLog/'
00074 
00075         # log file for this script
00076         self.logpath = self.pngpath + 'log/'
00077         self.logfile = self.logpath + 'log' + time.strftime(self.ISOTIMEFORMAT1,time.localtime())
00078 
00079         # Folder for histogram ROOT files
00080         self.histopath = self.pngpath + 'histogram/'
00081 
00082         # Folder for history information
00083         self.historypath = self.pngpath + 'testdb/'
00084         self.adinfofile = self.historypath + 'ADInfo.txt'
00085 
00086         # Folder for multijobs
00087         self.threadspath = './Threads/'
00088 
00089         # File to store the sequence numbers
00090         self.THREADS = './threads'
00091 
00092         self.printflag = True


Member Function Documentation

def job::runPQM::mkThreads (   self)

Definition at line 93 of file job.py.

00094                        :
00095         if not os.path.exists(self.THREADS):
00096             os.system('touch threads')
00097         else:
00098             os.system('rm -rf ' + self.THREADS)
00099             os.system('touch threads')

def job::runPQM::mkThrPath (   self)

Definition at line 100 of file job.py.

00101                        :
00102         if not os.path.exists( self.threadspath ):
00103             os.mkdir( self.threadspath )
00104         else:
00105             pass

def job::runPQM::mkAdinfoFile (   self)

Definition at line 106 of file job.py.

00107                           :
00108         if not os.path.exists(self.adinfofile):
00109             os.system('touch ' + self.adinfofile)
00110         else:
00111             pass

def job::runPQM::mkHistoryPath (   self)

Definition at line 112 of file job.py.

00113                            :
00114         if not os.path.exists(self.historypath):
00115             os.mkdir(self.historypath)
00116         else:
00117             pass

def job::runPQM::mkHistoPath (   self)

Definition at line 118 of file job.py.

00119                          :
00120         if not os.path.exists(self.histopath):
00121             os.mkdir(self.histopath)
00122         else:
00123             pass

def job::runPQM::mkLinkLog (   self)

Definition at line 124 of file job.py.

00125                        :
00126         os.system('ln -sf ' + self.logfile + ' log-recent\n')

def job::runPQM::mkLogPath (   self)

Definition at line 127 of file job.py.

00128                        :
00129         if not os.path.exists(self.logpath):
00130             os.mkdir(self.logpath)
00131         else:
00132             pass

def job::runPQM::mkPngPath (   self)

Definition at line 133 of file job.py.

00134                        :
00135         if not os.path.exists(self.pngpath):
00136             os.mkdir(self.pngpath)
00137         else:
00138             pass

def job::runPQM::mkAvaRunNum (   self)

Definition at line 139 of file job.py.

00140                          :
00141         if not os.path.exists(self.pngpath + 'available_run_number'):
00142             os.system('touch ' + self.pngpath + 'available_run_number')
00143         else:
00144             pass

def job::runPQM::getRunFileAll (   self,
  runNo 
)

Definition at line 146 of file job.py.

00147                                   :
00148         self.checkexit()
00149 
00150         try:
00151             connect = MySQLdb.connect(host=dbhost, user=dbuser, passwd=dbpasswd, db=database)
00152             cursor = connect.cursor()
00153             cursor.execute( " SELECT seqno, runno, stream FROM DaqRawDataFileInfo where runno = " + str(runNo) + " ORDER BY seqno DESC" )
00154             row = cursor.fetchall()
00155         except MySQLdb.Error, e:
00156             print "Error %d: %s" % (e.args[0], e.args[1])
00157             sys.exit (1)
00158             cursor.close()
00159             connect.close()
00160     
00161         return row
    
def job::runPQM::getfile (   self,
  seqno 
)

Definition at line 163 of file job.py.

00164                             :
00165         self.checkexit()
00166 
00167         try:
00168             connect = MySQLdb.connect(host=dbhost, user=dbuser, passwd=dbpasswd, db=database)
00169             cursor = connect.cursor()
00170             cursor.execute(" SELECT seqno, filename, runno, streamtype, stream FROM DaqRawDataFileInfo where seqno = " + str(seqno))
00171             row = cursor.fetchone()
00172         except MySQLdb.Error, e:
00173             print "Error %d: %s" % (e.args[0], e.args[1])
00174             sys.exit (1)
00175         cursor.close()
00176         connect.close()
00177         
00178         return row
    
def job::runPQM::getfiletime (   self,
  seqno 
)

Definition at line 179 of file job.py.

00180                                 :
00181         self.checkexit()
00182 
00183         try:
00184             connect = MySQLdb.connect(host=dbhost, user=dbuser, passwd=dbpasswd, db=database)
00185             cursor = connect.cursor()
00186             cursor.execute(" SELECT timestart FROM DaqRawDataFileInfoVld where seqno = " + str(seqno))
00187             row = cursor.fetchone()
00188         except MySQLdb.Error, e:
00189             print "Error %d: %s" % (e.args[0], e.args[1])
00190             sys.exit (1)
00191         cursor.close()
00192         connect.close()
00193     
00194         return row[0]
    
def job::runPQM::runEndOrNot (   self,
  runNo 
)

Definition at line 196 of file job.py.

00197                                 :
00198         self.checkexit()
00199 
00200         try:
00201             connect = MySQLdb.connect(host=dbhost, user=dbuser, passwd=dbpasswd, db=database)
00202             cursor = connect.cursor()
00203             cursor.execute( " SELECT runno FROM DaqRunInfo where runno = " + str(runNo) )
00204             row = cursor.fetchone()
00205         except MySQLdb.Error, e:
00206             print "Error %d: %s" % (e.args[0], e.args[1])
00207             sys.exit (1)
00208         cursor.close()
00209         connect.close()
00210     
00211         if row != None:
00212             return 'YES'
00213         else:
00214             return 'NO'
    
def job::runPQM::listfile (   self,
  path,
  runno,
  seqno,
  f 
)

Definition at line 216 of file job.py.

00217                                              :
00218         self.checkexit()
00219 
00220         for i in os.listdir(path):
00221             newpath = path + '/' + i
00222             if os.path.isfile(newpath):
00223                 if i.find('small') < 0 and i.find('png') > -1:
00224                     spath = newpath.split('/')
00225                     predir = ['HistLog/run' + str(runno)]
00226                     index = spath.index('stats' + str(seqno))
00227                     spath = spath[(index + 1):]
00228                     spath = predir + spath
00229                     newpath = '/'.join(spath)
00230                     plot0 = spath[4].split('.')
00231                     plot1 = plot0[0].split('_')
00232                     f.write(plot1[2] + '\t' + newpath + '\n')
00233             if os.path.isdir(newpath):
00234                 self.listfile(newpath, runno, seqno, f)
    
def job::runPQM::runNumberSort (   self,
  fileName 
)

Definition at line 236 of file job.py.

00237                                      :
00238         self.checkexit()
00239 
00240         runInfo = {}
00241         f = open(fileName, 'r')
00242         runList = []
00243         for i in f:
00244             a = i.split(' ')
00245             runList.append( int(a[0]) )
00246             runInfo[a[0]] = i
00247         f.close()
00248     
00249         runList = set(runList)
00250         runList = list(runList)
00251         runList.sort()
00252     
00253         f = open(fileName, 'w')
00254         for i in range( len(runList) ):
00255             f.write(runInfo[str(runList[-(i + 1)])]) 
00256         f.close()

def job::runPQM::transferFile (   self,
  runnum,
  seqnum 
)

Definition at line 257 of file job.py.

00258                                           :
00259         self.checkexit()
00260 
00261         self.writeLog('Transfer ROOT file of run:' + str(runnum))
00262         runpath = self.threadspath + 'run' + str(runnum) + '/'
00263         os.system('mv ' + runpath + 'sum_' + str(runnum) + '.root '+ self.histopath + 'hist_' + str(runnum) + '.root')
00264         os.system('rm -rf ' + runpath)

def job::runPQM::setAvaRun (   self,
  fileinfo 
)

Definition at line 266 of file job.py.

00267                                  :
00268         self.checkexit()
00269 
00270         runnum = fileinfo[2]
00271         streamtype = fileinfo[3]
00272         stream = fileinfo[4]
00273         farn = open('temp', 'w')
00274         farn.write(str(runnum) + " " + streamtype + " " + stream + '\n')
00275         farn.close()
00276         os.system('cat ' + self.pngpath + 'available_run_number >> temp')
00277         os.system('cp temp ' + self.pngpath + 'available_run_number')
00278         self.runNumberSort(self.pngpath + 'available_run_number')
    
def job::runPQM::setFileName (   self,
  site,
  filename,
  runnum,
  seq 
)

Definition at line 280 of file job.py.

00281                                                       :
00282         self.checkexit()
00283 
00284         #self.writeLog("set new file info:")
00285         #self.writeLog(site)
00286         namelist = filename.split('.')
00287         fileindex = namelist[6]
00288         tmpsite = site
00289         content = []
00290         if os.path.exists(self.pngpath + 'current_file_name_tmp'):
00291             f = open(self.pngpath + 'current_file_name_tmp')
00292             content = f.readlines()
00293             #self.writeLog(content)##
00294             f.close()
00295         f = open(self.pngpath + 'current_file_name_tmp', 'w')
00296         for cont in content:
00297             pos = cont.find(':')
00298             if pos != -1:
00299                 ss = cont[0:pos]
00300                 if ss in site:
00301                     fileinfo = ss + ': Run' + str(runnum) + ' (subfile_' + fileindex + ') ' \
00302                             + str(self.getfiletime(seq)) + ' (UTC)'
00303                     f.write(fileinfo + '\n')
00304                     #self.writeLog('&&' + fileinfo)
00305                     tmpsite.remove(ss)
00306                 else:
00307                     f.write(cont)
00308                     #self.writeLog('**' + cont)
00309         #self.writeLog('tmpsite:')
00310         #self.writeLog(tmpsite)
00311         for ss in tmpsite:
00312             fileinfo = ss + ': Run' + str(runnum) + ' (subfile_' + fileindex + ') ' \
00313                     + str(self.getfiletime(seq)) + ' (UTC)'
00314             f.write(fileinfo + '\n')
00315             #self.writeLog(fileinfo)
00316 
00317         strfile = 'file' + fileindex + ' ' + str(self.getfiletime(seq)) + ' (UTC)'
00318         fstrf = open(self.pngpath + 'current_file_name', 'w')
00319         fstrf.write(strfile)
00320         fstrf.close()
00321         return
    
def job::runPQM::setRunInfo (   self,
  datpath,
  runnum 
)

Definition at line 323 of file job.py.

00324                                          :
00325         self.checkexit()
00326 
00327         if os.path.exists(datpath):
00328             fin = open(datpath, 'r')
00329             strstat = fin.read()
00330             fadin = open(self.adinfofile, 'aw')
00331             fadin.write(str(runnum) + '#' + strstat)
00332             newpath = self.pngpath + 'run' + str(runnum)
00333             os.system('mv ' + datpath + ' ' + newpath)

def job::runPQM::getsite (   self,
  statspath 
)
Get the site names in 'stats' directory 

Definition at line 334 of file job.py.

00335                                 :
00336         ''' Get the site names in 'stats' directory '''
00337 
00338         listdir = os.listdir(statspath)
00339         site = []
00340         for item in listdir:
00341             if os.path.isdir(statspath + '/' + item + '/'):
00342                 site.append(item)
00343         return site

def job::runPQM::updateRealtimeLink (   self,
  site,
  runnum 
)
Update the real time link 

Definition at line 344 of file job.py.

00345                                               :
00346         ''' Update the real time link '''
00347 
00348         runpath = self.pngpath + 'run' + str(runnum)
00349         for s in site:
00350             realtimeLink = self.pngpath + 'runRealtime/' + s
00351             if os.path.exists(realtimeLink):
00352                 os.remove(realtimeLink)
00353             os.system("ln -sf ../run" + str(runnum) + "/" + s + "/ " + realtimeLink)
00354         return
00355         #listdir = os.listdir(runpath)
00356         #for item in listdir:
00357         #    if os.path.isdir(runpath + '/' + item + '/'):
00358         #        realtimeLink = self.pngpath + 'runRealtime/' + item
00359         #        if os.path.exists(realtimeLink):
00360         #            os.remove(realtimeLink)
00361         #        os.system("ln -sf ../run" + str(runnum) + "/" + item + "/ " + realtimeLink)
00362         #        break
00363         #return
    
def job::runPQM::movePng (   self,
  runpath,
  runnum,
  seqno 
)

Definition at line 365 of file job.py.

00366                                              :
00367         self.checkexit()
00368 
00369         statspath = runpath + 'stats' + str(seqno)
00370         newpath = self.pngpath + 'run' + str(runnum)
00371         if not os.path.exists(newpath):
00372             os.system('mkdir ' + newpath)
00373         elif not os.path.isdir(newpath):
00374             os.system('rm -rf ' + newpath)
00375             os.system('mkdir ' + newpath)
00376         if os.path.exists(statspath):
00377             os.system('cp -r ' + statspath + '/* ' + newpath)
00378             os.system('rm -rf ' + statspath)
00379         ##Set current run number
00380         #fcrn = open(self.pngpath + 'current_run_number', 'w')
00381         #fcrn.write(str(runnum))
00382         #fcrn.close()
    
def job::runPQM::setAvaPlots (   self,
  statspath,
  runnum,
  seqno 
)

Definition at line 384 of file job.py.

00385                                                    :
00386         self.checkexit()
00387 
00388         fplot = open(statspath + '/available_plots.txt', 'w')
00389         self.listfile(statspath, runnum, seqno, fplot)
00390         fplot.close()

def job::runPQM::writeLog (   self,
  para 
)

Definition at line 391 of file job.py.

00392                             :
00393         if not isinstance(para, list):
00394             if para.find("prepare") != -1 and self.printflag == False: return
00395         if not isinstance(para, list):
00396             if para.find("prepare") != -1:
00397                 self.printflag = False 
00398             else:
00399                 self.printflag = True
00400         else:
00401             self.printflag = True
00402         f = open(self.logfile, 'aw')
00403         if isinstance(para, list):
00404             for i in para:
00405                 f.write(i + '\t')
00406             f.write('\n')
00407         elif isinstance(para, str):
00408             f.write(para + '\n')
00409         f.close()

def job::runPQM::readMissSeq (   self)

Definition at line 411 of file job.py.

00412                          :
00413         f = open(self.missfile, 'r')
00414         self.missSeqList = []
00415         self.missSeqMap = {}
00416         for i in f:
00417             a = i.split("\n")
00418             index = a[0].index(" ")
00419             seqstamp = int(a[0][index + 1:])
00420             #seqstamp = int(a[0][:index])
00421             self.missSeqList.append( seqstamp )
00422             self.missSeqMap[seqstamp] = a[0][:index]
00423             #seqMap[seqstamp] = a[0][index + 1:]
00424         f.close()
00425 
00426         self.missSeqList = set(self.missSeqList)
00427         self.missSeqList = list(self.missSeqList)
00428         self.missSeqList.sort()

def job::runPQM::missSeqSort (   self)

Definition at line 429 of file job.py.

00430                          :
00431         self.readMissSeq()
00432 
00433         f = open(self.missfile, 'w')
00434         for i in range( len(self.missSeqList) ):
00435             seqstamp = str((self.missSeqList)[i])
00436             runnum = self.missSeqMap[int(seqstamp)]
00437             f.write( runnum + " " + seqstamp + '\n' )
00438             #f.write( seqstamp + " " + runnum + '\n' )
00439         f.close()

def job::runPQM::writeMissing (   self,
  runnum,
  seqnum 
)

Definition at line 440 of file job.py.

00441                                           :
00442         f = open(self.missfile, 'aw')
00443         f.write(str(runnum) + ' ' + str(seqnum) + '\n')
00444         f.close()
00445         self.missSeqSort()

def job::runPQM::delJob (   self,
  seqList 
)

Definition at line 447 of file job.py.

00448                              :
00449         if len(seqList) > 0:
00450             os.system('qstat | grep dqmshift > log.temp')
00451             seqs = []
00452             fth = open("./threads", "r")
00453             seqs = fth.readlines()
00454             fth.close()
00455             self.writeLog("Sequence numbers in threads: ")
00456             self.writeLog(seqs)
00457             f = open("log.temp", "r")
00458             for line in f:
00459                 self.writeLog(line)
00460                 pos1 = line.find("-")
00461                 pos2 = line.find(".sh")
00462                 seq = line[pos1+1:pos2]
00463                 if seqs.count(seq + '\n') > 0:
00464                     pos = line.find(".")
00465                     jobid = line[0:pos]
00466                     command = 'qdel ' + jobid
00467                     self.writeLog(command)
00468                     os.system('qdel ' + str(jobid))
00469             f.close()
00470             os.system('rm -rf log.temp && rm -rf *.sh.*')
00471         else:
00472             pass

def job::runPQM::checkexit (   self)

Definition at line 474 of file job.py.

00475                        :
00476         if os.path.exists('stopjob'):
00477             seqs = self.readSeqList(self.THREADS)
00478             ths = len(seqs)
00479             self.writeLog('\n\nRoger stop message, but now ' + str(ths) + ' threads are running. Deleting them...\n')
00480             self.delJob(seqs)
00481             fth = open(self.THREADS, 'r')
00482             minseq = fth.readline()
00483             fth.close()
00484             if minseq != '' and int(minseq) > 0:
00485                 self.writeCurSeq(int(minseq))
00486             os.system('rm -rf stopjob')
00487             os.system('rm -rf threads')
00488             sys.exit(1)
00489         else:
00490             pass
00491         #ths = self.seqNumCount(self.THREADS)

def job::runPQM::getFileDir (   self,
  seqnum 
)

Definition at line 492 of file job.py.

00493                                 :
00494         dir = ''
00495         if self.seqFileDirMap.has_key(str(seqnum)):
00496             dir = self.seqFileDirMap[str(seqnum)]
00497         else:
00498             fileinfo = self.getfile(int(seqnum))
00499             filename = fileinfo[1]
00500             dir = self.filepath + filename
00501         return dir

def job::runPQM::checkFile (   self)

Definition at line 502 of file job.py.

00503                        :
00504         if not os.path.exists(self.missfile): return
00505         self.readMissSeq()
00506 
00507         # Go to check again whether then-missing files exist or not
00508         for item in self.missSeqList:
00509             # Because "spade" only stores about 1000 data files
00510             if int(self.maxseq) - int(item) > 1000: continue
00511 
00512             dir = self.getFileDir(item)
00513             if os.path.exists(dir):
00514                 # Remove the sequence number in "missingfile"
00515                 os.system("sed -i '/" + str(item) + "/d' " + self.missfile)
00516                 if self.seqFileDirMap.has_key(str(item)):
00517                     del self.seqFileDirMap[str(item)]

def job::runPQM::addSeqNum (   self)

Definition at line 518 of file job.py.

00519                        :
00520         # Each time increase the sequence number, first check missing files exist or not.
00521         self.checkFile()
00522 
00523         self.writeCurSeq(self.seqnum)
00524         self.seqnum = self.seqnum + 1
00525         if self.lastSeqNum < self.seqnum:
00526             newinfo = "\n****************Current Sequence = %s; " %self.seqnum
00527             newinfo = newinfo + "Maximum Sequence = %s****************" %self.maxseq
00528             self.writeLog(newinfo)
00529             #self.writeLog('\n*********************Current Sequence Number = %s*********************' %self.seqnum)
00530         self.lastSeqNum = self.seqnum

def job::runPQM::updateMaxSeq (   self)

Definition at line 531 of file job.py.

00532                           :
00533         self.maxseq = getmaxseq()

def job::runPQM::subJob (   self,
  seqdir,
  seq 
)

Definition at line 534 of file job.py.

00535                                  :
00536         self.checkexit()
00537 
00538         jobpath = seqdir + 'seq-' + str(seq) + '.sh'
00539         fpqmq = os.popen("qstat | grep dqmshift | grep pqmq | wc -l")
00540         fdybq = os.popen("qstat | grep dqmshift | grep dybq | wc -l")
00541         pqmq = (fpqmq.readline()).split("\n")[0]
00542         dybq = (fdybq.readline()).split("\n")[0]
00543         fdybq.close()
00544         fpqmq.close()
00545         if int(dybq) < 32:
00546             os.system('qsub -q dybq ' + jobpath)
00547         else:
00548             if int(pqmq) < 15:
00549                 os.system('qsub -q pqmq ' + jobpath)
00550 
00551         jobstat = os.system('qstat | grep ' + 'seq-' + str(seq) + '.sh')
00552         if jobstat == 0:
00553             os.system('touch ' + seqdir + 'start' + str(seq))
00554             self.writeLog('Submit job: ' + jobpath + ' @' + time.strftime(self.ISOTIMEFORMAT,time.localtime()))
00555         else:
00556             os.system('touch ' + seqdir + 'fail' + str(seq))
00557             self.writeLog(jobpath + ' not submitted successfully')

def job::runPQM::checkSkip (   self,
  runnum 
)

Definition at line 558 of file job.py.

00559                                :
00560         skip = False
00561         # In case that many files in sequence are not transferred, just skip them, avoiding waiting.
00562         self.updateMaxSeq()
00563         if int(self.maxseq) - int(self.seqnum) > 20:
00564             self.writeMissing(runnum, self.seqnum)
00565             self.writeLog('Raw data not found, but maximum seqnum is ' + str(self.maxseq) + ". Go to next record.")
00566             self.addSeqNum()
00567             skip = True
00568         return skip

def job::runPQM::writeJob (   self)

Definition at line 569 of file job.py.

00570                       :
00571         self.checkexit()
00572 
00573         # If a file is tagged as transferred but doesn't created for 15 minutes, go to next record
00574         file_create_time = 0
00575     
00576         # Get file name and run number according to sequence number from database
00577         fileinfo = self.getfile(int(self.seqnum))
00578     
00579         if not fileinfo:
00580             # Empty record
00581             if self.seqnum <= self.maxseq:
00582                 self.writeLog('Empty record, go to next')
00583                 self.addSeqNum()
00584                 self.updateMaxSeq()
00585                 file_create_time = 0
00586                 self.checkexit()
00587                 return 
00588     
00589             # Reach the latest record, wait 10 seconds then requery database
00590             # Print a log message every 10 minutes to show I'm still alive
00591             if self.waitnum == 0:
00592                 self.writeLog('No more record, wait ...')
00593             self.waitnum = self.waitnum + 1
00594             if self.waitnum == 60:
00595                 self.waitnum = 0
00596             time.sleep(10)
00597             self.checkexit()
00598             return
00599     
00600         self.waitnum = 0
00601     
00602         # Temporarily skip test data
00603         if fileinfo[1].find('test') > -1:
00604             self.writeLog('Ignore ' + fileinfo[1])
00605             self.addSeqNum()
00606             self.updateMaxSeq()
00607             file_create_time = 0
00608             self.checkexit()
00609             return
00610             
00611         # Get file index
00612         filename = fileinfo[1]
00613         runnum = fileinfo[2]
00614         file = self.filepath + filename
00615         self.seqFileDirMap[str(self.seqnum)] = file
00616         namelist = filename.split('.')
00617         fileindex = namelist[6]
00618     
00619         looptime = 15 
00620         waittime = 60
00621         # Check file exists or not
00622         if not os.path.isfile(file):
00623             skip = self.checkSkip(runnum)
00624             if skip: return
00625             nextfileinfo = self.getfile(int(self.seqnum + 1))
00626             if nextfileinfo == None or not os.path.exists(self.filepath + nextfileinfo[1]):
00627                 while 1:
00628                     if file_create_time == looptime:
00629                         self.writeLog('Raw data still not found after ' + str(looptime*waittime/60) + ' minutes, go to next record.')
00630                         file_create_time = 0
00631                         self.writeMissing(runnum, self.seqnum)
00632                         self.addSeqNum()
00633                         self.updateMaxSeq()
00634                         return
00635                     else:
00636                         skip = self.checkSkip(runnum)
00637                         if skip: return
00638                         self.writeLog('Raw data can not be found, wait ' + str(waittime) + ' seconds ...')
00639                         time.sleep(waittime)
00640                     if os.path.isfile(file):
00641                         break
00642                     file_create_time = file_create_time + 1
00643                     self.checkexit()
00644         self.writeLog('Successfully find ' + file)
00645         # Now remove file info of sequence numbers with existing files in 'spade'
00646         del self.seqFileDirMap[str(self.seqnum)]
00647         
00648         runpath = self.threadspath + 'run' + str(runnum) + '/'
00649         if not os.path.exists(runpath):
00650             os.mkdir(runpath)
00651         seqdir = runpath + 'seq' + str(self.seqnum) + '/'
00652         if not os.path.exists(seqdir):
00653             os.mkdir(seqdir)
00654         else:
00655             os.system('rm -rf ' + seqdir)
00656             os.mkdir(seqdir)
00657         jobname = seqdir + 'seq-' + str(self.seqnum) + '.sh'
00658         logname = seqdir + 'log.' + str(self.seqnum)
00659     
00660         # Write the self.seqnum into 'threads', for a job script will be generated for this self.seqnum
00661         fthw = open('./threads', 'aw')
00662         fthw.write(str(self.seqnum) + '\n')
00663         fthw.close()
00664     
00665         curdir = os.getcwd() + '/'
00666         sumhist = 'sum_' + str(runnum) + '.root'
00667         seqhist = 'hist_' + str(self.seqnum) + '.root'
00668         curhist = 'current_hist_' + str(runnum) + '.root'
00669     
00670         fseq = open(jobname, 'w')
00671         fseq.write('#!/bin/bash\n\n')
00672         #fseq.write('source /dyb/nuwadata/NuWa/env/NuWa-trunk/nuwa-opt.bash\n\n')
00673         fseq.write('source /dyb/nuwadata/NuWa/env/NuWa-PQM/nuwa-opt.bash\n\n')
00674         fseq.write('cd ~dqmshift/NuWa/DQMRawData/cmt/\n')
00675         fseq.write('source setup.sh\n')
00676         fseq.write('source setup.sh\n\n')
00677         fseq.write('cd ' + curdir + seqdir + '\n')
00678         fseq.write('if [ ! -f "' + file + '" ]; then\n')
00679         fseq.write('  echo raw data is not found, skip current job\n')
00680         fseq.write('  touch norawdata' + str(self.seqnum) + '\n')
00681         fseq.write('else\n')
00682         fseq.write('  echo start NuWa job: ' + file + '\n\n')
00683         if self.debug:
00684             fseq.write('  nuwa.py -n 10000 -l 4 --repack-rpc="on" --dbconf=offline_db -m"DQMRawData.runDaq" -m"Quickstart.Calibrate"  -m "Quickstart.CalculateCalibStats" -m"Quickstart.Reconstruct" -m"DQMRawData.runRecon" -m"DQMRawData.runRpc" --output-stats="{\'file1\':\'hist_test.root\'}" ' + file + '\n\n')
00685         else:
00686             fseq.write('  nuwa.py -n -1 -l 4 --repack-rpc="on" --dbconf=offline_db -m"DQMRawData.runDaq" -m"Quickstart.Calibrate"  -m "Quickstart.CalculateCalibStats" -m"Quickstart.Reconstruct" -m"DQMRawData.runRecon" -m"DQMRawData.runRpc" --output-stats="{\'file1\':\'hist_test.root\'}" ' + file + '\n\n')
00687         #fseq.write('  nuwa.py -n -1 -l 4 --repack-rpc=on --dbconf=offline_db -m"DQMRawData.runDaq" -m"Quickstart.Calibrate"  -m"DQMRawData.runCalib" -m"Quickstart.Reconstruct" -m"DQMRawData.runRecon" -m"DQMRawData.runRpc" --output-stats="{\'file1\':\'hist_test.root\'}" ' + file + '\n\n')
00688         fseq.write('  echo nuwa job finished\n')
00689         fseq.write('  ls -ahl ../\n')
00690         fseq.write('  declare -i loop\n')
00691         fseq.write('  if [ ! -f "../' + sumhist + '" ]; then\n')
00692         fseq.write('    mv hist_test.root ../' + sumhist + '\n')
00693         fseq.write('    echo no ' + sumhist + ', so this is the first finished job of this run\n')
00694         fseq.write('    cd ..\n')
00695         fseq.write('    ls -ahl\n')
00696         fseq.write('  else\n')
00697         fseq.write('    echo ' + sumhist + ' already exists\n')
00698         fseq.write('    mv hist_test.root ../' + seqhist + '\n')
00699         fseq.write('    cd ..\n')
00700         fseq.write('    ls -ahl\n')
00701         fseq.write('    while [ -f "' + curhist + '" ] && [ "$loop" != "20" ]\n')
00702         fseq.write('    do\n')
00703         fseq.write('      echo ' + curhist + ' already exists\n')
00704         fseq.write('      echo other files are merging. Wait 5s!\n')
00705         fseq.write('      loop=loop+1\n')
00706         fseq.write('      sleep 5\n')
00707         fseq.write('    done\n')
00708         fseq.write('    echo loop for ' + curhist + ' finished.\n')
00709         fseq.write('    if [ "$loop" = "20" ]; then\n')
00710         fseq.write('      echo waiting too much time, force to merge\n')
00711         fseq.write('      unbuffer hadd -f ' + curhist + ' ' + seqhist+ ' ' + sumhist + '\n')
00712         fseq.write('    else\n')
00713         fseq.write('      echo now begin to merge ' + sumhist + 'and ' + seqhist + '\n')
00714         fseq.write('      loop=0\n')
00715         fseq.write('      size=1\n')
00716         fseq.write('      while [ $size -ne 0 ] && [ "$loop" != "20" ]\n')
00717         fseq.write('      do\n')
00718         fseq.write('        echo try to merge\n')
00719         tmpname = 'log.tmp.' + str(self.seqnum) + '.' + time.strftime(self.ISOTIMEFORMAT1,time.localtime()) 
00720         fseq.write('        unbuffer hadd ' + curhist + ' ' + seqhist + ' ' + sumhist + ' | grep Error > ' + tmpname + '\n')
00721         fseq.write('        size=$(stat "-c%s" ' + tmpname + ')\n')
00722         fseq.write('        if [ $size -eq 0 ]; then\n')
00723         fseq.write('          echo merging successfully\n')
00724         fseq.write('          rm ' + tmpname + '\n')
00725         fseq.write('        else\n')
00726         fseq.write('          echo Error when merging, wait 5s, then merge again\n')
00727         if self.debug:
00728             fseq.write('          cp ' + tmpname + ' ../../Shdir/' + tmpname + '.$loop\n')
00729         fseq.write('          loop=loop+1\n')
00730         fseq.write('          sleep 5\n')
00731         fseq.write('        fi\n')
00732         fseq.write('      done\n')
00733         fseq.write('      if [ "$loop" = "20" ]; then\n')
00734         fseq.write('        echo waiting too much time again, force to merge\n')
00735         fseq.write('        unbuffer hadd -f ' + curhist + ' ' + seqhist + ' ' + sumhist + '\n')
00736         fseq.write('      fi\n')
00737         fseq.write('      echo merge finished\n')
00738         fseq.write('    fi\n')
00739         fseq.write('    ls -ahl\n')
00740 
00741         fseq.write('    echo Merging finished!\n')
00742         fseq.write('    rm -rf hist_' + str(self.seqnum) + '.root\n')
00743         fseq.write('    mv ' + curhist + ' ' + sumhist + '\n')
00744         fseq.write('  fi\n\n')
00745         fseq.write('  echo now ' + sumhist + ' should exist\n\n')
00746         fseq.write('  ls -ahl\n')
00747 
00748         fseq.write('  loop=0\n')
00749         fseq.write('  while [ -d "stats" ] && [ "$loop" != "20" ]\n')
00750         fseq.write('  do\n')
00751         fseq.write('    echo pngs of last file not transferred. Wait 5s!\n')
00752         fseq.write('    loop=loop+1\n')
00753         fseq.write('    sleep 5\n')
00754         fseq.write('  done\n')
00755         fseq.write('  if [ "$loop" = "20" ]; then\n')
00756         fseq.write('    echo stats still exists, no more waiting\n')
00757         fseq.write('  fi\n')
00758         fseq.write('  echo now stats should not exist\n')
00759         fseq.write('  ls -ahl\n')
00760 
00761         fseq.write('  if [ ! -f "dump.C" ]; then\n')
00762         fseq.write('    echo first instance for printing pngs\n')
00763         fseq.write('    cp ../../dump.C .\n')
00764         fseq.write('    cp ../../HistoPrinter.C .\n')
00765         fseq.write('  fi\n\n')
00766 
00767         fseq.write('  loop=0\n')
00768         fseq.write('  while [ -f "HistoPrinter_C.d" ] && [ "$loop" != "20" ]\n')
00769         fseq.write('  do\n')
00770         fseq.write('    echo pngs of last file not generated completely. Wait 5s!\n')
00771         fseq.write('    loop=loop+1\n')
00772         fseq.write('    sleep 5\n')
00773         fseq.write('  done\n')
00774         fseq.write('  if [ "$loop" = "20" ]; then\n')
00775         fseq.write('    echo HistoPrinter_C.d still exists, begin to generate pngs for this file\n')
00776         fseq.write('  fi\n')
00777         fseq.write('  ls -ahl\n')
00778         fseq.write('  echo start generate png pictures:\n')
00779         fseq.write('  cp ../../HistoPrinter_C* .\n')
00780         fseq.write('  ls -ahl\n')
00781         #fseq.write(  'root -b -l -q ../../daya_style.C dump.C\(\\"sum_' + str(runnum) + '.root\\"\) < /dev/null\n\n')
00782         fseq.write('  loop=0\n')
00783         fseq.write('  size=1\n')
00784         fseq.write('  while [ $size -ne 0 ] && [ "$loop" != "20" ]\n')
00785         fseq.write('  do\n')
00786         fseq.write('    echo try to print pngs\n')
00787         tmpname = 'log.tmp.' + str(self.seqnum) + '.' + time.strftime(self.ISOTIMEFORMAT1,time.localtime()) 
00788         fseq.write('    unbuffer root -b -l -q ../../daya_style.C dump.C\(\\"' + sumhist + '\\"\)' + ' | grep Error > ' + tmpname + '\n')
00789         fseq.write('    ls -ahl\n')
00790         fseq.write('    sed -i "/log scale/"d ' + tmpname + '\n')
00791         fseq.write('    echo after use sed to delete log scale error\n')
00792         fseq.write('    ls -ahl\n')
00793         fseq.write('    size=$(stat "-c%s" ' + tmpname + ')\n')
00794         fseq.write('    if [ $size -eq 0 ]; then\n')
00795         fseq.write('      echo printing successfully\n')
00796         fseq.write('      if [ -d "stats' + str(self.seqnum) + '" ]; then # In case the file being reprocessed\n')
00797         fseq.write('        rm -rf stats' + str(self.seqnum) + '/*\n')
00798         fseq.write('      else\n')
00799         fseq.write('        mkdir stats' + str(self.seqnum) + '\n')
00800         fseq.write('      fi\n')
00801         fseq.write('      mv stats/* stats' + str(self.seqnum) + '\n')
00802         fseq.write('      rm ' + tmpname + '\n')
00803         fseq.write('    else\n')
00804         fseq.write('      echo Error when printing, wait 5s, then print again\n')
00805         if self.debug:
00806             fseq.write('      cp ' + tmpname + ' ../../Shdir/' + tmpname + '.$loop\n')
00807         fseq.write('      loop=loop+1\n')
00808         fseq.write('      echo "$loop"\n')
00809         fseq.write('      sleep 5\n')
00810         fseq.write('    fi\n')
00811         fseq.write('    if [ $size -eq 204 ]; then\n')
00812         fseq.write('      echo find size of log.tmp is equal to 204, move it to Shdir\n')
00813         if self.debug:
00814             fseq.write('      cp log.tmp ../../Shdir/log.tmp.' + str(self.seqnum) + '.' + time.strftime(self.ISOTIMEFORMAT1,time.localtime()) + '\n')
00815         fseq.write('    fi\n')
00816         fseq.write('  done\n')
00817         fseq.write('  echo "$loop"\n')
00818         fseq.write('  if [ "$loop" = "20" ]; then\n')
00819         fseq.write('    echo waiting too much time again, skip printing\n')
00820         fseq.write('    touch nostats'  + str(self.seqnum) + '\n')
00821         fseq.write('  fi\n')
00822 
00823         fseq.write('  ls -ahl\n')
00824         fseq.write('  rm HistoPrinter_C*\n')
00825         fseq.write('  ls -ahl\n')
00826         fseq.write('  if [ -d "stats' + str(self.seqnum) + '" ]; then\n')
00827         fseq.write('    echo pngs generated!\n')
00828         fseq.write('  else\n')
00829         fseq.write('    echo pngs not generated!\n')
00830         fseq.write('  fi\n\n')
00831         fseq.write('  cd -\n\n')
00832         fseq.write('fi\n')
00833         fseq.write('touch done' + str(self.seqnum) + '\n')
00834         fseq.write('echo Job done\n')
00835         fseq.close()
00836     
00837         os.system('chmod 755 ' + jobname)
00838         self.subJob(seqdir, self.seqnum)
00839     
00840         # Next record
00841         self.addSeqNum()
00842         self.updateMaxSeq()
00843         file_create_time = 0
    
def job::runPQM::readSeqList (   self,
  fileName 
)

Definition at line 844 of file job.py.

00845                                    :
00846         fth = open(fileName, 'r')
00847         l = []
00848         for i in fth:
00849             a = i.split('\n')
00850             l.append( a[0] )
00851         fth.close()
00852         return l

def job::runPQM::writeCurSeq (   self,
  seq 
)

Definition at line 853 of file job.py.

00854                               :
00855         fseqnum = open('./cur_seq_num', 'w')
00856         fseqnum.write(str(seq))
00857         fseqnum.close()

def job::runPQM::seqNumCount (   self,
  fileName 
)

Definition at line 858 of file job.py.

00859                                    :
00860         self.checkexit()
00861 
00862         seqList = []
00863         seqList = self.readSeqList(fileName)
00864     
00865         # Check whether some jobs have finished
00866         if len(seqList) > 0:
00867             for seq in seqList:
00868                 fileinfo = self.getfile(int(seq))
00869                 runnum = fileinfo[2]
00870                 runpath = self.threadspath + 'run' + str(runnum) + '/'
00871                 seqpath = runpath + 'seq' + str(seq) + '/'
00872                 doneseq = seqpath + 'done' + str(seq)
00873                 if os.path.isfile(doneseq):
00874                     shlist = glob.glob('*.sh.*')
00875                     #shlist = glob.glob('*' + str(seq) + '.sh.*')
00876                     for i  in range(len(shlist)):
00877                         curdir = os.getcwd() + '/'
00878                         if self.debug:
00879                             os.system('mv ' + shlist[i] + ' ' + curdir + 'Shdir')
00880                         else:
00881                             os.system('rm -rf ' + shlist[i])
00882 
00883                     self.writeLog('delete ' + str(seq) + ' from threads.' + time.strftime(self.ISOTIMEFORMAT,time.localtime()))
00884                     # First delete the sequence number from 'threads'
00885                     sedcommond = 'sed -i "/' + str(seq) + '/d" ' + fileName
00886                     os.system(sedcommond)
00887                     self.writeJob()# First submit a new job, then move on
00888                     if os.path.exists(seqpath + 'norawdata' + str(seq)):
00889                         self.writeLog('No raw data was found when executing nuwa job for ' + str(seq) + ', skip it!')
00890                         self.writeMissing(runnum, seq)
00891                         continue
00892                     if os.path.exists(seqpath + 'nostats' + str(seq)):
00893                         self.writeLog('Printing for ' + str(seq) + ' maybe fail, skip it!')
00894                         continue
00895     
00896                     self.writeLog('move png for ' + str(seq))
00897                     statspath = runpath + 'stats' + str(seq)
00898                     if os.path.exists(statspath):
00899                         site = self.getsite(statspath)
00900                         self.setAvaPlots(statspath, runnum, seq)
00901                         self.movePng(runpath, runnum, seq)
00902                         self.updateRealtimeLink(site, runnum)
00903                         self.setFileName(site, fileinfo[1], runnum, seq)
00904                         self.setAvaRun(fileinfo)
00905                     else:
00906                         if os.path.exists(doneseq):
00907                             self.writeLog(doneseq + ' found')
00908                         self.writeLog(statspath + ' not found')
00909     
00910                     # I don't quite understand the following two lines
00911                     if os.path.exists( 'temp' ):
00912                         os.system('sed -i \'$d\' ' + self.adinfofile)
00913                         
00914                     datpath = runpath + 'stat.dat'
00915                     self.setRunInfo(datpath, runnum)
00916     
00917                     if self.runEndOrNot( runnum ) == 'YES':
00918                         getAll = self.getRunFileAll( runnum )
00919                         seqlist = []
00920                         for i in getAll:
00921                             seqlist.append(str(i[0]))
00922                         runDoneOrNot = 'NULL'
00923                         for seqit in seqlist:
00924                             if int(seqit) >= self.seqnum_start:
00925                                 donepath = runpath + 'seq' + str(seqit) + '/done' + str(seqit)
00926                                 if not os.path.exists(donepath):
00927                                     self.writeLog('Run ' + str(runnum) + ' not done yet. ' + donepath + ' not found.')
00928                                     runDoneOrNot = 'NO'
00929                                     break
00930                                 else:
00931                                     runDoneOrNot = 'YES'
00932                         if runDoneOrNot == 'YES':
00933                             threadslist = []
00934                             ftemp = open(self.THREADS, 'r')
00935                             for ith in ftemp:
00936                                 th = ith.split('\n')
00937                                 threadslist.append(th[0])
00938                             ftemp.close()
00939                             intersection = list(set(seqlist) & set(threadslist))
00940                             if intersection == []:
00941                                 self.transferFile(runnum, seq)
00942                             else:
00943                                 continue
00944                 else: # Check whether the job of this sequence number has been submitted
00945                     startseq = seqpath + 'start' + str(seq)
00946                     if not os.path.exists(startseq):
00947                         self.writeLog('Job of seq' + str(seq) + ' not submitted, submit it again.')
00948                         self.subJob(seqpath, seq)
00949     
00950         seqList = self.readSeqList(fileName)
00951         return len( seqList )
00952 

def job::runPQM::run (   self)

Definition at line 953 of file job.py.

00954                  :
00955         self.mkPngPath()
00956         self.mkAvaRunNum()
00957         self.mkLogPath()
00958         self.mkLinkLog()
00959         self.mkHistoPath()
00960         self.mkHistoryPath()
00961         self.mkAdinfoFile()
00962         self.mkThrPath()
00963         self.mkThreads()
00964         self.writeLog('########### Job start at %s ###########\n' % time.strftime(self.ISOTIMEFORMAT,time.localtime()))
00965         if os.path.exists(self.missfile):
00966             os.system("cp " + self.missfile + " MissingLogs/" + self.missfile + time.strftime(self.ISOTIMEFORMAT1,time.localtime()))
00967 
00968         while 1:
00969             f = open(self.logfile, 'aw')
00970             if self.seqnum == self.seqnum_start or self.lastSeqNum < self.seqnum:
00971                 self.writeLog('\n*********************Current Sequence Number = %s*********************' %self.seqnum)
00972             self.lastSeqNum = self.seqnum
00973 
00974             if self.waitnum == 0:
00975                 self.writeLog('Start time: ' + time.strftime(self.ISOTIMEFORMAT,time.localtime()))
00976 
00977             # Check how many threads are running
00978             ths = self.seqNumCount(self.THREADS)
00979             while 1:
00980                 ths = self.seqNumCount(self.THREADS)
00981                 if ths >= self.NUMOFJOB:
00982                     self.writeLog('There are %s threads are running. Waiting...' %self.NUMOFJOB)
00983                     time.sleep(10)
00984                     self.checkexit()
00985                     continue
00986                 else:
00987                     break
00988             self.writeLog('Now ' + str(ths) + ' (<' + str(self.NUMOFJOB) + ') threads, prepare to submit new job.')
00989             self.writeJob()


Member Data Documentation

Definition at line 42 of file job.py.

Definition at line 42 of file job.py.

Definition at line 42 of file job.py.

Definition at line 42 of file job.py.

Definition at line 42 of file job.py.

Definition at line 42 of file job.py.

Definition at line 42 of file job.py.

Definition at line 42 of file job.py.

Definition at line 42 of file job.py.

Definition at line 42 of file job.py.

Definition at line 42 of file job.py.

Definition at line 42 of file job.py.

Definition at line 42 of file job.py.

Definition at line 42 of file job.py.

Definition at line 42 of file job.py.

Definition at line 42 of file job.py.

Definition at line 42 of file job.py.

Definition at line 42 of file job.py.

Definition at line 42 of file job.py.

Definition at line 42 of file job.py.

Definition at line 42 of file job.py.

Definition at line 42 of file job.py.

Definition at line 42 of file job.py.

Definition at line 42 of file job.py.


The documentation for this class was generated from the following file:
| Classes | Job Modules | Data Objects | Services | Algorithms | Tools | Packages | Directories | Tracs |

Generated on Fri May 16 2014 10:02:12 for DQMRawData by doxygen 1.7.4