/search.css" rel="stylesheet" type="text/css"/> /search.js">
00001 // 00002 #include "DybTrimIO.h" 00003 // 00004 // now time 00005 #include "Event/RegistrationSequence.h" 00006 #include "Event/TemporalDataObject.h" 00007 // 00008 // IO 00009 #include "DybKernel/ISaveB4TrimAesSvc.h" 00010 #include "DybKernel/IDybStorageSvc.h" 00011 // 00012 // trimming 00013 #include "GaudiKernel/IDataManagerSvc.h" 00014 #include "DybKernel/IArchiveTrimSvc.h" 00015 // 00016 #include "GaudiKernel/SmartDataPtr.h" 00017 00018 #include "DataUtilities/DybArchiveList.h" 00019 #include "GaudiKernel/DataSvc.h" 00020 #include "GaudiKernel/IIncidentSvc.h" 00021 #include "DybKernel/EventStoreIncident.h" 00022 00023 #include <map> 00024 00026 00027 DybTrimIO::DybTrimIO(const std::string& name, ISvcLocator* pSvcLocator): 00028 GaudiAlgorithm(name, pSvcLocator), 00029 m_pSaveB4TrimAesSvc(0), 00030 m_pDybStorageSvc(0), 00031 m_pArchiveSvc(0), 00032 m_pTrimSvc(0) 00033 { 00034 declareProperty("SaveFlag",m_saveFlag=false,"Save or not"); 00035 00036 declareProperty("NowLocation", 00037 m_nowLocation = DayaBay::RegistrationSequenceLocation::Default, 00038 "Location to determine current time, default: RegistrationSequence"); 00039 00040 declareProperty("RegSeqLocation", 00041 m_regSeqLocation=DayaBay::RegistrationSequenceLocation::Default, 00042 "Location to RegistrationSequence"); 00043 00044 declareProperty("SaveB4TrimAesSvc", 00045 m_pSaveB4TrimAesSvcName="SaveB4TrimAesSvc", 00046 "Name of SaveB4TrimAesSvc"); 00047 00048 declareProperty("DybStorageSvc", 00049 m_pDybStorageSvcName="DybStorageSvc", 00050 "Name of an IDybStorageSvc"); 00051 00052 declareProperty("ArchiveSvc", 00053 m_pArchiveSvcName="EventDataArchiveSvc", 00054 "Name of AES"); 00055 00056 declareProperty("TrimSvc", 00057 m_pTrimSvcName="ArchiveTrimSvc", 00058 "Name of trimming service"); 00059 00060 } 00061 00062 // * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * 00063 00064 StatusCode DybTrimIO::initialize() 00065 { 00066 // 00067 StatusCode status; 00068 debug() << " DybTrimIO initialize()" << endreq; 00069 00070 if ( m_saveFlag ) { 00071 // SaveB4TrimAesSvc 00072 status = service(m_pSaveB4TrimAesSvcName, 00073 m_pSaveB4TrimAesSvc); 00074 00075 debug()<<"m_pSaveB4TrimAesSvc "<<m_pSaveB4TrimAesSvc<<endreq; 00076 if (status.isFailure()) { 00077 error()<<"Service [SaveB4TrimAesSvc] not found"<<endreq; 00078 return StatusCode::FAILURE; 00079 } 00080 00081 // DybStorageSvc 00082 status = service(m_pDybStorageSvcName, 00083 m_pDybStorageSvc); 00084 00085 debug()<<"m_pDybStorageSvc "<<m_pDybStorageSvc<<endreq; 00086 if (status.isFailure()) { 00087 error()<<"Service [DybStorageSvc] not found"<<endreq; 00088 return StatusCode::FAILURE; 00089 } 00090 } 00091 00092 // Get archive store 00093 status = service(m_pArchiveSvcName, 00094 m_pArchiveSvc); 00095 00096 debug()<<"m_pArchiveSvc "<<m_pArchiveSvc<<endreq; 00097 if (status.isFailure()) { 00098 error()<<"Service [EventDataArchiveSvc] not found"<<endreq; 00099 return StatusCode::FAILURE; 00100 } 00101 00102 // Get trimming Svc 00103 status = service(m_pTrimSvcName, 00104 m_pTrimSvc); 00105 00106 debug()<<"m_pTrimSvc "<<m_pTrimSvc<<endreq; 00107 if (status.isFailure()) { 00108 error()<<"Service [ArchiveTrimSvc] not found"<<endreq; 00109 return StatusCode::FAILURE; 00110 } 00112 00113 return StatusCode::SUCCESS; 00114 } 00115 00116 // * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * 00117 00118 /*FIXME: The approach of completely walking the data store for 00119 trimming every object is too slow. 00120 00121 StatusCode DybTrimIO::execute() 00122 { 00123 StatusCode sc; 00124 00125 // get the data object at the "now" location 00126 SmartDataPtr<const DayaBay::TemporalDataObject> 00127 pTemoral(eventSvc(),m_nowLocation); 00128 // check the pointer 00129 if (0 == pTemoral) { 00130 error()<<"Failed to retrieve TemporalDataObject from Event Store from " 00131 << m_nowLocation 00132 <<endreq; 00133 return StatusCode::FAILURE; 00134 } 00135 00136 // save before trim 00137 if(m_saveFlag) { 00138 sc = m_pSaveB4TrimAesSvc->store(m_regSeqLocation, 00139 m_pDybStorageSvc, 00140 m_pArchiveSvc, 00141 m_pTrimSvc, 00142 pTemoral->earliest()); 00143 if(sc.isFailure()) { 00144 debug()<<"Failed to write"<<endreq; 00145 return StatusCode::FAILURE; 00146 } 00147 } 00148 00149 // trim base on it 00150 sc = m_pTrimSvc->trim(m_pArchiveSvc,pTemoral->earliest()); 00151 if(sc.isFailure()) { 00152 debug()<<"Failed to trim"<<endreq; 00153 return StatusCode::FAILURE; 00154 } 00155 00156 return StatusCode::SUCCESS; 00157 } 00158 */ 00159 00160 // Alternative time-optimized implementation 00161 StatusCode DybTrimIO::execute() 00162 { 00163 StatusCode sc; 00164 00165 // get the data object at the "now" location 00166 SmartDataPtr<const DayaBay::RegistrationSequence> 00167 pTemporal(eventSvc(),m_regSeqLocation); 00168 // check the pointer 00169 if (0 == pTemporal) { 00170 error()<<"Failed to retrieve RegistrationSequence from Event Store from " 00171 << m_regSeqLocation 00172 <<endreq; 00173 return StatusCode::FAILURE; 00174 } 00175 00176 // save before trim 00177 if(m_saveFlag) { 00178 sc = m_pSaveB4TrimAesSvc->store(m_regSeqLocation, 00179 m_pDybStorageSvc, 00180 m_pArchiveSvc, 00181 m_pTrimSvc, 00182 pTemporal->earliest()); 00183 if(sc.isFailure()) { 00184 debug()<<"Failed to write"<<endreq; 00185 return StatusCode::FAILURE; 00186 } 00187 } 00188 00189 // Create handle to AES 00190 static DataSvc* pAes=0; 00191 if(!pAes){ 00192 try{ 00193 pAes=dynamic_cast<DataSvc*>(m_pArchiveSvc); 00194 } 00195 catch(...) { 00196 error() <<"Failed to cast AES" << endreq; 00197 return StatusCode::FAILURE; 00198 } 00199 } 00200 00201 // Catch new data paths, index 00202 static std::map<std::string,DybArchiveList*> archiveMap; 00203 // First, catch the registration sequence archive 00204 if(archiveMap.empty()){ 00205 DataObject* pObj=0; 00206 StatusCode sc = pAes->retrieveObject(m_regSeqLocation,pObj); 00207 if(sc.isFailure()) { 00208 error() << "Failed to get archive for " << m_regSeqLocation << endreq; 00209 return StatusCode::FAILURE; 00210 } 00211 // Get Archive List 00212 DybArchiveList* pArchiveList=0; 00213 try{ 00214 pArchiveList=dynamic_cast<DybArchiveList*>(pObj); 00215 } 00216 catch (...) { 00217 error() << "Failed to get DybArchiveList " << m_regSeqLocation << endreq; 00218 return StatusCode::FAILURE; 00219 } 00220 archiveMap[m_regSeqLocation] = pArchiveList; 00221 //info() << "Indexing archive list at " << m_regSeqLocation << endreq; 00222 } 00223 // Next, loop over paths in reg seq, and add to index if they are new 00224 IRegistrationSequence::Registrations::const_iterator regIter, 00225 regEnd=pTemporal->registrations().end(); 00226 for(regIter=pTemporal->registrations().begin(); 00227 regIter != regEnd; 00228 regIter++){ 00229 const ObjectReg& objReg = (*regIter); 00230 if( archiveMap.find(objReg.path()) == archiveMap.end() ){ 00231 // Found a new data path, add archive list to index 00232 DataObject* pObj=0; 00233 StatusCode sc = pAes->retrieveObject(objReg.path(),pObj); 00234 if(sc.isFailure()) { 00235 error() << "Failed to get archive for " << objReg.path() << endreq; 00236 return StatusCode::FAILURE; 00237 } 00238 // Get Archive List 00239 DybArchiveList* pArchiveList=0; 00240 try{ 00241 pArchiveList=dynamic_cast<DybArchiveList*>(pObj); 00242 } 00243 catch (...) { 00244 error() << "Failed to get DybArchiveList " << objReg.path() << endreq; 00245 return StatusCode::FAILURE; 00246 } 00247 archiveMap[objReg.path()] = pArchiveList; 00248 //info() << "Indexing archive list at " << objReg.path() << endreq; 00249 } 00250 } 00251 00252 // Define trimming time window 00253 double window = m_pTrimSvc->window(m_regSeqLocation); 00254 TimeStamp threshold = pTemporal->earliest(); 00255 threshold.Add(-1.0 * window); 00256 // Iterate on known paths, and trim 00257 std::map<std::string,DybArchiveList*>::iterator archiveMapIter, 00258 archiveMapEnd=archiveMap.end(); 00259 for(archiveMapIter=archiveMap.begin(); 00260 archiveMapIter!=archiveMapEnd; 00261 archiveMapIter++){ 00262 //info() << "trimming " << archiveMapIter->first << endreq; 00263 DybArchiveList* archiveList = archiveMapIter->second; 00264 if(archiveList->empty()) continue; 00265 // Stupid hoops I have to jump through to properly reverse iterate 00266 // and erase on DybArchiveList 00267 DybArchiveList::iterator archiveIter = archiveList->end(); 00268 DybArchiveList::iterator archiveBegin = archiveList->begin(); 00269 archiveIter--; 00270 //int count=0; 00271 while(true){ 00272 DataObject* dataObject = (*archiveIter); 00273 // Protect against zero object. 00274 if ( 0 == dataObject) { 00275 error() << "Broken object in archive! " << archiveMapIter->first 00276 << endreq; 00277 return StatusCode::FAILURE; 00278 } 00279 // Cast to Temporal object 00280 TemporalDataObject* temporalObject = 0; 00281 try{ 00282 temporalObject = dynamic_cast<TemporalDataObject*>(dataObject); 00283 }catch(...){ 00284 error() << "Broken temporal object in archive! " 00285 << archiveMapIter->first << endreq; 00286 return StatusCode::FAILURE; 00287 } 00288 //info() << " Checking object: " << archiveMapIter->first 00289 // << " : " << count << endreq; 00290 if (temporalObject->earliest() <= threshold) { 00291 //info() << " Trimming object: " << archiveMapIter->first << endreq; 00292 static IIncidentSvc* incSvc=0; 00293 if(!incSvc){ 00294 StatusCode status = service("IncidentSvc",incSvc,true); 00295 if (status.isFailure()) { 00296 return status; 00297 } 00298 } 00299 incSvc->fireIncident(EventStoreIncident(temporalObject,false, 00300 "TrimmingAgent")); 00301 DybArchiveList::iterator tmpIter(archiveIter); 00302 // Be sure to iterate before deleting target 00303 bool lastEntry=false; 00304 if(archiveIter==archiveBegin){ 00305 lastEntry=true; 00306 }else{ 00307 archiveIter--; 00308 } 00309 archiveList->erase(tmpIter); 00310 if(lastEntry) break; 00311 //count++; 00312 }else{ 00313 // This archive is trimmed enough for now 00314 break; 00315 } 00316 } 00317 } 00318 00319 return StatusCode::SUCCESS; 00320 } 00321 00322 00323 // * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * 00324 00325 StatusCode DybTrimIO::finalize() 00326 { 00327 StatusCode sc; 00328 debug() << "DybTrimIO finalize()" << endreq; 00329 00330 // Trim the rest of elements off 00331 // Some data might still in AES and hasn't been written out. 00332 // Use the limit of TimeStamp to trigger the saving and trimming 00333 00334 // save before trim 00335 if(m_saveFlag) { 00336 sc = m_pSaveB4TrimAesSvc->store(m_regSeqLocation, 00337 m_pDybStorageSvc, 00338 m_pArchiveSvc, 00339 m_pTrimSvc, 00340 TimeStamp::GetEOT()); 00341 if(sc.isFailure()) { 00342 debug()<<"Failed to write"<<endreq; 00343 return StatusCode::FAILURE; 00344 } 00345 } 00346 00347 // trim base on it 00348 sc = m_pTrimSvc->trim(m_pArchiveSvc,TimeStamp::GetEOT()); 00349 if(sc.isFailure()) { 00350 debug()<<"Failed to trim"<<endreq; 00351 return StatusCode::FAILURE; 00352 } 00353 00354 // Now release these service 00355 if (0 != m_pTrimSvc) { 00356 m_pTrimSvc->release(); 00357 } 00358 00359 if (0 != m_pArchiveSvc) { 00360 m_pArchiveSvc->release(); 00361 } 00362 00363 if(m_saveFlag) { 00364 if (0 != m_pSaveB4TrimAesSvc) { 00365 m_pSaveB4TrimAesSvc->release(); 00366 } 00367 00368 if (0 != m_pDybStorageSvc) { 00369 m_pDybStorageSvc->release(); 00370 } 00371 } 00372 00373 return StatusCode::SUCCESS; 00374 } 00375 00376 00377 // Local Variables: ** 00378 // c-basic-offset:2 ** 00379 // End: **