
fileinfo.py
#!/usr/bin/env python
import logging, time
from datetime import datetime, timedelta
from baseinfo import BaseInfo
log = logging.getLogger(__name__)

class DaqFiles(dict):
    def __init__(self, source, cfg=None):
        """
        Queries ``SFO_TZ_FILE`` to find files with records that have not yet been propagated offline
        """
        kstf = source.kls_("SFO_TZ_FILE")     ## mapped class for the SFO_TZ_FILE table
        def _kstf__repr__(self):
            return "%s %s %s %s" % (self.__class__.__name__, self.RUNNR, self.OPENTIME, self.LFN)
        kstf.__repr__ = _kstf__repr__

        self.source = source
        self.cfg = cfg if cfg is not None else {}     ## avoid a shared mutable default
        self.kstf = kstf

        daq = self.source()     ## single session, closed over by the query helpers below
        self.qdaq  = lambda *qwn: daq.query(*qwn).order_by(kstf.OPENTIME).filter(kstf.FILESTATE == 'CLOSED')
        self.qdaqr = lambda *qwn: daq.query(*qwn).order_by(kstf.OPENTIME.desc()).filter(kstf.FILESTATE == 'CLOSED')
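        ## e.g. self.qdaq(kstf.RUNNR) builds a query over run numbers of CLOSED files
        ## in ascending OPENTIME order; qdaqr is the descending counterpart used for
        ## "last entry" lookups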

    def range(self):
        """
        DAQ query recording the `first` and `last` closed entries, in OPENTIME order
        """
        self.first = self._first()
        self.last  = self._last()
        log.info("first %s " % self.first)
        log.info("last  %s " % self.last)

    def runs(self, startlocal):
        """
        :param startlocal: local datetime after which we are interested in DAQ files

        Return the distinct ``RUNNR`` of files closed after `startlocal`, in ``OPENTIME`` order
        """
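        ## roughly the emitted SQL (an illustrative sketch, not the literal statement):
        ##   SELECT DISTINCT RUNNR FROM SFO_TZ_FILE
        ##    WHERE FILESTATE = 'CLOSED' AND OPENTIME > :startlocal
        ##    ORDER BY OPENTIME LIMIT :maxFilesPerRun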
        kstf = self.kstf
        maxFilesPerRun = self.cfg.get('maxFilesPerRun', 1000)
        q = self.qdaq(kstf.RUNNR).filter(kstf.OPENTIME > startlocal).distinct()
        daqruns = [run for run, in q.limit(maxFilesPerRun).all()]
        log.info("after %s yields %s daqruns " % (startlocal, len(daqruns)))
        return daqruns

    def _first(self):
        return self.qdaq(self.kstf).first()

    def _last(self):
        return self.qdaqr(self.kstf).first()

    def newfiles(self, startlocal, offnames=None):
        """
        :param startlocal: local datetime after which we are interested in DAQ files
        :param offnames: list of LFN of already collected files
        """
        kstf = self.kstf
        maxFilesPerRun = self.cfg.get('maxFilesPerRun', 1000)
        q = self.qdaq(kstf).filter(kstf.OPENTIME > startlocal)
        if offnames:
            daqfiles = q.filter(~kstf.LFN.in_(offnames)).limit(maxFilesPerRun).all()    ## NOT IN, excluding already collected names
        else:
            daqfiles = q.limit(maxFilesPerRun).all()
        return daqfiles


class OffFiles(dict):
    def __init__(self, target, cfg):
        self.target = target
        self.cfg = cfg
        self.kdrdfi = self.target.dbikls_("DaqRawDataFileInfo")     ## mapped class for the offline DBI pair

        def kdrdfi__repr__(self):
            return "%s %s %s %s %s %s" % (self.__class__.__name__, self.runNo, self.fileName, self.TIMESTART, self.TIMEEND, self.SEQNO)
        self.kdrdfi.__repr__ = kdrdfi__repr__
        self.kdrdfi.localstart = property(lambda self: self.TIMESTART - timedelta(seconds=time.timezone))   ## local = TIMESTART - time.timezone
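        ## e.g. assuming a site at UTC+8, time.timezone == -28800, so a TIMESTART
        ## of 2011-10-18 02:03:47 yields localstart 2011-10-18 10:03:47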

    def last(self):
        """
        Accesses the highest SEQNO entry from the offline DBI pair `DaqRawDataFileInfo`
        and records it under the 'last' key

        #    self.update(runNo=last.runNo,fileNo=last.fileNo,fileName=last.fileName,timeStart=last.TIMESTART)
        """
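        ## roughly: SELECT * FROM DaqRawDataFileInfo ORDER BY SEQNO DESC LIMIT 1  (illustrative)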
        off = self.target()
        kdrdfi = self.kdrdfi

        last = off.query(kdrdfi).order_by(kdrdfi.SEQNO.desc()).first()
        self['last'] = last     ## a dict entry; "self.last = last" would clobber this method
        return last

    def offnames(self, runs=None):
        """
        :param runs: list of runs

        Return the offline filenames corresponding to those runs
        """
        off = self.target()
        kdrdfi = self.kdrdfi
        if not runs:
            offnames = []     ## avoid issuing an empty IN () query
        else:
            offnames = [fileName for fileName, in off.query(kdrdfi.fileName).filter(kdrdfi.runNo.in_(runs)).all()]
        return offnames


class FileInfo(BaseInfo):
    def __init__(self, source="tmp_daqdb", target="tmp_offline_db", cfg=None):
        cfg = cfg if cfg is not None else {}     ## avoid a shared mutable default
        BaseInfo.__init__(self, source, target, cfg)
        self.daqf = DaqFiles(self.source, cfg)
        self.offf = OffFiles(self.target, cfg)

    def __repr__(self):
        return BaseInfo.__repr__(self)

    def last(self):
        return self.offf.last()

    def after(self, startlocal, offset_seconds=None):
        """
        Queries daq table SFO_TZ_FILE for closed files that were opened after cutoff time `startlocal`
        and that are not already in offline table DaqRawDataFileInfo

        Former ``N+1`` nested approach ``get_newFileList``:

        * iterated over DAQ file entries, doing offline queries at each iteration to check for preexisting file names

        New flat ``3`` query approach (sketched as illustrative SQL in the comments below):

        #. runs corresponding to new DAQ files
        #. offline file names corresponding to those runs
        #. DAQ files with names not already in offline

        Could be done flat ``2``, but that would entail an IN query with all filenames, which is not sustainable.
        """
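        ## the three queries in outline (an illustrative sketch, not the literal emitted SQL):
        ##   1. SELECT DISTINCT RUNNR FROM SFO_TZ_FILE
        ##        WHERE FILESTATE='CLOSED' AND OPENTIME > :startlocal
        ##   2. SELECT fileName FROM DaqRawDataFileInfo WHERE runNo IN (:daqruns)
        ##   3. SELECT * FROM SFO_TZ_FILE
        ##        WHERE FILESTATE='CLOSED' AND OPENTIME > :startlocal AND LFN NOT IN (:offnames)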
        if offset_seconds is None:     ## "is None" so an explicit 0 offset is honoured
            offset_seconds = self.cfg.get('file_seconds_offset', -60)

        startlocal = startlocal + timedelta(seconds=offset_seconds)

        daqruns = self.daqf.runs(startlocal)

        daqfiles = []
        if not daqruns:
            log.info("no new closed runs after %s " % startlocal)
        else:
            offnames = self.offf.offnames(daqruns)
            daqfiles = self.daqf.newfiles(startlocal, offnames)
        return daqfiles


if __name__ == '__main__':

    logging.basicConfig(level=logging.INFO)

    from NonDbi import MetaDB
    daqm = MetaDB("tmp_daqdb")
    offm = MetaDB("tmp_offline_db")

    daqf = DaqFiles(daqm)
    daqf.range()

    finfo = FileInfo(source=daqm, target=offm, cfg={})
    print(finfo)

    last = finfo.last()
    print("last_collected %r " % last)
    print("localstart %r " % last.localstart)

    parse = lambda _: datetime.strptime(_, "%Y-%m-%d %H:%M:%S")

    #cut = last.localstart
    #cut = daqf.first.OPENTIME #+ timedelta(minutes=600)
    #cut = parse("2011-10-20 16:35:24")
    cut = parse("2011-10-18 10:03:47")

    log.info("cut %s " % cut)
    fis = finfo.after(cut, offset_seconds=-60)

    for sli in slice(0, 10), slice(-10, None):     ## show the first and last ten collected files
        print(sli)
        for fi in fis[sli]:
            print(fi)

Generated on Fri May 16 2014 09:50:03 for Scraper by doxygen 1.7.4