/search.css" rel="stylesheet" type="text/css"/> /search.js">
#!/usr/bin/env python
"""
Helpers for propagating DAQ file-catalog records (table ``SFO_TZ_FILE``)
into the offline DBI table ``DaqRawDataFileInfo``.
"""
import logging, time
from datetime import datetime, timedelta
from baseinfo import BaseInfo
log = logging.getLogger(__name__)

class DaqFiles(dict):
    def __init__(self, source, cfg=None):
        """
        Queries ``SFO_TZ_FILE`` to find files with records that have not yet been propagated offline

        :param source: DAQ DB session factory, provides ``kls_`` class mapping
        :param cfg: settings dict, eg ``maxFilesPerRun``; default ``None``
            (not ``{}``: a mutable default would be shared between instances)
        """
        kstf = source.kls_("SFO_TZ_FILE")
        def _kstf__repr__(self):
            return "%s %s %s %s" % ( self.__class__.__name__, self.RUNNR, self.OPENTIME, self.LFN )
        kstf.__repr__ = _kstf__repr__   ## monkeypatch mapped class for readable logging

        self.source = source
        self.cfg = cfg if cfg is not None else {}
        self.kstf = kstf

        daq = self.source() ## single session closured in
        ## base queries: CLOSED files, OPENTIME ascending / descending
        self.qdaq = lambda *qwn:daq.query(*qwn).order_by(kstf.OPENTIME).filter(kstf.FILESTATE == 'CLOSED')
        self.qdaqr = lambda *qwn:daq.query(*qwn).order_by(kstf.OPENTIME.desc()).filter(kstf.FILESTATE == 'CLOSED')

    def range(self):
        """
        DAQ Query recording `first` and `last` closed entries, in OPENTIME order,
        onto ``self.first`` and ``self.last``
        """
        self.first = self._first()
        self.last = self._last()
        log.info("first %s " % self.first )
        log.info("last %s " % self.last )

    def runs(self, startlocal):
        """
        :param startlocal: local datetime after which are interested in DAQ files
        :return: distinct ``RUNNR`` corresponding to files closed after `startlocal` in ``OPENTIME`` order
        """
        kstf = self.kstf
        maxFilesPerRun = self.cfg.get('maxFilesPerRun', 1000)
        q = self.qdaq(kstf.RUNNR).filter(kstf.OPENTIME > startlocal).distinct()
        daqruns = [run for run, in q.limit(maxFilesPerRun).all()]
        log.info("after %s yields %s daqruns " % (startlocal, len(daqruns)))   ## was misspelt "qfter"
        return daqruns

    def _first(self):
        """Earliest closed file entry by OPENTIME, or None."""
        return self.qdaq(self.kstf).first()

    def _last(self):
        """Latest closed file entry by OPENTIME, or None."""
        return self.qdaqr(self.kstf).first()

    def newfiles(self, startlocal, offnames=None):
        """
        :param startlocal: local datetime cutoff
        :param offnames: list of LFN of already collected files
        :return: DAQ file entries opened after `startlocal` whose LFN is not in `offnames`

        NB the former body opened a second, unused session via ``self.source()``;
        all queries go through the session closured into ``self.qdaq``.
        """
        kstf = self.kstf
        maxFilesPerRun = self.cfg.get('maxFilesPerRun', 1000)
        q = self.qdaq(kstf).filter(kstf.OPENTIME > startlocal)
        if offnames:
            daqfiles = q.filter(~kstf.LFN.in_(offnames)).limit(maxFilesPerRun).all()  ## "NOT IN_"
        else:
            daqfiles = q.limit(maxFilesPerRun).all()
        return daqfiles


