#!/usr/bin/env python
"""
Scraper target encapsulation: mediates between SQLAlchemy source instances
and the DybDbi target class that writes validity entries into the target DB.
"""
import os
import logging
from datetime import datetime, timedelta

log = logging.getLogger(__name__)


def dt_(s):
    """Parse a 'YYYY-mm-dd HH:MM:SS' string into a naive datetime."""
    return datetime.strptime(s, "%Y-%m-%d %H:%M:%S")


class Target(dict):
    """
    Encapsulate DybDbi dealings here to avoid cluttering ``Scraper``

    Relevant config parameters

    :param timefloor: None or a datetime or string such as '2010-09-18 22:57:32'
                      used to limit the expense of validity query
    """
    def __init__(self, *args, **kwa):
        """
        Resolve the DB connection (from the section named by the ``DBCONF``
        environment variable) and the DybDbi target class named by the
        ``kls`` key of this dict.

        .. warning:: options impacting ``_kls`` (loglevel, seeding/dropping
                     of target tables) must be supplied at dict instantiation
                     since ``_kls`` is invoked here.
        """
        dict.__init__(self, *args, **kwa)
        from DybPython import DBConf   # deferred: heavy project import
        self.dbconf = DBConf(os.environ['DBCONF'])
        self.kls = self._kls()

    # name of the target database, from the resolved DB config
    database = property(lambda self: self.dbconf.get('database'))

    # sensible defaults for easy test/mock usage
    dbi_loglevel = property(lambda self: self.get('dbi_loglevel', "INFO"))
    seed_target_tables = property(lambda self: self.get('seed_target_tables', False))
    DROP_TARGET_TABLES = property(lambda self: self.get('DROP_TARGET_TABLES', False))
    ALLOW_DROP_CREATE_TABLE = property(lambda self: self.get('ALLOW_DROP_CREATE_TABLE', False))

    def __repr__(self):
        return "Target %s:%s:%s " % (self['target'], self.database, self['kls'])

    def _kls(self):
        """
        :rtype DybDbi class: returns DybDbi/genDbi class specified by ``kls`` key within this dict

        Also does DybDbi housekeeping:

        #. sets loglevel according to ``dbi_loglevel`` key
        #. creates target tables when ``DROP_TARGET_TABLES`` and
           ``ALLOW_DROP_CREATE_TABLE`` are both True
        #. sets convenience kls attribute `kls.attnames` providing the list of
           attribute names, for dict consistency checking

        .. warning:: As this is called by ctor, must set options impacting it on dict instantiation
        """
        from DybDbi import gDbi, Level
        import DybDbi

        kls = getattr(DybDbi, self['kls'])
        # scrapers are expected to run against a single (non-cascaded) target DB
        assert len(gDbi.cascader) == 1
        dbname = gDbi.cascader[0].dbname
        kls.attnames = kls.SpecKeys().aslist()

        gDbi.outputlevel = Level.From(self.dbi_loglevel)

        allow_dropcreate = self.ALLOW_DROP_CREATE_TABLE
        stt = self.seed_target_tables
        dtt = self.DROP_TARGET_TABLES
        sve = 'SUPERVISOR_ENABLED' in os.environ
        log.warning("target._kls stt %s dtt %s sve %s " % (stt, dtt, sve))

        # seeding/dropping is a rare manual operation: refuse it under supervisord
        if sve and (stt or dtt):
            msg = "seeding/dropping target tables is not appropriate for longterm usage under supervisord : do manually with scr.py "
            log.critical(msg)
            raise Exception(msg)

        if stt:
            log.warning("seed_target_tables proceeding against : %r " % dict(self.dbconf, password="***"))
        elif dtt and allow_dropcreate:
            # NOTE(review): indentation reconstructed — original appeared to
            # drop/create unconditionally here, with an extra warning when
            # aimed at offline_db; confirm against upstream history.
            if dbname == 'offline_db' or self.database == 'offline_db':
                log.warning("seed_target_tables proceeding against offline_db for target %r " % dict(self.dbconf, password="***"))
            kls().CreateDatabaseTables(0, kls.__name__[1:])
        elif dtt:
            # dropping requested but not permitted: warn only in that case,
            # not on every ordinary construction
            log.warning("seed_target_tables/DROP_TARGET_TABLES is not allowed with target %r without ALLOW_DROP_CREATE_TABLE option " % dict(self.dbconf, password="***"))
        return kls

    def instance(self, **kwa):
        """
        Create a target ``kls`` instance from keyword attributes.

        Might fail with TypeError if kwa cannot be coerced, eg from aggregate
        queries returning None when zero samples.

        Attribute names not expected by the target kls are skipped; this will
        be the case for the system attributes `_date_time_min` `_date_time_max`
        """
        d = dict((k, v) for k, v in kwa.items() if k in self.kls.attnames)
        if len(kwa) != len(d):
            log.debug("target.instance skipped some attributes %r in instance preparation " % list(set(kwa).difference(set(d))))
        return self.kls.Create(**d)

    def writer(self, sv, localstart=None, localend=None):
        """
        Prepare DybDbi writer for target class, with contextrange/subsite
        appropriate for the source instance.

        Use of non-default localstart and localend is typically only needed
        for aggregate_group_by querying where the instance datetimes such as
        `sv[0].date_time` do not correspond to the contextrange of the
        aggregate dict.

        :param sv: source vector instance that contains instances of an SA mapped class
        :param localstart: default of `None` corresponds to `sv[0].date_time`
        :param localend: default of `None` corresponds to `sv[-1].date_time`
        """
        start = localstart if localstart else sv[0].date_time
        end = localend if localend else sv[-1].date_time
        return Target._wrt(sv[0].__class__, self.kls, start, end)

    @classmethod
    def _wrt(cls, skls, tkls, localstart, localend):
        """
        Classmethod to prepare DybDbi writer for the target class with
        contextrange/subsite appropriate to the source class.

        :param skls: SQLAlchemy source class which determines the appropriate contextrange to use for target class writer
        :param tkls: DybDbi target class for which the writer is to be prepared
        :param localstart: local datetime which is converted to UTC TimeStamp for timestart of CR
        :param localend: local datetime which is converted to UTC TimeStamp for timeend of CR

        Assumptions made:

        #. source tablename encodes site, subsite (IF NOT THEN MUST GET FROM INSTANCE ?)
        #. source date_time are naive localtimes
        """
        wrt = tkls.Wrt()
        xtn = skls.xtn
        from DybDbi import ContextRange, TimeStamp, Site, SimFlag, DetectorId
        log.debug("_wrt localstart %s localend %s " % (localstart.ctime(), localend.ctime()))
        timestart = TimeStamp.UTCfromLocalDatetime(localstart)
        timeend = TimeStamp.UTCfromLocalDatetime(localend)
        cr = ContextRange(xtn.sitemask, SimFlag.kData, timestart, timeend)
        wrt.ctx(contextrange=cr, dbno=0, versiondate=TimeStamp(0, 0), subsite=xtn.subsite, task=0)
        return wrt

    def require_manual(self, msg):
        """
        Require manual operation (ie running scr.py from commandline),
        preventing usage of rare operations/options under supervisor control.

        :param msg: message logged and raised when running under supervisord
        :raises Exception: when the SUPERVISOR_ENABLED envvar is present
        """
        sve = 'SUPERVISOR_ENABLED' in os.environ
        if sve:
            log.critical(msg)
            raise Exception(msg)
        log.info("manual running detected")

    def seed(self, srcs, scraper, dummy=False):
        """
        This is invoked at scraper instantiation when ``seed_target_tables``
        is configured True.

        Seed entries are written to the target table. The seed validity range
        is configured with the options `seed_timestart` `seed_timeend` and
        formerly the payload entry was specified by the `def seed()` method
        implemented in the scraper class.

        Attempts to perform seeding under supervisor raise an exception,
        to enforce this restriction.

        When testing seeding start from scratch with eg::

            mysql> drop table DcsAdTemp, DcsAdTempVld ;
            mysql> update LOCALSEQNO set LASTUSEDSEQNO=0 where TABLENAME='DcsAdTemp' ;

        Changes from Oct 2012:

        #. allow use against an existing table
        #. table creation functionality removed
        #. move to payloadless seeds (removing need for dummy payload instances)

        Motivated by the need to add new sources that contribute
        to an existing target which has already collected data,
        eg for adding ADs to the DcsAdWpHv scraper.

        :param srcs: source instances to seed validity entries for
        :param scraper: scraper providing a `seed()` payload dict, used only when `dummy` is True
        :param dummy: when True write a dummy payload instance instead of a payloadless seed
        """
        self.require_manual("seed is not appropriate for longterm usage under supervisord : do seeding manually ")
        if not self.seed_target_tables:   # defaulting property rather than raw key: no KeyError when unset
            log.warning("improper call to seed ")
            return

        localstart, localend = self['seed_timestart'], self['seed_timeend']
        kls = self.kls
        for src in srcs:
            tlv = self.lastvld(src.xtn)
            if tlv:
                log.info("tlv exists already : src %r tlv %r " % (src, tlv.seqno))
            else:
                log.warning("writing seed for src : %r " % (src))
                wrt = self._wrt(src, kls, localstart, localend)
                if dummy:
                    # NOTE(review): per the "payloadless seeds" change above,
                    # the Write is assumed to apply only to dummy payloads;
                    # a bare Close writes the validity-only seed.
                    inst = kls.Create(**scraper.seed(src))
                    wrt.Write(inst)
                seqno = wrt.Close()
                assert seqno, "failed to seed_update for source %r " % src
                log.warning("wrote seed for src : %r with seqno %s " % (src, seqno))

    def lastvld(self, source):
        """
        Last validity record in target database for context corresponding to `source`.
        Query expense is restricted by the `timefloor` config key.
        If `timefloor` is None a sequence of progressively more expensive
        queries is performed to get the target last validity.

        :param source: source context instance, either an **xtn** or MockSource instance with subsite and sitemask attributes

        Note this is called only at scraper initialization, in order for the
        scraper to find its time cursor.
        """
        timefloor = self.get('timefloor', None)
        if timefloor:
            log.debug("using timefloor %r " % timefloor)
            return self._lastvld(source, timefloor)
        log.warning("no timefloor ... inventing ")
        now = datetime.now()
        # widen the lookback window until something is found; final query is unrestricted
        for days in (14, 100,):
            ts = now - timedelta(days=days)
            tlv = self._lastvld(source, ts)
            if tlv:
                return tlv
        return self._lastvld(source, None)

    def _lastvld(self, source, timefloor):
        """
        Obtain last target validity record for context corresponding to source class.

        :param source: SA mapped class (or mock) providing `sitemask` and `subsite`
        :param timefloor: time after which to look for validity entries in target database, or None for unrestricted
        """
        from DybDbi import SimFlag
        subsite = source.subsite
        task = self.get('task', 0)
        sqlcontext = "SiteMask & %s and SimMask & %s " % (source.sitemask, SimFlag.kData)
        if timefloor:
            sqlcontext += " and TIMESTART > '%s' " % timefloor
        return self.kls.GetTableProxy().LastValidityRec(sqlcontext, subsite, task)


