/search.css" rel="stylesheet" type="text/css"/> /search.js">
| Classes | Job Modules | Data Objects | Services | Algorithms | Tools | Packages | Directories | Tracs |

In This Package:

DybTrimIO.cc
Go to the documentation of this file.
00001 //
00002 #include "DybTrimIO.h"
00003 //
00004 // now time
00005 #include "Event/RegistrationSequence.h"
00006 #include "Event/TemporalDataObject.h"
00007 //
00008 // IO
00009 #include "DybKernel/ISaveB4TrimAesSvc.h"
00010 #include "DybKernel/IDybStorageSvc.h"
00011 //
00012 // trimming
00013 #include "GaudiKernel/IDataManagerSvc.h"
00014 #include "DybKernel/IArchiveTrimSvc.h"
00015 //
00016 #include "GaudiKernel/SmartDataPtr.h"
00017 
00018 #include "DataUtilities/DybArchiveList.h"
00019 #include "GaudiKernel/DataSvc.h"
00020 #include "GaudiKernel/IIncidentSvc.h"
00021 #include "DybKernel/EventStoreIncident.h"
00022 
00023 #include <map>
00024 
00026 
00027 DybTrimIO::DybTrimIO(const std::string& name, ISvcLocator* pSvcLocator):
00028   GaudiAlgorithm(name, pSvcLocator),
00029   m_pSaveB4TrimAesSvc(0),
00030   m_pDybStorageSvc(0),
00031   m_pArchiveSvc(0),
00032   m_pTrimSvc(0)
00033 {
00034   declareProperty("SaveFlag",m_saveFlag=false,"Save or not");
00035 
00036   declareProperty("NowLocation",
00037                   m_nowLocation =  DayaBay::RegistrationSequenceLocation::Default, 
00038                   "Location to determine current time, default: RegistrationSequence");
00039 
00040   declareProperty("RegSeqLocation",
00041                   m_regSeqLocation=DayaBay::RegistrationSequenceLocation::Default,
00042                   "Location to RegistrationSequence");
00043 
00044   declareProperty("SaveB4TrimAesSvc",
00045                   m_pSaveB4TrimAesSvcName="SaveB4TrimAesSvc",
00046                   "Name of SaveB4TrimAesSvc");
00047 
00048   declareProperty("DybStorageSvc",
00049                   m_pDybStorageSvcName="DybStorageSvc",
00050                   "Name of an IDybStorageSvc");
00051 
00052   declareProperty("ArchiveSvc",
00053                   m_pArchiveSvcName="EventDataArchiveSvc",
00054                   "Name of AES");
00055 
00056   declareProperty("TrimSvc",
00057                   m_pTrimSvcName="ArchiveTrimSvc",
00058                   "Name of trimming service");
00059 
00060 }
00061 
00062 // * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * 
00063 
00064 StatusCode DybTrimIO::initialize()
00065 {
00066   //
00067   StatusCode status;
00068   debug() << " DybTrimIO initialize()" << endreq;
00069 
00070   if ( m_saveFlag ) {
00071     // SaveB4TrimAesSvc
00072     status = service(m_pSaveB4TrimAesSvcName,
00073                      m_pSaveB4TrimAesSvc);
00074 
00075     debug()<<"m_pSaveB4TrimAesSvc "<<m_pSaveB4TrimAesSvc<<endreq;
00076     if (status.isFailure()) {
00077       error()<<"Service [SaveB4TrimAesSvc] not found"<<endreq;
00078       return StatusCode::FAILURE;
00079     }
00080 
00081     // DybStorageSvc 
00082     status = service(m_pDybStorageSvcName,
00083                      m_pDybStorageSvc);
00084     
00085     debug()<<"m_pDybStorageSvc "<<m_pDybStorageSvc<<endreq;
00086     if (status.isFailure()) {
00087       error()<<"Service [DybStorageSvc] not found"<<endreq;
00088       return StatusCode::FAILURE;
00089     }
00090   }
00091 
00092   // Get archive store
00093   status = service(m_pArchiveSvcName,
00094                    m_pArchiveSvc);
00095 
00096   debug()<<"m_pArchiveSvc "<<m_pArchiveSvc<<endreq;
00097   if (status.isFailure()) {
00098     error()<<"Service [EventDataArchiveSvc] not found"<<endreq;
00099     return StatusCode::FAILURE;
00100   }
00101 
00102   // Get trimming Svc
00103   status = service(m_pTrimSvcName,
00104                    m_pTrimSvc);
00105 
00106   debug()<<"m_pTrimSvc "<<m_pTrimSvc<<endreq;
00107   if (status.isFailure()) {
00108     error()<<"Service [ArchiveTrimSvc] not found"<<endreq;
00109     return StatusCode::FAILURE;
00110   }
00112 
00113   return StatusCode::SUCCESS;
00114 }
00115 
00116 // * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * 
00117 
00118 /*FIXME: This original implementation is disabled and kept for reference
00119   only: completely walking the data store to trim every object is too slow.
00120 
00121 StatusCode DybTrimIO::execute() 
00122 {
00123   StatusCode sc;
00124   
00125   // get the data object at the "now" location
00126   SmartDataPtr<const DayaBay::TemporalDataObject> 
00127               pTemoral(eventSvc(),m_nowLocation);
00128   // check the pointer
00129   if (0 == pTemoral) {
00130     error()<<"Failed to retrieve TemporalDataObject from Event Store from "
00131            << m_nowLocation
00132            <<endreq;
00133     return StatusCode::FAILURE;
00134   }  
00135 
00136   // save before trim
00137   if(m_saveFlag) {
00138     sc = m_pSaveB4TrimAesSvc->store(m_regSeqLocation,
00139                                     m_pDybStorageSvc,
00140                                     m_pArchiveSvc,
00141                                     m_pTrimSvc,
00142                                     pTemoral->earliest());
00143     if(sc.isFailure()) {
00144       debug()<<"Failed to write"<<endreq;
00145       return StatusCode::FAILURE;
00146     }
00147   }
00148 
00149   // trim base on it
00150   sc = m_pTrimSvc->trim(m_pArchiveSvc,pTemoral->earliest());
00151   if(sc.isFailure()) {
00152     debug()<<"Failed to trim"<<endreq;
00153     return StatusCode::FAILURE;
00154   }
00155 
00156   return StatusCode::SUCCESS;
00157 }
00158 */
00159 
00160 // Alternative time-optimized implementation
00161 StatusCode DybTrimIO::execute() 
00162 {
00163   StatusCode sc;
00164   
00165   // get the data object at the "now" location
00166   SmartDataPtr<const DayaBay::RegistrationSequence> 
00167               pTemporal(eventSvc(),m_regSeqLocation);
00168   // check the pointer
00169   if (0 == pTemporal) {
00170     error()<<"Failed to retrieve RegistrationSequence from Event Store from "
00171            << m_regSeqLocation
00172            <<endreq;
00173     return StatusCode::FAILURE;
00174   }
00175   
00176   // save before trim
00177   if(m_saveFlag) {
00178     sc = m_pSaveB4TrimAesSvc->store(m_regSeqLocation,
00179                                     m_pDybStorageSvc,
00180                                     m_pArchiveSvc,
00181                                     m_pTrimSvc,
00182                                     pTemporal->earliest());
00183     if(sc.isFailure()) {
00184       debug()<<"Failed to write"<<endreq;
00185       return StatusCode::FAILURE;
00186     }
00187   }
00188 
00189   // Create handle to AES
00190   static DataSvc* pAes=0;
00191   if(!pAes){
00192     try{
00193       pAes=dynamic_cast<DataSvc*>(m_pArchiveSvc);
00194     }
00195     catch(...) {
00196       error() <<"Failed to cast AES" << endreq;
00197       return StatusCode::FAILURE;
00198     }
00199   }
00200 
00201   // Catch new data paths, index
00202   static std::map<std::string,DybArchiveList*> archiveMap;
00203   // First, catch the registration sequence archive
00204   if(archiveMap.empty()){
00205     DataObject* pObj=0;      
00206     StatusCode sc = pAes->retrieveObject(m_regSeqLocation,pObj);
00207     if(sc.isFailure()) {
00208       error() << "Failed to get archive for " << m_regSeqLocation << endreq;
00209       return StatusCode::FAILURE;
00210     }
00211     // Get Archive List
00212     DybArchiveList* pArchiveList=0;
00213     try{
00214       pArchiveList=dynamic_cast<DybArchiveList*>(pObj);
00215     }
00216     catch (...) {
00217       error() << "Failed to get DybArchiveList " << m_regSeqLocation << endreq;
00218       return StatusCode::FAILURE;
00219     }
00220     archiveMap[m_regSeqLocation] = pArchiveList;
00221     //info() << "Indexing archive list at " << m_regSeqLocation << endreq;
00222   }
00223   // Next, loop over paths in reg seq, and add to index if they are new
00224   IRegistrationSequence::Registrations::const_iterator regIter, 
00225     regEnd=pTemporal->registrations().end();
00226   for(regIter=pTemporal->registrations().begin(); 
00227       regIter != regEnd; 
00228       regIter++){
00229     const ObjectReg& objReg = (*regIter);
00230     if( archiveMap.find(objReg.path()) == archiveMap.end() ){
00231       // Found a new data path, add archive list to index
00232       DataObject* pObj=0;      
00233       StatusCode sc = pAes->retrieveObject(objReg.path(),pObj);
00234       if(sc.isFailure()) {
00235         error() << "Failed to get archive for " << objReg.path() << endreq;
00236         return StatusCode::FAILURE;
00237       }
00238       // Get Archive List
00239       DybArchiveList* pArchiveList=0;
00240       try{
00241         pArchiveList=dynamic_cast<DybArchiveList*>(pObj);
00242       }
00243       catch (...) {
00244         error() << "Failed to get DybArchiveList " << objReg.path() << endreq;
00245         return StatusCode::FAILURE;
00246       }
00247       archiveMap[objReg.path()] = pArchiveList;
00248       //info() << "Indexing archive list at " << objReg.path() << endreq;
00249     }
00250   }
00251 
00252   // Define trimming time window
00253   double window = m_pTrimSvc->window(m_regSeqLocation);
00254   TimeStamp threshold = pTemporal->earliest();
00255   threshold.Add(-1.0 * window);
00256   // Iterate on known paths, and trim  
00257   std::map<std::string,DybArchiveList*>::iterator archiveMapIter,
00258     archiveMapEnd=archiveMap.end();
00259   for(archiveMapIter=archiveMap.begin();
00260       archiveMapIter!=archiveMapEnd;
00261       archiveMapIter++){
00262     //info() << "trimming " << archiveMapIter->first << endreq;
00263     DybArchiveList* archiveList = archiveMapIter->second;
00264     if(archiveList->empty()) continue;
00265     // Stupid hoops I have to jump through to properly reverse iterate
00266     // and erase on DybArchiveList
00267     DybArchiveList::iterator archiveIter = archiveList->end();
00268     DybArchiveList::iterator archiveBegin = archiveList->begin();
00269     archiveIter--;
00270     //int count=0;
00271     while(true){
00272       DataObject* dataObject = (*archiveIter);
00273       // Protect against zero object.
00274       if ( 0 == dataObject) {
00275         error() << "Broken object in archive! " << archiveMapIter->first 
00276                 << endreq;
00277         return StatusCode::FAILURE;
00278       }
00279       // Cast to Temporal object
00280       TemporalDataObject* temporalObject = 0;
00281       try{
00282         temporalObject = dynamic_cast<TemporalDataObject*>(dataObject);
00283       }catch(...){
00284         error() << "Broken temporal object in archive! " 
00285                 << archiveMapIter->first << endreq;
00286         return StatusCode::FAILURE;
00287       }
00288       //info() << "  Checking object: " << archiveMapIter->first 
00289       //     << " : " << count << endreq;
00290       if (temporalObject->earliest() <= threshold) {
00291         //info() << "    Trimming object: " << archiveMapIter->first << endreq;
00292         static IIncidentSvc* incSvc=0;
00293         if(!incSvc){
00294           StatusCode status = service("IncidentSvc",incSvc,true);
00295           if (status.isFailure()) {
00296             return status;
00297           }
00298         }
00299         incSvc->fireIncident(EventStoreIncident(temporalObject,false,
00300                                                 "TrimmingAgent"));
00301         DybArchiveList::iterator tmpIter(archiveIter);
00302         // Be sure to iterate before deleting target
00303         bool lastEntry=false;
00304         if(archiveIter==archiveBegin){
00305           lastEntry=true;
00306         }else{
00307           archiveIter--;
00308         }
00309         archiveList->erase(tmpIter);
00310         if(lastEntry) break;
00311         //count++;
00312       }else{
00313         // This archive is trimmed enough for now
00314         break;
00315       }
00316     }
00317   }
00318 
00319   return StatusCode::SUCCESS;
00320 }
00321 
00322 
00323 // * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * 
00324 
00325 StatusCode DybTrimIO::finalize() 
00326 {
00327   StatusCode sc;
00328   debug() << "DybTrimIO finalize()" << endreq;
00329 
00330   // Trim the rest of elements off
00331   // Some data might still in AES and hasn't been written out.
00332   // Use the limit of TimeStamp to trigger the saving and trimming
00333   
00334   // save before trim
00335   if(m_saveFlag) {
00336     sc = m_pSaveB4TrimAesSvc->store(m_regSeqLocation,
00337                                     m_pDybStorageSvc,
00338                                     m_pArchiveSvc,
00339                                     m_pTrimSvc,
00340                                     TimeStamp::GetEOT());
00341     if(sc.isFailure()) {
00342       debug()<<"Failed to write"<<endreq;
00343       return StatusCode::FAILURE;
00344     }
00345   }
00346 
00347   // trim base on it
00348   sc = m_pTrimSvc->trim(m_pArchiveSvc,TimeStamp::GetEOT());
00349   if(sc.isFailure()) {
00350     debug()<<"Failed to trim"<<endreq;
00351     return StatusCode::FAILURE;
00352   }
00353 
00354   // Now release these service
00355   if (0 != m_pTrimSvc) {
00356     m_pTrimSvc->release();
00357   }
00358 
00359   if (0 != m_pArchiveSvc) {
00360     m_pArchiveSvc->release();
00361   }    
00362 
00363   if(m_saveFlag) {
00364     if (0 != m_pSaveB4TrimAesSvc) {
00365       m_pSaveB4TrimAesSvc->release();
00366     }
00367 
00368     if (0 != m_pDybStorageSvc) {
00369       m_pDybStorageSvc->release();
00370     }
00371   }
00372 
00373   return StatusCode::SUCCESS;
00374 }
00375 
00376 
00377 // Local Variables: **
00378 // c-basic-offset:2 **
00379 // End: **
| Classes | Job Modules | Data Objects | Services | Algorithms | Tools | Packages | Directories | Tracs |

Generated on Fri May 16 2014 10:09:44 for DybAlg by doxygen 1.7.4