class OffFiles(dict):
    def __init__(self, target, cfg):
        """
        :param target: offline DB session factory, provides ``dbikls_`` DBI class mapping
        :param cfg: settings dict
        """
        self.target = target
        self.cfg = cfg
        self.kdrdfi = self.target.dbikls_("DaqRawDataFileInfo")

        def kdrdfi__repr__(self):
            return "%s %s %s %s %s %s" % ( self.__class__.__name__, self.runNo, self.fileName, self.TIMESTART, self.TIMEEND, self.SEQNO )
        self.kdrdfi.__repr__ = kdrdfi__repr__
        ## time.timezone is -28800 (site local time is UTC+8)
        self.kdrdfi.localstart = property(lambda self:self.TIMESTART - timedelta(seconds=time.timezone))

    def last(self):
        """
        Accesses highest SEQNO entry from offline DBI pair `DaqRawDataFileInfo`
        and records it on ``self.lastentry``

        NB formerly the entry was assigned to ``self.last``, which clobbered this
        bound method so that a second call raised TypeError.
        """
        off = self.target()
        kdrdfi = self.kdrdfi
        last = off.query(kdrdfi).order_by(kdrdfi.SEQNO.desc()).first()
        self.lastentry = last
        return last

    def offnames(self, runs=None):
        """
        :param runs: list of runs

        :return: offline filenames corresponding to those runs (empty list for no runs)
        """
        if not runs:
            return []
        off = self.target()
        kdrdfi = self.kdrdfi
        return [fileName for fileName, in off.query(kdrdfi.fileName).filter(kdrdfi.runNo.in_(runs)).all()]


class FileInfo(BaseInfo):
    def __init__(self, source="tmp_daqdb", target="tmp_offline_db", cfg=None):
        """
        :param source: DAQ DB session factory (or its name, resolved by BaseInfo)
        :param target: offline DB session factory (or its name)
        :param cfg: settings dict; default None avoids a shared mutable default
        """
        cfg = cfg if cfg is not None else {}
        BaseInfo.__init__(self, source, target, cfg)
        self.daqf = DaqFiles( self.source, cfg )
        self.offf = OffFiles( self.target, cfg )

    def __repr__(self):
        return BaseInfo.__repr__(self)

    def last(self):
        """Highest-SEQNO entry of offline ``DaqRawDataFileInfo``."""
        return self.offf.last()

    def after(self, startlocal, offset_seconds=None):
        """
        Queries daq table SFO_TZ_FILE for closed files, that were opened after cutoff time `startlocal`
        (shifted by `offset_seconds`) and that are not already in offline table DaqRawDataFileInfo

        :param startlocal: local datetime cutoff
        :param offset_seconds: seconds added to the cutoff; when None, taken from
            cfg ``file_seconds_offset`` (default -60). NB an explicit 0 is now
            honoured -- the former ``if not offset_seconds`` check discarded it.

        Former ``N+1`` nested approach ``get_newFileList``:

        * iterated over DAQ file entries, doing offline queries at each iteration to check for preexisting file names

        New flat ``3`` query approach:

        #. runs corresponding to new DAQ files
        #. offline files names corresponding to those runs
        #. DAQ files with names not already in offline

        Could be done flat ``2``, but that would entail IN querying with all filenames, which is not sustainable.
        """
        if offset_seconds is None:
            offset_seconds = self.cfg.get('file_seconds_offset', -60)

        startlocal = startlocal + timedelta(seconds=offset_seconds)

        daqruns = self.daqf.runs( startlocal )  ## runs() already logs the yield; duplicate log line removed

        daqfiles = []
        if not daqruns:
            log.info("no new closed runs after %s " % startlocal )
        else:
            offnames = self.offf.offnames(daqruns)
            daqfiles = self.daqf.newfiles( startlocal, offnames )
        return daqfiles


if __name__ == '__main__':

    logging.basicConfig(level=logging.INFO)

    from NonDbi import MetaDB
    daqm = MetaDB("tmp_daqdb")
    offm = MetaDB("tmp_offline_db")

    daqf = DaqFiles( daqm )
    daqf.range()

    finfo = FileInfo(source=daqm, target=offm, cfg={})
    print(finfo)

    last = finfo.last()
    print("last_collected %r " % last)
    print("localstart %r " % last.localstart)

    parse = lambda _:datetime.strptime(_, "%Y-%m-%d %H:%M:%S")

    #cut = last.localstart
    #cut = daqf.first.OPENTIME #+ timedelta(minutes=600)
    #cut = parse("2011-10-20 16:35:24")
    cut = parse("2011-10-18 10:03:47")

    log.info("cut %s " % cut)
    fis = finfo.after( cut, offset_seconds=-60 )

    ## show head and tail of the collected files
    for sli in slice(0, 10), slice(-10, None):
        print(sli)
        for fi in fis[sli]:
            print(fi)