/search.css" rel="stylesheet" type="text/css"/> /search.js">
| Classes | Job Modules | Data Objects | Services | Algorithms | Tools | Packages | Directories | Tracs |

In This Package:

DybArchiveOutput.cc
Go to the documentation of this file.
00001 #include "DybArchiveOutput.h"
00002 
00003 #include "DybKernel/EventStoreIncident.h"
00004 #include "Event/HeaderObject.h"
00005 
00006 #include "GaudiKernel/DataSvc.h"
00007 #include "GaudiKernel/IIncidentSvc.h"
00008 
00009 #include <sstream>
00010 
00011 using namespace DayaBay;
00012 
00013 DybArchiveOutput::DybArchiveOutput (const std::string& name, ISvcLocator* pSvcLocator)
00014  : GaudiAlgorithm(name, pSvcLocator)
00015  , m_aes(0)
00016  , m_store(0)
00017  , m_incsvc(0)
00018 {
00019     declareProperty("WindowSeconds", m_window = 0.0, "Time window to apply to the AES");    
00020     declareProperty("SaveOutput", m_saveFlag = false, "Set to true to save ouput, default is false");
00021     declareProperty("RegSeqLocation",
00022                     m_regSeqLocation = DayaBay::RegistrationSequenceLocation::Default,
00023                     "Location to RegistrationSequence");
00024     declareProperty("ArchiveSvc",
00025                     m_aesName = "EventDataArchiveSvc",
00026                     "Name of Archive Event Store");
00027     declareProperty("DybStorageSvc",
00028                     m_dybStorageSvcName = "DybStorageSvc",
00029                     "Name of an IDybStorageSvc");
00030 }
00031 
00032 StatusCode DybArchiveOutput::initialize()
00033 {
00034     StatusCode sc = GaudiAlgorithm::initialize();
00035     if (sc.isFailure()) return sc;
00036 
00037     sc = service(m_aesName,m_aes);
00038     if (sc.isFailure()) return sc;
00039 
00040     if (m_saveFlag) {
00041         sc = service(m_dybStorageSvcName, m_store);
00042         if (sc.isFailure()) return sc;    
00043     }
00044 
00045     sc = service("IncidentSvc",m_incsvc,true);
00046     if (sc.isFailure()) return sc;
00047 
00048     debug() << "initialized" << endreq;
00049 
00050     return StatusCode::SUCCESS;
00051 }
00052 
00053 static std::string dostr(const DataObject* obj)
00054 {
00055     std::stringstream ss;
00056     ss << obj->name() << ": refCount=" << obj->refCount();
00057     const TemporalDataObject* tdo = dynamic_cast<const TemporalDataObject*>(obj);
00058     if (!tdo) {
00059         return ss.str();
00060     }
00061     ss << " time:" << tdo->earliest() << " --> " << tdo->latest();
00062     const HeaderObject* ho = dynamic_cast<const HeaderObject*>(tdo);
00063     if (!ho) {
00064         return ss.str();
00065     }
00066     ss << " execNumber=" << ho->execNumber();
00067     return ss.str();
00068 }
00069 
00070 DybArchiveList* DybArchiveOutput::aes_list(const std::string& location)
00071 {
00072     // return the list at the given location or null if it d.n.e.  use
00073     // local cache to avoid somewhat expensive calls to
00074     // aes->retrieveObject().
00075 
00076     AESListMap_t::iterator it = m_aesListMap.find(location);
00077     if (it != m_aesListMap.end()) { // cache hit
00078         return it->second;
00079     }
00080 
00081     DataObject* object = 0;
00082     StatusCode sc = m_aes->retrieveObject(location,object);
00083     if (sc.isFailure()) {       // not here now but maybe later.
00084         return 0;
00085     }
00086     DybArchiveList* aesList = dynamic_cast<DybArchiveList*>(object);
00087     if (!aesList) {
00088         // fixme: add warning msg
00089         return 0;
00090     }
00091     
00092     m_aesListMap[location] = aesList;
00093     return aesList;
00094 }
00095 
00096 StatusCode DybArchiveOutput::store(DayaBay::RegistrationSequence* rs)
00097 {
00098     if (! m_store) {            // not configure to store data
00099         return StatusCode::SUCCESS;
00100     }
00101 
00102     debug() << "Storing RegistrationSequence with "
00103             << rs->size() << " registrations spanning " 
00104             << rs->earliest() << " --> " << rs->latest()
00105             << endreq;
00106     return m_store->store(*rs);
00107 }
00108 
00109 DybArchiveList::iterator DybArchiveOutput::find_in_archive(DybArchiveList* aeslist, const DataObject* object)
00110 {
00111     DybArchiveList::iterator it, done = aeslist->end();
00112     for (it = aeslist->begin(); it != done; ++it) {
00113         if (object == *it) {
00114             return it;
00115         }
00116     }
00117     return done;
00118 }
00119 
00120 
00121 StatusCode DybArchiveOutput::erase(const DataObject* object, const std::string& path)
00122 {
00123     DybArchiveList* lst = aes_list(path);
00124     if (!lst) {
00125         error() << "No AES list for " << path << endreq;
00126         return StatusCode::FAILURE;
00127     }
00128 
00129     DybArchiveList::iterator todie = find_in_archive(lst,object);
00130     if (todie == lst->end()) {
00131         error() << "Failed to find object for " << path
00132                 << " object:" << dostr(object)
00133                 << endreq;
00134         return StatusCode::FAILURE;
00135     }
00136 
00137     m_incsvc->fireIncident(EventStoreIncident(object, false, "TrimmingAgent"));
00138 
00139     debug() << "Erasing object from " << path
00140             << " object:" << dostr(object)
00141             << endreq;
00142 
00143     lst->erase(todie);  // when Registration is dtor'ed, object.release() is called
00144 
00145     return StatusCode::SUCCESS;
00146 }
00147 StatusCode DybArchiveOutput::trim(DayaBay::RegistrationSequence* rs)
00148 {
00149     const IRegistrationSequence::Registrations& regs = rs->registrations();
00150     IRegistrationSequence::Registrations::const_iterator regit, regend = regs.end();
00151 
00152     // First remove all registrations from their AES lists
00153     for (regit = regs.begin(); regit != regend; ++regit) {
00154         const ObjectReg& reg = (*regit);
00155         StatusCode sc = erase(reg.object(),reg.path());
00156         if (sc.isFailure()) {
00157             return StatusCode::FAILURE;
00158         }
00159 
00160     }
00161 
00162     // Finally, trim the RegistrationSequence itself.
00163     StatusCode sc = erase(rs,m_regSeqLocation);
00164     if (sc.isFailure()) {
00165         return StatusCode::FAILURE;
00166     }
00167 
00168     return StatusCode::SUCCESS;
00169 }    
00170 
00171 
00172 RegistrationSequence* DybArchiveOutput::condemned()
00173 {
00174     // Try to return the next RS if it is out of the window
00175 
00176     DybArchiveList* rslist = aes_list(m_regSeqLocation);
00177     if (! rslist) return 0;
00178     if (rslist->size() <= 1) return 0;
00179 
00180     // newest objects are push_front'ed, so we must check the back for oldest.
00181     DybArchiveList::reverse_iterator rit_old = rslist->rbegin();
00182     DybArchiveList::iterator rit_new = rslist->begin();
00183     RegistrationSequence* rs_old = dynamic_cast<RegistrationSequence*>(*rit_old);
00184     RegistrationSequence* rs_new = dynamic_cast<RegistrationSequence*>(*rit_new);
00185 
00186     if (!rs_old || !rs_new) {
00187         error() << "Not a RegistrationSequence at " << m_regSeqLocation << endreq;
00188         return 0;
00189     }
00190 
00191     TimeStamp threshold = rs_new->earliest();
00192     threshold.Add(-1.0 * m_window);
00193 
00194     if (rs_old->earliest() > threshold) {
00195         return 0;
00196     }
00197 
00198     return rs_old;              // I salute you.
00199 }
00200 
00201 StatusCode DybArchiveOutput::process(RegistrationSequence* rs)
00202 {
00203     // Process one RS, maybe store and then trim
00204 
00205     StatusCode sc = store(rs);
00206     if (sc.isFailure()) {
00207         error() << "Failed to store RegistrationSequence: " << dostr(rs) << endreq;
00208         return sc;
00209     }
00210 
00211     sc = trim(rs);
00212     if (sc.isFailure()) {
00213         error() << "Failed to trim RegistrationSequence: " << dostr(rs) << endreq;
00214         return sc;
00215     }
00216 
00217     return StatusCode::SUCCESS;
00218  }
00219 
00220 StatusCode DybArchiveOutput::execute()
00221 {
00222     // save and trim until no more 
00223     while (true) {
00224         RegistrationSequence* rs = condemned();
00225         if (!rs) { 
00226             return StatusCode::SUCCESS;
00227         }
00228 
00229         StatusCode sc = process(rs);
00230         if (sc.isFailure()) {
00231             // fixme: error msg
00232             return sc;
00233         }
00234     }
00235     
00236     // actually, should never reach here.
00237     return StatusCode::SUCCESS;
00238 }
00239 
00240 StatusCode DybArchiveOutput::sweep_up()
00241 {
00242     // process any remaining data left in the AES.
00243 
00244     DybArchiveList* rslist = aes_list(m_regSeqLocation);
00245 
00246     debug() << "sweeping up with " << rslist->size() << " RegistrationSequences to process" << endreq;
00247 
00248     // be careful iterating over this list because it is being erased as we go
00249     while (true) {              
00250         if (!rslist->size()) {
00251             break;
00252         }
00253 
00254         DybArchiveList::reverse_iterator rit = rslist->rbegin();
00255         RegistrationSequence* rs = dynamic_cast<RegistrationSequence*>(*rit);
00256         StatusCode sc = process(rs);
00257         if (sc.isFailure()) {
00258             return sc;
00259         }
00260     }
00261     return StatusCode::SUCCESS;
00262 }
00263 StatusCode DybArchiveOutput::stop()
00264 {
00265     debug() << "stop called" << endreq;
00266 
00267     return GaudiAlgorithm::stop();
00268 }
00269 StatusCode DybArchiveOutput::finalize()
00270 {
00271     debug() << "finalize" << endreq;
00272 
00273     StatusCode sc = sweep_up();
00274     if (sc.isFailure()) {
00275         return sc;
00276     }
00277 
00278     return GaudiAlgorithm::finalize();
00279 }
00280 
00281 
| Classes | Job Modules | Data Objects | Services | Algorithms | Tools | Packages | Directories | Tracs |

Generated on Fri May 16 2014 10:09:44 for DybAlg by doxygen 1.7.4