/search.css" rel="stylesheet" type="text/css"/> /search.js">
| Classes | Job Modules | Data Objects | Services | Algorithms | Tools | Packages | Directories | Tracs |

In This Package:

Public Member Functions | Public Attributes
Manager::Manager Class Reference

List of all members.

Public Member Functions

def __init__
def initializeOptions
def parseOptions
def run
def runOneJob
def runBatch
def runCommand
def sequencesInRun
def processSequence
def addSequenceStats
def dumpPmt
 Dump PMT info to DB.
def dumpTrigger
 Dump trigger info to DB.
def dumpLivetime
 Dump livetime info to DB.
def dumpDqAll
 Dump DQ info to DB.
def dumpRate
 Run another script to get the event rate from file.
def generateFigures
def updateRunIndex
def updateFullIndex
def clearSequence
def clearSequenceStats
def clearSequenceDumpPmt
def clearSequenceDumpTrigger
def clearSequenceDumpLivetime
def clearSequenceDumpDqAll
def clearSummary
def printState
def resolveRunSequenceFile
def makeJobCommand
def ensureDirectories
def setLock
 Functions for handling job locks.
def isLocked
def removeLock
def lockStats
def statsLocked
def unlockStats
def lockRun
def runLocked
def unlockRun
def lockRunIndex
def runIndexLocked
def unlockRunIndex
def getState
 Functions for handling job state.
def setState
def setStateInFile
def getStateFromFile
def lastLine

Public Attributes

 jobConfig
 siteConfig
 stateSep
 stateNParts
 opts_parser
 argv
 opts
 args

Detailed Description

Class which manages NuWa jobs

Definition at line 14 of file Manager.py.


Constructor & Destructor Documentation

def Manager::Manager::__init__ (   self)
Constructor

Definition at line 16 of file Manager.py.

00017                       :
00018         """Constructor"""
00019         self.jobConfig = None
00020         self.siteConfig = None
00021         self.stateSep=','
00022         self.stateNParts=2
00023         self.initializeOptions()
00024         return


Member Function Documentation

def Manager::Manager::initializeOptions (   self)
Add options to command line parser 

Definition at line 25 of file Manager.py.

00026                                :
00027         """ Add options to command line parser """
00028         from optparse import OptionParser
00029         from DybPython.hostid import hostid
00030         parser = OptionParser(usage=self.__doc__)
00031 
00032         parser.add_option("-S", "--summarize-run",action="store_true",
00033                           default=False, help="Summarize run histograms")
00034 
00035         parser.add_option("", "--print-state",action="store_true",
00036                           default=False, help="Print the job state of a run")
00037 
00038         parser.add_option("", "--clear-sequence",action="store_true",
00039                           default=False, help="Clear NuWa Job state for reprocessing")
00040 
00041         parser.add_option("", "--clear-sequence-stats",action="store_true",
00042                           default=False, help="Clear Add Stats state for readding")
00043 
00044         parser.add_option("", "--clear-sequence-dump-pmt",action="store_true",
00045                           default=False, help="Clear dump pmt job state for reprocessing")
00046         
00047         parser.add_option("", "--clear-sequence-dump-trigger",action="store_true",
00048                           default=False, help="Clear dump trigger job state for reprocessing")
00049         
00050         parser.add_option("", "--clear-sequence-dump-livetime",action="store_true",
00051                           default=False, help="Clear dump livetime job state for reprocessing")
00052         
00053         parser.add_option("", "--clear-sequence-dump-dq-all",action="store_true",
00054                           default=False, help="Clear dump dq-all job state for reprocessing")
00055         
00056         parser.add_option("", "--clear-summary",action="store_true",
00057                           default=False, help="Clear run summary for reprocessing")
00058 
00059         parser.add_option("", "--clear-run-total",action="store_true",
00060                           default=False, help="Clear run summary and total stats for reprocessing")
00061 
00062         parser.add_option("", "--clear-all",action="store_true",
00063                           default=False, help="Clear nuwa job and run summary for reprocessing")
00064 
00065         parser.add_option("", "--run-nuwa",action="store_true",
00066                           default=False, help="Process NuWa Job")
00067 
00068         parser.add_option("", "--add-stats",action="store_true",
00069                           default=False, help="Add root file to run total")
00070 
00071         parser.add_option("", "--make-figures",action="store_true",
00072                           default=False, help="Make figures for this job/run")
00073 
00074         parser.add_option("", "--xml-index",action="store_true",
00075                           default=False, help="Make XML index for this job/run")
00076 
00077         parser.add_option("", "--xml-add-run",action="store_true",
00078                           default=False, help="Add run to site XML index")
00079 
00080         parser.add_option("-j", "--job-name",default=None,type="string",
00081                           help="Name of NuWa Job")
00082 
00083         parser.add_option("-r", "--run-number",default=None,type="int",
00084                           help="Run Number for NuWa Job")
00085 
00086         parser.add_option("-A", "--all-sequences",action="store_true",
00087                           default=False, help="Process all sequences in run")
00088 
00089         parser.add_option("-s", "--sequence",default=None,type="int",
00090                           help="File Sequence Number for NuWa Job")
00091 
00092         parser.add_option("-f", "--filename",default=None,type="string",
00093                           help="Data file name for current job")
00094 
00095         parser.add_option("-l", "--log-level",default=3,type="int",
00096                           help="Set output log level")
00097 
00098         parser.add_option("-b", "--batch",action="store_true",default=False,
00099                           help="Send this job to the batch queue")
00100 
00101         parser.add_option("-c", "--cluster",default="pdsf",type="string",
00102                           help="Use the configuration for this cluster")
00103 
00104         parser.add_option("-d", "--dry-run",action="store_true",default=False,
00105                           help="Debug run; don't run any actual jobs")
00106 
00107         parser.add_option("", "--catalog",action="append",type="string",
00108                           help="Generate a catalog for files in this directory")
00109                           
00110         parser.add_option("", "--dump-rate",action="store_true",default=False,
00111                           help="dump event rate for this run")
00112 
00113         parser.add_option("", "--dump-pmt",action="store_true",default=False,
00114                           help="dump pmt info into DB")
00115         
00116         parser.add_option("", "--dump-trigger",action="store_true",default=False,
00117                           help="dump trigger info into DB")
00118         
00119         parser.add_option("", "--dump-livetime",action="store_true",default=False,
00120                           help="dump livetime info into DB")
00121         
00122         parser.add_option("", "--dump-dq-all",action="store_true",default=False,
00123                           help="dump dq info into DB")
00124         self.opts_parser = parser
00125 
00126         return
00127 

def Manager::Manager::parseOptions (   self,
  argv 
)
Parse the command line options given to Manager

Definition at line 128 of file Manager.py.

00129                                 :
00130         """Parse command line options to given to Manager"""
00131         self.argv = argv
00132         (options, args) = self.opts_parser.parse_args(args=argv)
00133         self.opts = options
00134         self.args = args
00135 
00136         if siteConfigs.has_key(self.opts.cluster):
00137             self.siteConfig = siteConfigs[self.opts.cluster]
00138         else:
00139             print "Error: Unknown computer cluster '%s'; add to SiteConfigs" % self.opts.cluster
00140             return Status.FAILURE
00141         if self.opts.catalog:
00142             self.siteConfig.catalogPaths = self.opts.catalog
00143 
00144         # Translate compound option
00145         if self.opts.summarize_run:
00146             self.opts.make_figures = True
00147             self.opts.xml_index = True
00148             self.opts.xml_add_run = True
00149 
00150         # Translate compound option
00151         if self.opts.clear_run_total:
00152             self.opts.clear_summary = True
00153             self.opts.clear_sequence_stats = True
00154             self.opts.all_sequences = True
00155 
00156         # Translate compound option
00157         if self.opts.clear_all:
00158             self.opts.clear_summary = True
00159             self.opts.clear_sequence = True
00160             self.opts.all_sequences = True    
00161 
00162         if self.opts.filename!=None or (self.opts.run_number!=None
00163                                         and self.opts.sequence!=None):
00164             # Check for consistency of file, run, sequence
00165             # and fill out any unset parts if possible
00166             [status, run, seq, fname] = self.resolveRunSequenceFile(
00167                 self.opts.run_number,
00168                 self.opts.sequence,
00169                 self.opts.filename)
00170             if status != Status.SUCCESS: return status
00171             self.opts.run_number = run
00172             self.opts.sequence = seq
00173             self.opts.filename = fname
00174 
00175         # Check consistentcy of command line arguments
00176         if (self.opts.run_nuwa
00177             or self.opts.add_stats
00178             or self.opts.make_figures
00179             or self.opts.xml_index
00180             or self.opts.xml_add_run
00181             or self.opts.clear_sequence
00182             or self.opts.clear_sequence_stats
00183             or self.opts.clear_summary
00184             or self.opts.print_state):
00185             if self.opts.run_number==None:
00186                 print "Error: No run number has been found or provided"
00187                 return Status.FAILURE
00188         if (self.opts.run_nuwa
00189             or self.opts.add_stats
00190             or self.opts.make_figures
00191             or self.opts.xml_index
00192             or self.opts.clear_sequence
00193             or self.opts.clear_sequence_stats
00194             or self.opts.clear_summary
00195             or self.opts.dump_pmt
00196             or self.opts.clear_sequence_dump_pmt
00197             or self.opts.dump_trigger
00198             or self.opts.clear_sequence_dump_trigger
00199             or self.opts.dump_livetime
00200             or self.opts.clear_sequence_dump_livetime
00201             or self.opts.dump_dq_all
00202             or self.opts.clear_sequence_dump_dq_all
00203             or self.opts.print_state):            
00204             if self.opts.job_name==None:
00205                 print "Error: A job name must be provided: use '-j'"
00206                 return Status.FAILURE
00207             if jobConfigs.has_key(self.opts.job_name):
00208                 self.jobConfig = jobConfigs[self.opts.job_name]
00209             else:
00210                 print "Error: Unknown job name '%s'" % self.opts.job_name
00211                 return Status.FAILURE
00212         if (self.opts.run_nuwa
00213             or self.opts.clear_sequence
00214             or self.opts.clear_sequence_stats) and not self.opts.all_sequences:
00215             if self.opts.sequence==None:
00216                 print "Error: No sequence number has been found or provided"
00217                 return Status.FAILURE
00218             if self.opts.filename==None:
00219                 print "Error: No data filename has been found or provided"
00220                 return Status.FAILURE
00221 
00222         if self.opts.all_sequences:
00223             if self.opts.sequence!=None:
00224                 print "Error: Do not set sequence when processing all sequences"
00225                 return Status.FAILURE
00226             if self.opts.filename!=None:
00227                 print "Error: Do not set filename when processing all sequences"
00228                 return Status.FAILURE
00229             if not (self.opts.run_nuwa
00230                     or self.opts.clear_sequence
00231                     or self.opts.clear_sequence_stats
00232                     or self.opts.dump_pmt
00233                     or self.opts.dump_trigger
00234                     or self.opts.dump_livetime
00235                     or self.opts.dump_dq_all
00236                     or self.opts.clear_sequence_dump_pmt
00237                     or self.opts.clear_sequence_dump_trigger
00238                     or self.opts.clear_sequence_dump_livetime
00239                     or self.opts.clear_sequence_dump_dq_all):
00240                 print "Error: Requesting 'all-sequences' but not 'run-nuwa', 'clear-sequence', 'clear-sequence-stats', 'dump-pmt', 'dump-trigger', 'dump-livetime', 'dump-dq-all', 'clear-sequence-dump-pmt', 'clear-sequence-dump-trigger', 'clear-sequence-dump-livetime' or 'clear-sequence-dump-dq-all'?"
00241                 return Status.FAILURE
00242 
00243         return Status.SUCCESS

def Manager::Manager::run (   self)
Run the manager 

Definition at line 244 of file Manager.py.

00245                  :
00246         """ Run the manager """
00247         import os
00248         os.umask(2)
00249         if self.opts.all_sequences:
00250             # Loop over all sequences in run, submit one job for each
00251             fileList = self.siteConfig.filesInRun(self.opts.run_number)
00252             for fname in fileList:
00253                 fileDesc = FileDescription(fname)
00254                 if not fileDesc.isValid:
00255                     print "Skipping invalid file: ",fname
00256                     continue
00257                 self.opts.filename = fname
00258                 self.opts.sequence = fileDesc.sequence
00259                 status = self.runOneJob()
00260                 self.opts.filename = None
00261                 self.opts.sequence = None
00262                 if status != Status.SUCCESS: return status
00263         else:
00264             status = self.runOneJob()
00265             if status != Status.SUCCESS: return status
00266         return Status.SUCCESS

def Manager::Manager::runOneJob (   self)
Run a single job 

Definition at line 267 of file Manager.py.

00268                        :
00269         """ Run a single job """
00270         if self.opts.batch:
00271             #  Catch batch submissions, and route to batch queue
00272             return self.runBatch(self.opts.job_name,
00273                                  self.opts.run_number,
00274                                  self.opts.sequence)
00275         # Not a batch job, so start working
00276         if self.opts.print_state:
00277             # Print the state for this job/run/seq
00278             status = self.printState(self.opts.job_name,
00279                                      self.opts.run_number,
00280                                      self.opts.sequence)
00281             if status != Status.SUCCESS: return status
00282         if self.opts.clear_sequence:
00283             # Clear one sequence state for reprocessing
00284             status = self.clearSequence(self.opts.job_name,
00285                                         self.opts.run_number,
00286                                         self.opts.sequence)
00287             if status != Status.SUCCESS: return status
00288             if self.jobConfig.hasStats:
00289                 status = self.clearSequenceStats(self.opts.job_name,
00290                                                  self.opts.run_number,
00291                                                  self.opts.sequence)
00292                 if status != Status.SUCCESS: return status
00293         if self.opts.clear_sequence_stats:
00294             # Clear one sequence stats for reprocessing
00295             if self.jobConfig.hasStats:
00296                 status = self.clearSequenceStats(self.opts.job_name,
00297                                                  self.opts.run_number,
00298                                                  self.opts.sequence)
00299             if status != Status.SUCCESS: return status
00300         if self.opts.clear_sequence_dump_pmt:
00301             # Clear one sequence dump job state for reprocessing
00302             if self.jobConfig.hasStats:
00303                 status = self.clearSequenceDumpPmt(self.opts.job_name,
00304                                                    self.opts.run_number,
00305                                                    self.opts.sequence)
00306             if status != Status.SUCCESS: return status
00307         if self.opts.clear_sequence_dump_trigger:
00308             # Clear one sequence dump job state for reprocessing
00309             if self.jobConfig.hasStats:
00310                 status = self.clearSequenceDumpTrigger(self.opts.job_name,
00311                                                        self.opts.run_number,
00312                                                        self.opts.sequence)
00313             if status != Status.SUCCESS: return status
00314         if self.opts.clear_sequence_dump_livetime:
00315             # Clear one sequence dump job state for reprocessing
00316             if self.jobConfig.hasStats:
00317                 status = self.clearSequenceDumpLivetime(self.opts.job_name,
00318                                                         self.opts.run_number,
00319                                                         self.opts.sequence)
00320             if status != Status.SUCCESS: return status
00321         if self.opts.clear_sequence_dump_dq_all:
00322             # Clear one sequence dump job state for reprocessing
00323             if self.jobConfig.hasStats:
00324                 status = self.clearSequenceDumpDqAll(self.opts.job_name,
00325                                                      self.opts.run_number,
00326                                                      self.opts.sequence)
00327             if status != Status.SUCCESS: return status
00328         if self.opts.clear_summary:
00329             # Clear summary for reprocessing
00330             status = self.clearSummary(self.opts.job_name, self.opts.run_number)
00331             if status != Status.SUCCESS: return status
00332         if self.opts.run_nuwa:
00333             # Process one nuwa job
00334             status = self.processSequence(self.opts.job_name,
00335                                           self.opts.run_number,
00336                                           self.opts.sequence,
00337                                           self.opts.filename)
00338             if status != Status.SUCCESS: return status
00339         if self.opts.add_stats:
00340             # Add root file to run total
00341             status = self.addSequenceStats(self.opts.job_name,
00342                                            self.opts.run_number,
00343                                            self.opts.sequence)
00344             if status != Status.SUCCESS: return status
00345         if self.opts.dump_rate:
00346             #calculate event rate
00347             status = self.dumpRate(self.opts.job_name,
00348                                    self.opts.run_number)
00349             if status != Status.SUCCESS: return status
00350         if self.opts.dump_pmt:
00351             #dump pmt info to DB
00352             status = self.dumpPmt(self.opts.job_name,
00353                                   self.opts.run_number,
00354                                   self.opts.sequence)
00355             if status != Status.SUCCESS: return status
00356         if self.opts.dump_trigger:
00357             #dump pmt info to DB
00358             status = self.dumpTrigger(self.opts.job_name,
00359                                       self.opts.run_number,
00360                                       self.opts.sequence)
00361             if status != Status.SUCCESS: return status
00362         if self.opts.dump_livetime:
00363             #dump pmt info to DB
00364             status = self.dumpLivetime(self.opts.job_name,
00365                                        self.opts.filename,
00366                                        self.opts.run_number,
00367                                        self.opts.sequence)
00368             if status != Status.SUCCESS: return status
00369         if self.opts.dump_dq_all:
00370             #dump pmt info to DB
00371             status = self.dumpDqAll(self.opts.job_name,
00372                                     self.opts.run_number,
00373                                     self.opts.sequence)
00374             if status != Status.SUCCESS: return status
00375         if self.opts.make_figures:
00376             # Generate current figures for this job
00377             status = self.generateFigures(self.opts.job_name,
00378                                           self.opts.run_number)
00379             if status != Status.SUCCESS: return status
00380         if self.opts.xml_index:
00381             # Update run xml index for this job
00382             status = self.updateRunIndex(self.opts.job_name,
00383                                          self.opts.run_number)
00384             if status != Status.SUCCESS: return status
00385         if self.opts.xml_add_run:
00386             # Add this run to the total index if needed
00387             status = self.updateFullIndex(self.opts.run_number)
00388             if status != Status.SUCCESS: return status
00389         return Status.SUCCESS

def Manager::Manager::runBatch (   self,
  jobName,
  runNumber,
  seqNumber 
)
Submit current command to the batch queue

Definition at line 390 of file Manager.py.

00391                                                      :
00392         """Submit current command to the batch queue"""
00393         batchArgs = self.argv[:]
00394         for arg in ['--batch','-b','--all-sequences']:
00395             while arg in batchArgs:
00396                 batchArgs.remove(arg)
00397         if self.opts.all_sequences:
00398             # Insert current sequence number in batch command line
00399             batchArgs += ['-s %d'%self.opts.sequence]
00400         requiredDirectories = [ self.siteConfig.batchLogDirectory(runNumber) ]
00401         status = self.ensureDirectories( requiredDirectories )
00402         if status!=Status.SUCCESS:
00403             return status
00404         # Run current job in batch mode
00405         batchCommand = self.siteConfig.batchCommand(batchArgs, jobName,
00406                                                     runNumber, seqNumber)
00407         self.runCommand( batchCommand )
00408         return Status.SUCCESS

def Manager::Manager::runCommand (   self,
  command 
)
Run a single shell command

Definition at line 409 of file Manager.py.

00410                                  :
00411         """Run a single shell command"""
00412         print "[Running: ]", command
00413         if not self.opts.dry_run:
00414             import subprocess
00415             process = subprocess.Popen(command, shell=True)
00416             return process.wait()
00417         return Status.SUCCESS

def Manager::Manager::sequencesInRun (   self,
  runNumber 
)
Get list of sequences in run from Catalog

Definition at line 418 of file Manager.py.

00419                                        :
00420         """Get list of sequences in run from Catalog"""
00421         seqList = []
00422         fileList = self.siteConfig.filesInRun(self.opts.run_number)
00423         for fname in fileList:
00424             fileDesc = FileDescription(fname)
00425             if not fileDesc.isValid:
00426                 print "Skipping invalid file: ",fname
00427                 continue
00428             seqList.append( fileDesc.sequence )
00429         return seqList

def Manager::Manager::processSequence (   self,
  jobName,
  runNumber,
  seqNumber,
  fileName 
)
Process the NuWa job with the given name for one file

Definition at line 430 of file Manager.py.

00431                                                                       :
00432         """Process the NuWa job with the given name for one file"""
00433         # Run one sequence
00434         print "Processing: %s_%07d_%04d (%s)" % (jobName, runNumber,
00435                                                  seqNumber, fileName)
00436         seqState = self.getState( JobType.RUN, jobName, runNumber, seqNumber)
00437         if seqState.state==State.RUN_DONE:
00438             # Skipping processed sequence
00439             print "processSequence: Run %d seq %d already processed." % (runNumber,
00440                                                                          seqNumber)
00441             return Status.SUCCESS
00442         if seqState.state==State.RUN_FAILED:
00443             # Skipping problematic sequence
00444             print "processSequence: Run %d seq %d has problems." % (runNumber,
00445                                                                     seqNumber)
00446             return Status.SUCCESS
00447         self.setState( State.RUN_PROCESSING, JobType.RUN, jobName, runNumber, seqNumber)
00448         requiredDirectories = [ self.siteConfig.logDirectory(runNumber) ]
00449         if self.jobConfig.hasStats:
00450             # Make sure output directory exists
00451             requiredDirectories += [ self.siteConfig.statsDirectory(runNumber) ]
00452         if self.jobConfig.command.find("%(nuwaDataFile)s")>=0:
00453             # Make sure nuwa data directory exists
00454             requiredDirectories += [ self.siteConfig.nuwaDataDirectory(fileName,
00455                                                                     runNumber,
00456                                                                     seqNumber) ]
00457         if self.jobConfig.command.find("%(reconFile)s")>=0:
00458             # Make sure nuwa recon directory exists
00459             requiredDirectories += [ self.siteConfig.reconDirectory(fileName,
00460                                                                     runNumber,
00461                                                                     seqNumber) ]
00462         status = self.ensureDirectories( requiredDirectories )
00463         if status!=Status.SUCCESS:
00464             self.setState( State.RUN_FAILED, JobType.RUN, jobName, runNumber, seqNumber)
00465             return status
00466         command = self.makeJobCommand(jobName, runNumber, seqNumber, fileName)
00467         # Run Command
00468         status = self.runCommand( command )
00469         if status!=Status.SUCCESS:
00470             self.setState( State.RUN_FAILED, JobType.RUN, jobName, runNumber, seqNumber)
00471             return status
00472         self.setState( State.RUN_DONE, JobType.RUN, jobName, runNumber, seqNumber)
00473         return Status.SUCCESS

def Manager::Manager::addSequenceStats (   self,
  jobName,
  runNumber,
  seqNumber 
)
Add the sequence histograms to run total

Definition at line 474 of file Manager.py.

00475                                                              :
00476         """Add the sequence histograms to run total"""
00477         # Add ROOT files
00478         status = self.lockStats(jobName, runNumber)
00479         if status!=Status.SUCCESS:
00480             self.setState( State.STATS_FAILED, JobType.ADDSTATS, jobName, runNumber,
00481                            seqNumber)
00482             return status
00483         currentSequences = []
00484         if seqNumber != None:
00485             # Add only the requested sequence
00486             currentSequences.append(seqNumber)
00487         else:
00488             # Loop over all sequences for this run
00489             currentSequences += self.sequencesInRun(runNumber)
00490         print "Add Sequence Stats: %s_%07d: %s" % (jobName, runNumber,
00491                                                    str(currentSequences))
00492         import os
00493         statsSumFile = self.siteConfig.statsSumFile(jobName, runNumber)
00494         statsTempFile = "%s.tmp.%d" % (statsSumFile, os.getpid())
00495         # Loop over sequences ROOT output, and add to run sum
00496         status=Status.SUCCESS
00497         for sequence in currentSequences:
00498             seqState = self.getState( JobType.ADDSTATS, jobName, runNumber, sequence)
00499             if seqState.state==State.STATS_DONE:
00500                 # Skipping processed sequence
00501                 print "AddStats: Run %d seq %d already added." % (runNumber, sequence)
00502                 continue
00503             if seqState.state==State.STATS_FAILED:
00504                 # Skipping problematic sequence
00505                 print "AddStats: Run %d seq %d has problems." % (runNumber, sequence)
00506                 continue
00507             self.setState( State.STATS_ADDING, JobType.ADDSTATS, jobName, runNumber,
00508                            sequence)
00509             command = None
00510             statsSeqFile = self.siteConfig.statsSeqFile(jobName, runNumber,
00511                                                         sequence)
00512             if os.path.isfile(statsSumFile):
00513                 command = '$RUNDIAGNOSTICSROOT/share/mergeFiles.py -o %(statsTempFile)s %(statsSumFile)s %(statsSeqFile)s' % {
00514                     "statsTempFile":statsTempFile,
00515                     "statsSumFile":statsSumFile,
00516                     "statsSeqFile":statsSeqFile
00517                     }
00518             else:
00519                 command = "cp %s %s" % (statsSeqFile, statsTempFile)
00520             # Run Command
00521             # Set job status to processing
00522             status = self.runCommand( command )
00523             if status!=Status.SUCCESS:
00524                 self.setState( State.STATS_FAILED, JobType.ADDSTATS, jobName, runNumber,
00525                                sequence)
00526                 break
00527             # Move temp file to replace total
00528             command = "mv %s %s" % (statsTempFile, statsSumFile)
00529             status = self.runCommand( command )
00530             if status!=Status.SUCCESS:
00531                 self.setState( State.STATS_FAILED, JobType.ADDSTATS, jobName, runNumber,
00532                                sequence)
00533                 break
00534             self.setState( State.STATS_DONE, JobType.ADDSTATS, jobName, runNumber,
00535                            sequence)
00536         if os.path.isfile(statsTempFile):
00537             command = "rm %s" % (statsTempFile)
00538             self.runCommand( command )
00539         self.unlockStats(jobName, runNumber)
00540         return status

def Manager::Manager::dumpPmt (   self,
  jobName,
  runNumber,
  seqNumber 
)

Dump PMT info to DB

dump pmt info to DB

Definition at line 542 of file Manager.py.

00543                                                     :
00544         """dump pmt info to DB"""
00545         #status = self.lockStats(jobName, runNumber)
00546         #if status!=Status.SUCCESS:
00547         #    self.setState( State.DQDUMP_PMTFAILED, JobType.DQDUMP, jobName, runNumber,
00548         #                   seqNumber)
00549         #    return status
00550         currentSequences = []
00551         if seqNumber != None:
00552             # Add only the requested sequence
00553             currentSequences.append(seqNumber)
00554         else:
00555             # Loop over all sequences for this run
00556             currentSequences += self.sequencesInRun(runNumber)
00557         print "Dump seq pmt DQ info: %s_%07d: %s" % (jobName, runNumber,
00558                                                    str(currentSequences))
00559         import os
00560         # Loop over sequences ROOT output, and add to run sum
00561         status=Status.SUCCESS
00562         for sequence in currentSequences:
00563             seqState = self.getState( JobType.DQDUMP, jobName, runNumber, sequence)
00564             if seqState.state==State.DQDUMP_PMTDONE:
00565                 # Skipping processed sequence
00566                 print "DQDUMP: Run %d seq %d already dumped to DB." % (runNumber, sequence)
00567                 continue
00568             if seqState.state==State.DQDUMP_PMTFAILED:
00569                 # Skipping problematic sequence
00570                 print "DQDUMP: Run %d seq %d has problems." % (runNumber, sequence)
00571                 continue
00572             self.setState( State.DQDUMP_PMTPROCESSING, JobType.DQDUMP, jobName, runNumber,
00573                            sequence)
00574             command = None
00575             statsSeqFile = self.siteConfig.statsSeqFile(jobName, runNumber,
00576                                                         sequence)
00577             if os.path.isfile(statsSeqFile):
00578                 command = '$DQDUMPROOT/share/DumpPmt.py %(statsSeqFile)s' % {
00579                     "statsSeqFile":statsSeqFile
00580                     }
00581             # Run Command
00582             # Set job status to processing
00583             status = self.runCommand( command )
00584             if status!=Status.SUCCESS:
00585                 self.setState( State.DQDUMP_PMTFAILED, JobType.DQDUMP, jobName, runNumber,
00586                                sequence)
00587                 continue 
00588             self.setState( State.DQDUMP_PMTDONE, JobType.DQDUMP, jobName, runNumber,
00589                            sequence)
00590             #self.unlockStats(jobName, runNumber)
00591         return status

def Manager::Manager::dumpTrigger (   self,
  jobName,
  runNumber,
  seqNumber 
)

Dump trigger info to DB

dump trigger info to DB

Definition at line 593 of file Manager.py.

00594                                                         :
00595         """dump trigger info to DB"""
00596         #status = self.lockStats(jobName, runNumber)
00597         #if status!=Status.SUCCESS:
00598         #    self.setState( State.DQDUMP_TRIGGERFAILED, JobType.DQDUMP, jobName, runNumber,
00599         #                   seqNumber)
00600         #    return status
00601         currentSequences = []
00602         if seqNumber != None:
00603             # Add only the requested sequence
00604             currentSequences.append(seqNumber)
00605         else:
00606             # Loop over all sequences for this run
00607             currentSequences += self.sequencesInRun(runNumber)
00608         print "Dump seq pmt DQ info: %s_%07d: %s" % (jobName, runNumber,
00609                                                    str(currentSequences))
00610         import os
00611         # Loop over sequences ROOT output, and add to run sum
00612         status=Status.SUCCESS
00613         for sequence in currentSequences:
00614             seqState = self.getState( JobType.DQDUMP, jobName, runNumber, sequence)
00615             if seqState.state==State.DQDUMP_TRIGGERDONE:
00616                 # Skipping processed sequence
00617                 print "DQDUMP: Run %d seq %d already dumped to DB." % (runNumber, sequence)
00618                 continue
00619             if seqState.state==State.DQDUMP_TRIGGERFAILED:
00620                 # Skipping problematic sequence
00621                 print "DQDUMP: Run %d seq %d has problems." % (runNumber, sequence)
00622                 continue
00623             self.setState( State.DQDUMP_TRIGGERPROCESSING, JobType.DQDUMP, jobName, runNumber,
00624                            sequence)
00625             command = None
00626             statsSeqFile = self.siteConfig.statsSeqFile(jobName, runNumber,
00627                                                         sequence)
00628             if os.path.isfile(statsSeqFile):
00629                 command = '$DQDUMPROOT/share/DumpTriggerInfo.py %(statsSeqFile)s' % {
00630                     "statsSeqFile":statsSeqFile
00631                     }
00632             # Run Command
00633             # Set job status to processing
00634             status = self.runCommand( command )
00635             if status!=Status.SUCCESS:
00636                 self.setState( State.DQDUMP_TRIGGERFAILED, JobType.DQDUMP, jobName, runNumber,
00637                                sequence)
00638                 continue
00639             self.setState( State.DQDUMP_TRIGGERDONE, JobType.DQDUMP, jobName, runNumber,
00640                            sequence)
00641             #self.unlockStats(jobName, runNumber)
00642         return status
   
def Manager::Manager::dumpLivetime (   self,
  jobName,
  fileName,
  runNumber,
  seqNumber 
)

Dump livetime info to DB

dump livetime info to DB

Definition at line 644 of file Manager.py.

00645                                                                    :
00646         """dump livetime info to DB"""
00647         #status = self.lockStats(jobName, runNumber)
00648         #if status!=Status.SUCCESS:
00649         #    self.setState( State.DQDUMP_TRIGGERFAILED, JobType.DQDUMP, jobName, runNumber,
00650         #                   seqNumber)
00651         #    return status
00652         currentSequences = []
00653         if seqNumber != None:
00654             # Add only the requested sequence
00655             currentSequences.append(seqNumber)
00656         else:
00657             # Loop over all sequences for this run
00658             currentSequences += self.sequencesInRun(runNumber)
00659         print "Dump seq DQ info: %s_%07d: %s" % (jobName, runNumber,
00660                                                    str(currentSequences))
00661         import os
00662         # Loop over sequences ROOT output, and add to run sum
00663         status=Status.SUCCESS
00664         for sequence in currentSequences:
00665             seqState = self.getState( JobType.DQDUMP, jobName, runNumber, sequence)
00666             if seqState.state==State.DQDUMP_DONE:
00667                 # Skipping processed sequence
00668                 print "DQDUMP: Run %d seq %d already dumped to DB." % (runNumber, sequence)
00669                 continue
00670             if seqState.state==State.DQDUMP_FAILED:
00671                 # Skipping problematic sequence
00672                 print "DQDUMP: Run %d seq %d has problems." % (runNumber, sequence)
00673                 continue
00674             self.setState( State.DQDUMP_PROCESSING, JobType.DQDUMP, jobName, runNumber,
00675                            sequence)
00676             command = None
00677             reconFile = self.siteConfig.reconFile(jobName, fileName, runNumber, sequence)
00678             print reconFile
00679             if os.path.isfile(reconFile):
00680                 command = '$DQDUMPROOT/share/DumpDqLivetime.py %(reconFile)s' % {
00681                     "reconFile":reconFile
00682                     }
00683             # Run Command
00684             # Set job status to processing
00685             status = self.runCommand( command )
00686             if status!=Status.SUCCESS:
00687                 self.setState( State.DQDUMP_FAILED, JobType.DQDUMP, jobName, runNumber,
00688                                sequence)
00689                 continue 
00690             self.setState( State.DQDUMP_DONE, JobType.DQDUMP, jobName, runNumber,
00691                            sequence)
00692             #self.unlockStats(jobName, runNumber)
00693         return status
    
def Manager::Manager::dumpDqAll (   self,
  jobName,
  runNumber,
  seqNumber 
)

Dump DQ info to DB

Dump DQ info to DB (runs DumpDqAll.py on each sequence's stats file)

Definition at line 695 of file Manager.py.

00696                                                       :
00697         """dump trigger info to DB"""
00698         #status = self.lockStats(jobName, runNumber)
00699         #if status!=Status.SUCCESS:
00700         #    self.setState( State.DQDUMP_TRIGGERFAILED, JobType.DQDUMP, jobName, runNumber,
00701         #                   seqNumber)
00702         #    return status
00703         currentSequences = []
00704         if seqNumber != None:
00705             # Add only the requested sequence
00706             currentSequences.append(seqNumber)
00707         else:
00708             # Loop over all sequences for this run
00709             currentSequences += self.sequencesInRun(runNumber)
00710         print "Dump seq DQ info: %s_%07d: %s" % (jobName, runNumber,
00711                                                    str(currentSequences))
00712         import os
00713         # Loop over sequences ROOT output, and add to run sum
00714         status=Status.SUCCESS
00715         for sequence in currentSequences:
00716             seqState = self.getState( JobType.DQDUMP, jobName, runNumber, sequence)
00717             if seqState.state==State.DQDUMP_DONE:
00718                 # Skipping processed sequence
00719                 print "DQDUMP: Run %d seq %d already dumped to DB." % (runNumber, sequence)
00720                 continue
00721             if seqState.state==State.DQDUMP_FAILED:
00722                 # Skipping problematic sequence
00723                 print "DQDUMP: Run %d seq %d has problems." % (runNumber, sequence)
00724                 continue
00725             self.setState( State.DQDUMP_PROCESSING, JobType.DQDUMP, jobName, runNumber,
00726                            sequence)
00727             command = None
00728             statsSeqFile = self.siteConfig.statsSeqFile(jobName, runNumber,
00729                                                         sequence)
00730             if os.path.isfile(statsSeqFile):
00731                 command = '$DQDUMPROOT/share/DumpDqAll.py %(statsSeqFile)s' % {
00732                     "statsSeqFile":statsSeqFile
00733                     }
00734             # Run Command
00735             # Set job status to processing
00736             status = self.runCommand( command )
00737             if status!=Status.SUCCESS:
00738                 self.setState( State.DQDUMP_FAILED, JobType.DQDUMP, jobName, runNumber,
00739                                sequence)
00740                 continue 
00741             self.setState( State.DQDUMP_DONE, JobType.DQDUMP, jobName, runNumber,
00742                            sequence)
00743             #self.unlockStats(jobName, runNumber)
00744         return status
   
def Manager::Manager::dumpRate (   self,
  jobName,
  runNumber 
)

Run another script (DumpRate.py) to get the rate from the run's stats summary file (statsSumFile)

calculate event rate for this run

Definition at line 746 of file Manager.py.

00747                                           :
00748         """calculate event rate for this run"""
00749         #status = self.lockStats(jobName, runNumber)
00750         #if status!=Status.SUCCESS:
00751         #    self.setState( State.DQDUMP_RATEFAILED, JobType.DQDUMP, jobName, runNumber )
00752         #    return status
00753         statsSumFile = self.siteConfig.statsSumFile(jobName, runNumber)
00754         import os
00755         command = None
00756         if os.path.isfile(statsSumFile):
00757             command = "$DQDUMPROOT/share/DumpRate.py %s" % statsSumFile
00758         status = self.runCommand( command )
00759         if status != Status.SUCCESS:
00760             self.setState( State.DQDUMP_RATEFAILED, JobType.DQDUMP, jobName, runNumber )
00761             return status
00762         self.setState( State.DQDUMP_RATEDONE, JobType.DQDUMP, jobName, runNumber )
00763         #self.unlockStats(jobName, runNumber)
        return status
def Manager::Manager::generateFigures (   self,
  jobName,
  runNumber 
)
Generate figures for this run

Definition at line 766 of file Manager.py.

00767                                                  :
00768         """Generate figures for this run"""
00769         print "Making figures: %s_%07d" % (jobName, runNumber)
00770         if self.opts.add_stats:
00771             # Add a short break to allow other add-stats jobs to proceed
00772             import time
00773             time.sleep(10)
00774         requiredDirectories = [ self.siteConfig.figuresDirectory(runNumber) ]
00775         status = self.ensureDirectories( requiredDirectories )
00776         if status!=Status.SUCCESS:
00777             return status
00778         if self.statsLocked(jobName, runNumber):
00779             # Postpone figure generation for a later time
00780             print "Postponing figure generation"
00781             return Status.SUCCESS
00782         status = self.lockStats(jobName, runNumber)
00783         if status!=Status.SUCCESS:
00784             return status
00785         self.setState( State.SUMMARY_PRINTING, JobType.SUMMARY, jobName, runNumber)
00786         statsSumFile = self.siteConfig.statsSumFile(jobName, runNumber)
00787         figuresDir = self.siteConfig.figuresDirectory(runNumber)
00788         command = "$RUNDIAGNOSTICSROOT/share/figures.py -o %(figuresDir)s -r stats/diagnostics/run_%(runNumber)07d %(statsSumFile)s" % {
00789             "figuresDir":figuresDir,
00790             "runNumber":runNumber,
00791             "statsSumFile":statsSumFile
00792             }
00793         # Run Command
00794         status = self.runCommand( command )
00795         if status!=Status.SUCCESS:
00796             self.setState( State.SUMMARY_FAILED_PRINTING, JobType.SUMMARY,
00797                            jobName, runNumber)
00798             status = self.unlockStats(jobName, runNumber)
00799             return status
00800         self.setState( State.SUMMARY_PRINTED, JobType.SUMMARY, jobName, runNumber)
00801         status = self.unlockStats(jobName, runNumber)
00802         return status

def Manager::Manager::updateRunIndex (   self,
  jobName,
  runNumber 
)
Update/Create XML index for this run, with results from Job Name

Definition at line 803 of file Manager.py.

00804                                                 :
00805         """Update/Create XML index for this run, with results from Job Name"""
00806         import os
00807         print "Updating Run XML: %s_%07d" % (jobName, runNumber)
00808         requiredDirectories = [ self.siteConfig.xmlDirectory(runNumber) ]
00809         status = self.ensureDirectories( requiredDirectories )
00810         if status!=Status.SUCCESS:
00811             return status        
00812         if self.statsLocked(jobName, runNumber) or self.runLocked(runNumber):
00813             # Postpone run summary for a later time
00814             print "Postponing update of run index"
00815             return Status.SUCCESS
00816         status = self.lockStats(jobName, runNumber)
00817         if status!=Status.SUCCESS:
00818             return status
00819         status = self.lockRun(runNumber)
00820         if status!=Status.SUCCESS:
00821             self.unlockStats(jobName, runNumber)
00822             return status
00823         statsSumFile = self.siteConfig.statsSumFile(jobName, runNumber)
00824         statsInstallPath = self.siteConfig.statsInstallPath(jobName, runNumber)
00825         xmlIndexFile = self.siteConfig.xmlIndexFile(runNumber)
00826         indexTempFile = "%s.tmp.%d" % (xmlIndexFile, os.getpid())
00827         figPathReplace = self.siteConfig.figuresPathReplace(runNumber)
00828         command = "$RUNDIAGNOSTICSROOT/share/xmlIndex.py -f %(statsSumFile)s -o %(indexTempFile)s -p %(statsInstallPath)s -r %(figPathReplace)s" % {
00829             "statsSumFile":statsSumFile,
00830             "indexTempFile":indexTempFile,
00831             "statsInstallPath":statsInstallPath,
00832             "figPathReplace":figPathReplace
00833             }
00834         # Run Command
00835         self.setState( State.SUMMARY_INDEXING, JobType.SUMMARY, jobName, runNumber)
00836         status = self.runCommand( command )
00837         if status!=Status.SUCCESS:
00838             self.setState( State.SUMMARY_FAILED_INDEXING, JobType.SUMMARY,
00839                            jobName, runNumber)
00840             self.unlockStats(jobName, runNumber)
00841             self.unlockRun(runNumber)
00842             return status
00843         import os
00844         if os.path.isfile(xmlIndexFile):
00845             # Merge new results into current index
00846             mergedFile = "%s.merge" % indexTempFile
00847             command = "$RUNDIAGNOSTICSROOT/share/xmlIndex.py -o %(mergedFile)s -i %(xmlIndexFile)s -u %(indexTempFile)s" % {
00848             "mergedFile":mergedFile,
00849             "xmlIndexFile":xmlIndexFile,
00850             "indexTempFile":indexTempFile
00851             }
00852             status = self.runCommand( command )
00853             if status!=Status.SUCCESS:
00854                 self.setState( State.SUMMARY_FAILED_INDEXING, JobType.SUMMARY,
00855                                jobName, runNumber)
00856                 self.unlockStats(jobName, runNumber)
00857                 self.unlockRun(runNumber)
00858                 return status
00859             # Move temp file to replace index
00860             command = "mv %s %s" % (mergedFile, indexTempFile)
00861             status = self.runCommand( command )
00862             if status!=Status.SUCCESS:
00863                 self.setState( State.SUMMARY_FAILED_INDEXING, JobType.SUMMARY,
00864                                jobName, runNumber)
00865                 self.unlockStats(jobName, runNumber)
00866                 self.unlockRun(runNumber)
00867                 return status
00868         # Move temp file to replace index
00869         command = "mv %s %s" % (indexTempFile, xmlIndexFile)
00870         status = self.runCommand( command )
00871         if status!=Status.SUCCESS:
00872             self.setState( State.SUMMARY_FAILED_INDEXING, JobType.SUMMARY,
00873                            jobName, runNumber)
00874             self.unlockStats(jobName, runNumber)
00875             self.unlockRun(runNumber)
00876             return status        
00877         self.setState( State.SUMMARY_DONE, JobType.SUMMARY, jobName, runNumber)
00878         self.unlockStats(jobName, runNumber)
00879         self.unlockRun(runNumber)
00880         return Status.SUCCESS

def Manager::Manager::updateFullIndex (   self,
  runNumber 
)
Make sure this run is present/added to index of all runs

Definition at line 881 of file Manager.py.

00882                                         :
00883         """Make sure this run is present/added to index of all runs"""
00884         # Add/Verify Run is in index of all runs
00885         import os
00886         print "Updating Run List XML: %07d" % (runNumber)
00887         status = self.lockRunIndex()
00888         if status!=Status.SUCCESS:
00889             self.unlockRun(runNumber)
00890             return status
00891         xmlIndexFile = self.siteConfig.xmlIndexFile(runNumber)
00892         mainIndexFile = self.siteConfig.runIndexFile()
00893         mainFileDir = self.siteConfig.runIndexDir()
00894         mainTempFile = "%s.tmp.%d" % (mainIndexFile, os.getpid())
00895         xmlRelativeDir = self.siteConfig.xmlRelativeDir(runNumber)
00896         requiredFiles = ['runs.xml','index.html','runs.css','runs.js']
00897         for filename in requiredFiles:
00898             fullFilePath = mainFileDir+ '/' + filename
00899             if not os.path.isfile(fullFilePath):
00900                 # Make main index file
00901                 command = "cp $RUNDIAGNOSTICSROOT/output/%s %s" % (
00902                     filename,
00903                     fullFilePath
00904                     )
00905                 status = self.runCommand( command )
00906                 if status!=Status.SUCCESS:
00907                     self.unlockRunIndex()
00908                     return status
00909         # Run Command
00910         command = "$RUNDIAGNOSTICSROOT/share/xmlCollect.py -o %(mainTempFile)s -p %(xmlRelativeDir)s %(xmlIndexFile)s  %(mainIndexFile)s" % {
00911             "mainTempFile":mainTempFile,
00912             "xmlIndexFile":xmlIndexFile,
00913             "mainIndexFile":mainIndexFile,
00914             "xmlRelativeDir":xmlRelativeDir
00915             }
00916         status = self.runCommand( command )
00917         if status!=Status.SUCCESS:
00918             self.unlockRunIndex()
00919             return status
00920         command = "mv %s %s" % (mainTempFile, mainIndexFile)
00921         status = self.runCommand( command )
00922         if status!=Status.SUCCESS:
00923             self.unlockRunIndex()
00924             return status
00925         self.unlockRunIndex()
00926         return Status.SUCCESS

def Manager::Manager::clearSequence (   self,
  jobName,
  runNumber,
  seqNumber 
)
Clear the job result and state to allow reprocessing

Definition at line 927 of file Manager.py.

00928                                                           :
00929         """Clear the job result and state to allow reprocessing"""
00930         import os
00931         statsSeqFile = self.siteConfig.statsSeqFile(jobName, runNumber,
00932                                                     seqNumber)
00933         if os.path.isfile(statsSeqFile):
00934             command = "rm -f %s" % statsSeqFile
00935             status = self.runCommand( command )
00936             if status!=Status.SUCCESS:
00937                 self.setState( State.STATS_FAILED, JobType.ADDSTATS, jobName,
00938                               runNumber, seqNumber) 
00939                 return status
00940         return self.setState( State.RUN_READY, JobType.RUN, jobName, runNumber,
00941                               seqNumber) 

def Manager::Manager::clearSequenceStats (   self,
  jobName,
  runNumber,
  seqNumber 
)
Clear the job state to allow reprocessing

Definition at line 942 of file Manager.py.

00943                                                                :
00944         """Clear the job state to allow reprocessing"""
00945         return self.setState( State.STATS_READY, JobType.ADDSTATS, jobName,
00946                               runNumber, seqNumber) 

def Manager::Manager::clearSequenceDumpPmt (   self,
  jobName,
  runNumber,
  seqNumber 
)
Clear the DQ dump pmt job state to allow reprocessing

Definition at line 947 of file Manager.py.

00948                                                                  :
00949         """Clear the DQ dump pmt job state to allow reprocessing"""
00950         return self.setState( State.DQDUMP_READY, JobType.DQDUMP, jobName,
00951                               runNumber, seqNumber)

def Manager::Manager::clearSequenceDumpTrigger (   self,
  jobName,
  runNumber,
  seqNumber 
)
Clear the DQ dump trigger job state to allow reprocessing

Definition at line 952 of file Manager.py.

00953                                                                      :
00954         """Clear the DQ dump trigger job state to allow reprocessing"""
00955         return self.setState( State.DQDUMP_READY, JobType.DQDUMP, jobName,
00956                               runNumber, seqNumber)

def Manager::Manager::clearSequenceDumpLivetime (   self,
  jobName,
  runNumber,
  seqNumber 
)
Clear the DQ dump livetime job state to allow reprocessing

Definition at line 957 of file Manager.py.

00958                                                                       :
00959         """Clear the DQ dump livetime job state to allow reprocessing"""
00960         return self.setState( State.DQDUMP_READY, JobType.DQDUMP, jobName,
00961                               runNumber, seqNumber)

def Manager::Manager::clearSequenceDumpDqAll (   self,
  jobName,
  runNumber,
  seqNumber 
)
Clear the DQ dump (DqAll) job state to allow reprocessing

Definition at line 962 of file Manager.py.

00963                                                                    :
00964         """Clear the DQ dump trigger job state to allow reprocessing"""
00965         return self.setState( State.DQDUMP_READY, JobType.DQDUMP, jobName,
00966                               runNumber, seqNumber)

def Manager::Manager::clearSummary (   self,
  jobName,
  runNumber 
)
Clear the job summary and state to allow reprocessing

Definition at line 967 of file Manager.py.

00968                                               :
00969         """Clear the job summary and state to allow reprocessing"""
00970         import os
00971         statsSumFile = self.siteConfig.statsSumFile(jobName, runNumber)
00972         if os.path.isfile(statsSumFile):
00973             command = "rm -f %s" % statsSumFile
00974             status = self.runCommand( command )
00975             if status!=Status.SUCCESS:
00976                 return status
00977         return self.setState( State.SUMMARY_READY, JobType.ADDSTATS, jobName,
00978                               runNumber)

def Manager::Manager::printState (   self,
  jobName,
  runNumber,
  seqNumber 
)
Print State information for this job/run/sequence

Definition at line 979 of file Manager.py.

00980                                                        :
00981         """Print State information for this job/run/sequence"""
00982         summaryState = self.getState(JobType.SUMMARY, jobName, runNumber)
00983         print "Run %07d: %s" % (runNumber, State.getName(summaryState.state))
00984         currentSequences = []
00985         if seqNumber != None:
00986             # Add only the requested sequence
00987             currentSequences.append(seqNumber)
00988         else:
00989             # Loop over all sequences for this run
00990             currentSequences += self.sequencesInRun(runNumber)
00991         for sequence in currentSequences:
00992             runState = self.getState(JobType.RUN, jobName, runNumber, sequence)
00993             statsState = self.getState(JobType.ADDSTATS, jobName, runNumber,
00994                                        sequence)
00995             print " Seq %04d: %s %s" % (sequence,
00996                                         State.getName(runState.state),
00997                                         State.getName(statsState.state)) 
00998         return Status.SUCCESS

def Manager::Manager::resolveRunSequenceFile (   self,
  runNumber,
  seqNumber,
  fileName 
)
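Resolve the Run Number, Sequence Number, and Filename,
given a subset of the information. Returns the list
[status, runNumber, seqNumber, fileName]; on failure this is
[Status.FAILURE, None, None, None].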

Definition at line 999 of file Manager.py.

01000                                                                     :
01001         """Resolve the Run Number, Sequence Number, and Filename,
01002         given a subset of the information.
01003         """
01004         badResolve = [Status.FAILURE, None, None, None]
01005         if fileName!=None:
01006             # Check if filename is consistent with supplied run/seq numbers
01007             fileDesc = FileDescription(fileName)
01008             if not fileDesc.isValid:
01009                 print "Invalid file: ",fileName
01010                 return badResolve
01011             if runNumber!=None and runNumber!=fileDesc.runNumber:
01012                 print "Error: runNumber %d invalid for file %s"
01013                 return badResolve
01014             if seqNumber!=None and seqNumber!=fileDesc.sequence:
01015                 print "Error: runNumber %d invalid for file %s"
01016                 return badResolve
01017             return [Status.SUCCESS, fileDesc.runNumber,
01018                     fileDesc.sequence, fileName] 
01019         else:
01020             # Lookup filename from catalog
01021             if runNumber==None or seqNumber==None:
01022                 print "Error: Either Filename or Run/Seq must be specified."
01023                 return badResolve
01024             fileList = self.siteConfig.filesInRun(self.opts.run_number)
01025             for fname in fileList:
01026                 fileDesc = FileDescription(fname)
01027                 if not fileDesc.isValid:
01028                     print "Skipping invalid file from Catalog: ",fname
01029                     continue
01030                 if fileDesc.sequence==seqNumber:
01031                     # Found a file match for run/seq number
01032                     return [Status.SUCCESS, runNumber, seqNumber, fname]
01033         return badResolve

def Manager::Manager::makeJobCommand (   self,
  jobName,
  runNumber,
  seqNumber,
  fileName 
)
Make the command to be run for this job 

Definition at line 1034 of file Manager.py.

01035                                                                      :
01036         """ Make the command to be run for this job """
01037         # Prepare Job Command
01038         commandArgs = {}
01039         commandArgs["dataFile"] = fileName
01040         commandArgs["outLog"] = self.siteConfig.logOutFile(jobName,
01041                                                            runNumber,
01042                                                            seqNumber)
01043         commandArgs["errLog"] = self.siteConfig.logErrFile(jobName,
01044                                                            runNumber,
01045                                                            seqNumber)
01046         commandArgs["nuwaDataFile"] = self.siteConfig.nuwaDataFile(fileName,
01047                                                                    runNumber,
01048                                                                    seqNumber)
01049         commandArgs["reconFile"] = self.siteConfig.reconFile(jobName,
01050                                                              fileName,
01051                                                              runNumber,
01052                                                              seqNumber)
01053         statsFile = None
01054         if self.jobConfig.hasStats:
01055             statsFile = self.siteConfig.statsSeqFile(jobName, runNumber,
01056                                                      seqNumber)
01057             commandArgs["statsFile"] = statsFile
01058         command = self.jobConfig.command % commandArgs
01059         return command

def Manager::Manager::ensureDirectories (   self,
  directories 
)
Check if directories exist, and make if necessary

Definition at line 1060 of file Manager.py.

01061                                             :
01062         """Check if directories exist, and make if necessary"""
01063         import os
01064         for directory in directories:
01065             if not os.path.exists(directory):
01066                 command = "mkdir -p %s" % directory
01067                 status = self.runCommand( command )
01068                 if status!=Status.SUCCESS:
01069                     return status
01070         return Status.SUCCESS        

def Manager::Manager::setLock (   self,
  lockFile 
)

Functions for handling job locks.

Request a lockfile, and wait for lock 

Definition at line 1073 of file Manager.py.

01074                                :
01075         """ Request a lockfile, and wait for lock """
01076         command = "lockfile %s" % lockFile
01077         return self.runCommand(command)

def Manager::Manager::isLocked (   self,
  lockFile 
)
Check whether a lockfile exists 

Definition at line 1078 of file Manager.py.

01079                                 :
01080         """ Check whether a lockfile exists """
01081         import os
01082         return os.path.isfile(lockFile)

def Manager::Manager::removeLock (   self,
  lockFile 
)
Remove a lockfile 

Definition at line 1083 of file Manager.py.

01084                                   :
01085         """ Remove a lockfile """
01086         command = "rm -f %s" % lockFile
01087         return self.runCommand(command)

def Manager::Manager::lockStats (   self,
  jobName,
  runNumber 
)
Lock statistics 

Definition at line 1088 of file Manager.py.

01089                                            :
01090         """ Lock statistics """
01091         statsLockFile = self.siteConfig.statsLockFile(jobName, runNumber)
01092         return self.setLock( statsLockFile )

def Manager::Manager::statsLocked (   self,
  jobName,
  runNumber 
)
Are statistics locked? 

Definition at line 1093 of file Manager.py.

01094                                              :
01095         """ Are statistics locked? """
01096         statsLockFile = self.siteConfig.statsLockFile(jobName, runNumber)
01097         return self.isLocked( statsLockFile )

def Manager::Manager::unlockStats (   self,
  jobName,
  runNumber 
)
Unlock statistics 

Definition at line 1098 of file Manager.py.

01099                                              :
01100         """ Unlock statistics """
01101         statsLockFile = self.siteConfig.statsLockFile(jobName, runNumber)
01102         return self.removeLock( statsLockFile )

def Manager::Manager::lockRun (   self,
  runNumber 
)
Lock run summary 

Definition at line 1103 of file Manager.py.

01104                                 :
01105         """ Lock run summary """
01106         runLockFile = self.siteConfig.runLockFile(runNumber)
01107         return self.setLock( runLockFile )

def Manager::Manager::runLocked (   self,
  runNumber 
)
Is run summary locked? 

Definition at line 1108 of file Manager.py.

01109                                   :
01110         """ Is run summary locked? """
01111         runLockFile = self.siteConfig.runLockFile(runNumber)
01112         return self.isLocked( runLockFile )

def Manager::Manager::unlockRun (   self,
  runNumber 
)
Unlock run summary 

Definition at line 1113 of file Manager.py.

01114                                   :
01115         """ Unlock run summary """
01116         runLockFile = self.siteConfig.runLockFile(runNumber)
01117         return self.removeLock( runLockFile )

def Manager::Manager::lockRunIndex (   self)
Lock index of runs 

Definition at line 1118 of file Manager.py.

01119                           :
01120         """ Lock index of runs """
01121         lockFile = self.siteConfig.runIndexLockFile()
01122         return self.setLock( lockFile )

def Manager::Manager::runIndexLocked (   self)
Is index of runs locked? 

Definition at line 1123 of file Manager.py.

01124                             :
01125         """ Is index of runs locked? """
01126         lockFile = self.siteConfig.runIndexLockFile()
01127         return self.isLocked( lockFile )
    
def Manager::Manager::unlockRunIndex (   self)
Unlock index of runs 

Definition at line 1128 of file Manager.py.

01129                             :
01130         """ Unlock index of runs """
01131         lockFile = self.siteConfig.runIndexLockFile()
01132         return self.removeLock( lockFile )

def Manager::Manager::getState (   self,
  jobType,
  jobName,
  runNumber,
  seqNumber = None 
)

Functions for handling job state.

Check the state of a job 

Definition at line 1135 of file Manager.py.

01136                                                                    :
01137         """ Check the state of a job """
01138         import os
01139         stateFile = self.siteConfig.stateFile(jobType, jobName, runNumber, seqNumber)
01140         if not os.path.isfile( stateFile ):
01141             # Job is unknown
01142             return JobState()
01143         return self.getStateFromFile( stateFile )

def Manager::Manager::setState (   self,
  newState,
  jobType,
  jobName,
  runNumber,
  seqNumber = None 
)
Set the state of a job 

Definition at line 1144 of file Manager.py.

01145                                                                              :
01146         """ Set the state of a job """
01147         stateFile = self.siteConfig.stateFile(jobType, jobName, runNumber, seqNumber)
01148         return self.setStateInFile(stateFile, newState)

def Manager::Manager::setStateInFile (   self,
  stateFile,
  newState 
)
Set the job state in the given state file

Definition at line 1149 of file Manager.py.

01150                                                  :
01151         """Set the job state in the given state file"""
01152         stateName = State.getName( newState )
01153         import time, datetime
01154         stateTimeStr = time.ctime() # Note: this is localtime
01155         stateLogStr = self.stateSep.join([stateName, stateTimeStr])
01156         import os
01157         status = self.ensureDirectories( [os.path.dirname( stateFile )] )
01158         if status != Status.SUCCESS:
01159             return status
01160         command = "echo '%s' >> %s" % (stateLogStr, stateFile)
01161         return self.runCommand( command )

def Manager::Manager::getStateFromFile (   self,
  stateFile 
)
Get state from file (Internal function) 

Definition at line 1162 of file Manager.py.

01163                                          :
01164         """ Get state from file (Internal function) """
01165         lastLine = self.lastLine(stateFile)
01166         if len(lastLine)==0:
01167             return JobState()
01168         stateParts = lastLine.split(self.stateSep)
01169         if len(stateParts) != self.stateNParts:
01170             return JobState()
01171         # State
01172         stateStr = stateParts[0].strip()
01173         state = State.getStateByName(stateStr)
01174         # State time
01175         stateTimeStr = stateParts[1].strip()
01176         import time
01177         ttuple = time.strptime( stateTimeStr )
01178         stateTime = time.mktime( ttuple )
01179         return JobState(state, stateTime)

def Manager::Manager::lastLine (   self,
  filename 
)
Get the last line from a file

Definition at line 1180 of file Manager.py.

01181                                 :
01182         """Get the last line from a file"""
01183         import subprocess
01184         tailProc = subprocess.Popen("tail -n 1 %s" % filename,
01185                                     stdout=subprocess.PIPE,
01186                                     shell=True)
01187         (out, err) = tailProc.communicate()
01188         status = tailProc.wait()
01189         if status!=Status.SUCCESS or type(out)!=str:
01190             return ""
01191         return out.strip()
01192     

Member Data Documentation

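Manager::Manager::jobConfig

Definition at line 16 of file Manager.py.

Manager::Manager::siteConfig

Definition at line 16 of file Manager.py.

Manager::Manager::stateSep

Definition at line 16 of file Manager.py.

Manager::Manager::stateNParts

Definition at line 16 of file Manager.py.

Manager::Manager::opts_parser

Definition at line 25 of file Manager.py.

Manager::Manager::argv

Definition at line 128 of file Manager.py.

Manager::Manager::opts

Definition at line 128 of file Manager.py.

Manager::Manager::args

Definition at line 128 of file Manager.py.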


The documentation for this class was generated from the following file: Manager.py
| Classes | Job Modules | Data Objects | Services | Algorithms | Tools | Packages | Directories | Tracs |

Generated on Fri May 16 2014 09:54:04 for ProcessManager by doxygen 1.7.4