/search.css" rel="stylesheet" type="text/css"/> /search.js">
00001 #include "DybArchiveOutput.h" 00002 00003 #include "DybKernel/EventStoreIncident.h" 00004 #include "Event/HeaderObject.h" 00005 00006 #include "GaudiKernel/DataSvc.h" 00007 #include "GaudiKernel/IIncidentSvc.h" 00008 00009 #include <sstream> 00010 00011 using namespace DayaBay; 00012 00013 DybArchiveOutput::DybArchiveOutput (const std::string& name, ISvcLocator* pSvcLocator) 00014 : GaudiAlgorithm(name, pSvcLocator) 00015 , m_aes(0) 00016 , m_store(0) 00017 , m_incsvc(0) 00018 { 00019 declareProperty("WindowSeconds", m_window = 0.0, "Time window to apply to the AES"); 00020 declareProperty("SaveOutput", m_saveFlag = false, "Set to true to save ouput, default is false"); 00021 declareProperty("RegSeqLocation", 00022 m_regSeqLocation = DayaBay::RegistrationSequenceLocation::Default, 00023 "Location to RegistrationSequence"); 00024 declareProperty("ArchiveSvc", 00025 m_aesName = "EventDataArchiveSvc", 00026 "Name of Archive Event Store"); 00027 declareProperty("DybStorageSvc", 00028 m_dybStorageSvcName = "DybStorageSvc", 00029 "Name of an IDybStorageSvc"); 00030 } 00031 00032 StatusCode DybArchiveOutput::initialize() 00033 { 00034 StatusCode sc = GaudiAlgorithm::initialize(); 00035 if (sc.isFailure()) return sc; 00036 00037 sc = service(m_aesName,m_aes); 00038 if (sc.isFailure()) return sc; 00039 00040 if (m_saveFlag) { 00041 sc = service(m_dybStorageSvcName, m_store); 00042 if (sc.isFailure()) return sc; 00043 } 00044 00045 sc = service("IncidentSvc",m_incsvc,true); 00046 if (sc.isFailure()) return sc; 00047 00048 debug() << "initialized" << endreq; 00049 00050 return StatusCode::SUCCESS; 00051 } 00052 00053 static std::string dostr(const DataObject* obj) 00054 { 00055 std::stringstream ss; 00056 ss << obj->name() << ": refCount=" << obj->refCount(); 00057 const TemporalDataObject* tdo = dynamic_cast<const TemporalDataObject*>(obj); 00058 if (!tdo) { 00059 return ss.str(); 00060 } 00061 ss << " time:" << tdo->earliest() << " --> " << tdo->latest(); 00062 const HeaderObject* ho = dynamic_cast<const HeaderObject*>(tdo); 00063 if (!ho) { 00064 return ss.str(); 00065 } 00066 ss << " execNumber=" << ho->execNumber(); 00067 return ss.str(); 00068 } 00069 00070 DybArchiveList* DybArchiveOutput::aes_list(const std::string& location) 00071 { 00072 // return the list at the given location or null if it d.n.e. use 00073 // local cache to avoid somewhat expensive calls to 00074 // aes->retrieveObject(). 00075 00076 AESListMap_t::iterator it = m_aesListMap.find(location); 00077 if (it != m_aesListMap.end()) { // cache hit 00078 return it->second; 00079 } 00080 00081 DataObject* object = 0; 00082 StatusCode sc = m_aes->retrieveObject(location,object); 00083 if (sc.isFailure()) { // not here now but maybe later. 00084 return 0; 00085 } 00086 DybArchiveList* aesList = dynamic_cast<DybArchiveList*>(object); 00087 if (!aesList) { 00088 // fixme: add warning msg 00089 return 0; 00090 } 00091 00092 m_aesListMap[location] = aesList; 00093 return aesList; 00094 } 00095 00096 StatusCode DybArchiveOutput::store(DayaBay::RegistrationSequence* rs) 00097 { 00098 if (! m_store) { // not configure to store data 00099 return StatusCode::SUCCESS; 00100 } 00101 00102 debug() << "Storing RegistrationSequence with " 00103 << rs->size() << " registrations spanning " 00104 << rs->earliest() << " --> " << rs->latest() 00105 << endreq; 00106 return m_store->store(*rs); 00107 } 00108 00109 DybArchiveList::iterator DybArchiveOutput::find_in_archive(DybArchiveList* aeslist, const DataObject* object) 00110 { 00111 DybArchiveList::iterator it, done = aeslist->end(); 00112 for (it = aeslist->begin(); it != done; ++it) { 00113 if (object == *it) { 00114 return it; 00115 } 00116 } 00117 return done; 00118 } 00119 00120 00121 StatusCode DybArchiveOutput::erase(const DataObject* object, const std::string& path) 00122 { 00123 DybArchiveList* lst = aes_list(path); 00124 if (!lst) { 00125 error() << "No AES list for " << path << endreq; 00126 return StatusCode::FAILURE; 00127 } 00128 00129 DybArchiveList::iterator todie = find_in_archive(lst,object); 00130 if (todie == lst->end()) { 00131 error() << "Failed to find object for " << path 00132 << " object:" << dostr(object) 00133 << endreq; 00134 return StatusCode::FAILURE; 00135 } 00136 00137 m_incsvc->fireIncident(EventStoreIncident(object, false, "TrimmingAgent")); 00138 00139 debug() << "Erasing object from " << path 00140 << " object:" << dostr(object) 00141 << endreq; 00142 00143 lst->erase(todie); // when Registration is dtor'ed, object.release() is called 00144 00145 return StatusCode::SUCCESS; 00146 } 00147 StatusCode DybArchiveOutput::trim(DayaBay::RegistrationSequence* rs) 00148 { 00149 const IRegistrationSequence::Registrations& regs = rs->registrations(); 00150 IRegistrationSequence::Registrations::const_iterator regit, regend = regs.end(); 00151 00152 // First remove all registrations from their AES lists 00153 for (regit = regs.begin(); regit != regend; ++regit) { 00154 const ObjectReg& reg = (*regit); 00155 StatusCode sc = erase(reg.object(),reg.path()); 00156 if (sc.isFailure()) { 00157 return StatusCode::FAILURE; 00158 } 00159 00160 } 00161 00162 // Finally, trim the RegistrationSequence itself. 00163 StatusCode sc = erase(rs,m_regSeqLocation); 00164 if (sc.isFailure()) { 00165 return StatusCode::FAILURE; 00166 } 00167 00168 return StatusCode::SUCCESS; 00169 } 00170 00171 00172 RegistrationSequence* DybArchiveOutput::condemned() 00173 { 00174 // Try to return the next RS if it is out of the window 00175 00176 DybArchiveList* rslist = aes_list(m_regSeqLocation); 00177 if (! rslist) return 0; 00178 if (rslist->size() <= 1) return 0; 00179 00180 // newest objects are push_front'ed, so we must check the back for oldest. 00181 DybArchiveList::reverse_iterator rit_old = rslist->rbegin(); 00182 DybArchiveList::iterator rit_new = rslist->begin(); 00183 RegistrationSequence* rs_old = dynamic_cast<RegistrationSequence*>(*rit_old); 00184 RegistrationSequence* rs_new = dynamic_cast<RegistrationSequence*>(*rit_new); 00185 00186 if (!rs_old || !rs_new) { 00187 error() << "Not a RegistrationSequence at " << m_regSeqLocation << endreq; 00188 return 0; 00189 } 00190 00191 TimeStamp threshold = rs_new->earliest(); 00192 threshold.Add(-1.0 * m_window); 00193 00194 if (rs_old->earliest() > threshold) { 00195 return 0; 00196 } 00197 00198 return rs_old; // I salute you. 00199 } 00200 00201 StatusCode DybArchiveOutput::process(RegistrationSequence* rs) 00202 { 00203 // Process one RS, maybe store and then trim 00204 00205 StatusCode sc = store(rs); 00206 if (sc.isFailure()) { 00207 error() << "Failed to store RegistrationSequence: " << dostr(rs) << endreq; 00208 return sc; 00209 } 00210 00211 sc = trim(rs); 00212 if (sc.isFailure()) { 00213 error() << "Failed to trim RegistrationSequence: " << dostr(rs) << endreq; 00214 return sc; 00215 } 00216 00217 return StatusCode::SUCCESS; 00218 } 00219 00220 StatusCode DybArchiveOutput::execute() 00221 { 00222 // save and trim until no more 00223 while (true) { 00224 RegistrationSequence* rs = condemned(); 00225 if (!rs) { 00226 return StatusCode::SUCCESS; 00227 } 00228 00229 StatusCode sc = process(rs); 00230 if (sc.isFailure()) { 00231 // fixme: error msg 00232 return sc; 00233 } 00234 } 00235 00236 // actually, should never reach here. 00237 return StatusCode::SUCCESS; 00238 } 00239 00240 StatusCode DybArchiveOutput::sweep_up() 00241 { 00242 // process any remaining data left in the AES. 00243 00244 DybArchiveList* rslist = aes_list(m_regSeqLocation); 00245 00246 debug() << "sweeping up with " << rslist->size() << " RegistrationSequences to process" << endreq; 00247 00248 // be careful iterating over this list because it is being erased as we go 00249 while (true) { 00250 if (!rslist->size()) { 00251 break; 00252 } 00253 00254 DybArchiveList::reverse_iterator rit = rslist->rbegin(); 00255 RegistrationSequence* rs = dynamic_cast<RegistrationSequence*>(*rit); 00256 StatusCode sc = process(rs); 00257 if (sc.isFailure()) { 00258 return sc; 00259 } 00260 } 00261 return StatusCode::SUCCESS; 00262 } 00263 StatusCode DybArchiveOutput::stop() 00264 { 00265 debug() << "stop called" << endreq; 00266 00267 return GaudiAlgorithm::stop(); 00268 } 00269 StatusCode DybArchiveOutput::finalize() 00270 { 00271 debug() << "finalize" << endreq; 00272 00273 StatusCode sc = sweep_up(); 00274 if (sc.isFailure()) { 00275 return sc; 00276 } 00277 00278 return GaudiAlgorithm::finalize(); 00279 } 00280 00281