if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO)

    target = Target(kls='GDcsAdWpHv', target="tmp_offline_db", timefloor=dt_("2010-01-01 00:00:00"), seed_target_tables=True)
    log.debug("kls %r with attnames %s " % (target.kls, target.kls.attnames))

    class MockSource(object):
        """Stand-in for an SA mapped source, carrying just sitemask/subsite."""
        def __init__(self, sitemask_subsite):
            sitemask, subsite = sitemask_subsite.split()
            self.sitemask = int(sitemask)
            self.subsite = int(subsite)

        def __repr__(self):
            return "MockSource (%s,%s) " % (self.sitemask, self.subsite)

    ## adhoc low level query to obtain all sources already feeding the table
    from DybPython import DB
    db = DB(target.dbconf.sect)
    sql = "select distinct(concat(sitemask,\" \",subsite)) as ss from %sVld" % target.kls.__name__[1:]
    sss = [rec['ss'] for rec in db(sql)]

    srcs = [MockSource(ss) for ss in sss + ["4 4", "2 2"]]  ## extend sources for new ADs
    for s in srcs:
        tlv = target.lastvld(s)
        timeend = tlv.contextrange.timeend if tlv else "-"  ## UTC TimeStamp
        log.info("%r timeend %s " % (s, timeend))

    target['seed_timestart'] = dt_("2010-01-01 00:00:00")
    target['seed_timeend'] = dt_("2011-01-01 00:00:00")
    target.seed(srcs, None, dummy=False)