/search.css" rel="stylesheet" type="text/css"/> /search.js">
| Classes | Job Modules | Data Objects | Services | Algorithms | Tools | Packages | Directories | Tracs |

In This Package:

target.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 import os, logging
00003 from datetime import datetime, timedelta
00004 log = logging.getLogger(__name__)
def dt_(s):
    """Parse a naive local datetime from a 'YYYY-MM-DD HH:MM:SS' string.

    Replaces the former ``dt_ = lambda _: ...`` assignment (PEP 8 E731);
    same name and call signature, so all callers are unaffected.
    """
    return datetime.strptime(s, "%Y-%m-%d %H:%M:%S")
00006 
class Target(dict):
    """
    Encapsulate DybDbi dealings here to avoid cluttering ``Scraper``

    Behaves as a dict of configuration parameters; keys such as ``kls``,
    ``target``, ``timefloor``, ``seed_target_tables`` are read by the
    properties and methods below.

    Relevant config parameters 

    :param timefloor: None or a datetime or string such as '2010-09-18 22:57:32' used to limit the expense of validity query

    """
    def __init__(self, *args, **kwa ):
        dict.__init__(self, *args, **kwa )
        # deferred import: keeps DybPython off the module import path
        from DybPython import DBConf
        # DBCONF environment variable selects the DB configuration section
        self.dbconf = DBConf( os.environ['DBCONF'] )
        # resolve the DybDbi target class once; _kls() has side effects
        # (loglevel, optional table creation) so options that influence it
        # must be supplied at dict instanciation time
        self.kls = self._kls()  
    # target database name as configured in the DBConf section
    database = property(lambda self:self.dbconf.get('database'))

    # sensible defaults for easy test/mock usage
    # each property reads an optional key from this dict, falling back to a default
    dbi_loglevel = property(lambda self:self.get('dbi_loglevel',"INFO"))
    seed_target_tables = property(lambda self:self.get('seed_target_tables',False))
    DROP_TARGET_TABLES = property(lambda self:self.get('DROP_TARGET_TABLES',False))
    ALLOW_DROP_CREATE_TABLE = property(lambda self:self.get('ALLOW_DROP_CREATE_TABLE',False))
00029 
00030     def __repr__(self):
00031         return "Target %s:%s:%s " % ( self['target'], self.database, self['kls'] )
00032 
    def _kls(self):
        """
        :rtype DybDbi class:  returns DybDbi/genDbi class specified by ``kls`` key within this dict
        
        Also does DybDbi housekeeping:

        #. sets loglevel according to ``dbi_loglevel`` key
        #. creates target tables if both ``seed_target_tables`` key is True and ``ALLOW_DROP_CREATE_TABLE`` is True
        #. sets convenience kls attribute `kls.attnames` providing the list of attribute names : for dict consistency checking 

        .. warning:: As this is called by ctor, must set options impacting it on dict instanciation

        """
        from DybDbi import gDbi, Level          
        import DybDbi

        # resolve the configured class name, eg 'GDcsAdWpHv', into the DybDbi class
        kls = getattr( DybDbi, self['kls'] )
        # a single-member DBI cascade is assumed; its sole member is the target DB
        assert len(gDbi.cascader) == 1
        dbname = gDbi.cascader[0].dbname
        # convenience attribute: list of payload attribute names, used by
        # instance() to filter incoming dict keys
        kls.attnames = kls.SpecKeys().aslist()

        gDbi.outputlevel = Level.From( self.dbi_loglevel )

        allow_dropcreate = self.ALLOW_DROP_CREATE_TABLE
        stt = self.seed_target_tables
        dtt = self.DROP_TARGET_TABLES
        # destructive/seed options are forbidden under supervisord control
        sve = 'SUPERVISOR_ENABLED' in os.environ
        log.warn("target._kls stt %s dtt %s sve %s " % (stt,dtt,sve) )

        if sve and (stt or dtt):
            msg = "seeding/dropping target tables is not appropriate for longterm usage under supervisord : do manually with scr.py  "
            log.fatal(msg)
            raise Exception(msg)  
     
        if stt: 
            log.warn("seed_target_tables proceeding against : %r " % dict( self.dbconf, password="***") )
        elif dtt and allow_dropcreate: 
            # extra warning when the drop/create would touch offline_db
            if dbname == 'offline_db' or self.database == 'offline_db':
                log.warn("seed_target_tables proceeding against offline_db for target %r " % dict( self.dbconf, password="***") )
            # kls.__name__[1:] strips the leading 'G' to obtain the table name
            kls().CreateDatabaseTables( 0, kls.__name__[1:] )     
        else:
            # NOTE(review): this branch also fires when neither stt nor dtt is
            # configured, emitting the "not allowed" warning on every ordinary
            # instanciation -- confirm whether that is intended
            log.warn("seed_target_tables/DROP_TARGET_TABLES is not allowed with target %r without ALLOW_DROP_CREATE_TABLE option " % dict( self.dbconf, password="***") )
        return kls
00076 
00077 
00078     def instance( self, **kwa ):
00079         """
00080         Might fail with TypeError if kwa cannot be coerced, eg from aggregate queries 
00081         returning None when zero samples
00082 
00083         If the attribute names are not expected for the target kls they are skipped.
00084         This will be the case for the system attributes `_date_time_min` `_date_time_max` 
00085 
00086         """
00087         d = dict((k,v) for k,v in kwa.items() if k in self.kls.attnames)
00088         if len(kwa) != len(d):
00089             log.debug("target.instance skipped some attributes %r in instance preparation " % list(set(kwa).difference(set(d))) )  
00090         return self.kls.Create(**d)
00091 
00092     def writer(self, sv , localstart=None, localend=None ):
00093         """
00094         Prepare DybDbi writer for target class, with contextrange/subsite appropriate for the source instance
00095 
00096         Use of non-default localstart and localend type is typically only used for
00097         aggregate_group_by quering where the instance datetimes such as `sv[0].date_time`
00098         do not correspond to the contextrange of the aggregate dict. 
00099 
00100         :param sv:  source vector instance that contains instances of an SA mapped class
00101         :param localstart: default of `None` corresponds to `sv[0].date_time`
00102         :param localend: default of `None` corresponds to `sv[-1].date_time`
00103         """
00104 
00105         start = localstart if localstart else sv[0].date_time
00106         end   = localend if localend else sv[-1].date_time
00107         return Target._wrt( sv[0].__class__ , self.kls , start, end ) 
00108 
    def _wrt( cls , skls, tkls , localstart, localend ):
        """
        Classmethod to prepare DybDbi writer for the target class 
        with contextrange/subsite appropriate to the source class

        :param skls: SQLAlchemy source class which determines the appropriate contextrange to use for target class writer
        :param tkls: DybDbi target class for which the writer is to be prepared 
        :param localstart: local datetime which is converted to UTC TimeStamp for timestart of CR
        :param localend: local datetime which is converted to UTC TimeStamp for timeend of CR

        Assumptions made:

        #. source tablename encodes site, subsite (IF NOT THEN MUST GET FROM INSTANCE ?) 
        #. source date_time are naive localtimes 

        """
        wrt = tkls.Wrt()
        # xtn carries sitemask/subsite decoded from the source tablename
        xtn = skls.xtn   

        from DybDbi import ContextRange, TimeStamp, Site, SimFlag, DetectorId
        log.debug( "_wrt localstart %s localend %s " % ( localstart.ctime(), localend.ctime() )) 
        # naive local datetimes converted to UTC timestamps for the CR
        timestart = TimeStamp.UTCfromLocalDatetime( localstart ) 
        timeend   = TimeStamp.UTCfromLocalDatetime( localend ) 
        cr = ContextRange( xtn.sitemask,  SimFlag.kData , timestart, timeend )
        # dbno=0 targets the sole cascade member; TimeStamp(0,0) requests
        # overlay versioning behaviour from DBI
        wrt.ctx( contextrange=cr, dbno=0, versiondate=TimeStamp(0,0), subsite=xtn.subsite, task=0 ) 
        return wrt
    _wrt = classmethod( _wrt )
00136 
00137 
00138     def require_manual(self, msg ):
00139         """
00140         Require manual operation (ie running scr.py from commandline) 
00141         preventing usage of rare operations/options under supervisor control 
00142         """
00143         sve = 'SUPERVISOR_ENABLED' in os.environ
00144         if sve:
00145             log.fatal(msg)
00146             raise Exception(msg)  
00147         else:
00148             log.info("manual running detected")            
00149 
00150 
    def seed(self, srcs, scraper , dummy=False ):
        """
        This is invoked at scraper instanciation when the conditions are met:

        #. ``seed_target_tables`` is configured True
  
        Seed entries are written to the target table. The seed validity range is configured with
        the options: `seed_timestart` `seed_timeend` and formerly the payload entry 
        was specified by the `def seed()` method implemented in the scraper class.

        Attempts to perform seeding under supervisor raises an exception, 
        to enforce this restriction.

        When testing seeding start from scratch with eg::

             mysql> drop table DcsAdTemp, DcsAdTempVld ;    
             mysql> update LOCALSEQNO set LASTUSEDSEQNO=0 where TABLENAME='DcsAdTemp' ;  

        Changes from Oct 2012:

        #. allow use against an existing table 
        #. table creation functionality is removed
        #. move to payloadless seeds (removing need for dummy payload instances) 

        Motivated by the need to add new sources that contribute 
        to an existing target which has already collected data
        eg for adding ADs to the DcsAdWpHv scraper.

        :param srcs: source instances (with ``xtn`` attribute) to be seeded
        :param scraper: scraper instance providing ``seed(src)`` payload dicts; only used when `dummy` is True
        :param dummy: when True write a dummy payload instance rather than a payloadless seed
        """
        self.require_manual("seed is not appropriate for longterm usage under supervisord : do seeding manually ")
        if not self['seed_target_tables']:
            log.warn("improper call to seed " )
            return

        localstart, localend = self['seed_timestart'], self['seed_timeend'] 
        kls = self.kls
        for src in srcs:
            # skip sources that already have a target validity entry
            tlv = self.lastvld(src.xtn)
            if tlv: 
                log.info("tlv exists already : src %r tlv %r " % ( src,tlv.seqno))
            else:
                log.warn("writing seed for src : %r " % (src) )
                wrt = self._wrt( src, kls , localstart, localend  )
                if dummy: 
                    inst = kls.Create( **scraper.seed(src) )
                    wrt.Write( inst )
                # payloadless close still allocates a SEQNO / validity entry
                seqno = wrt.Close()
                assert seqno, "failed to seed_update for source %r " % src 
                log.warn("wrote seed for src : %r with seqno %s  " % (src,seqno) )
00200 
00201 
00202     def lastvld( self, source ):
00203          """
00204          Last validity record in target database for context corresponding to `source` class.
00205          Query expense is restricted by the `timefloor`.  
00206          If `timefloor` is None a sequence of progressively 
00207          more expensive queries are performed to get the target last validty.
00208 
00209          :param source:  source context instance either an **xtn** of MockSource instance with subsite and sitemask attributes
00210          :param timefloor: time after which to look for validity entries in target database or None
00211 
00212          Note this is called only at scraper initialization, in order for the scraper to 
00213          find its time cursor.
00214 
00215          """
00216          timefloor = self.get('timefloor', None)
00217          if timefloor:
00218              log.debug("using timefloor %r " % timefloor )
00219              return self._lastvld( source, timefloor )
00220          else:
00221              log.warn("no timefloor ... inventing ")
00222              now = datetime.now()
00223              for days in (14,100,):
00224                  ts = now + timedelta(days=-days)  
00225                  tlv = self._lastvld( source , ts )
00226                  if tlv:
00227                      return tlv
00228              return self._lastvld(source, None ) 
00229 
    def _lastvld(self, source, timefloor ):
        """
        Obtain last target validity record for context corresponding to source class

        :param source: SA mapped class
        :param timefloor: datetime (or None) restricting the query to TIMESTART after it

        """ 
        from DybDbi import SimFlag
        subsite = source.subsite
        task = self.get('task', 0 )
        # bitwise masks used as SQL truth values: any overlap with the
        # source sitemask / kData simflag selects the row
        sqlcontext = "SiteMask & %s and SimMask & %s " % ( source.sitemask , SimFlag.kData )
        if timefloor:
            # restricts query expense; values are internal, not user input
            sqlcontext += " and TIMESTART > '%s' " % timefloor
        lastvld = self.kls.GetTableProxy().LastValidityRec( sqlcontext, subsite, task )
        return lastvld  
00245 
00246 
00247 
00248 
if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO)

    # manual smoke test: requires DBCONF env var pointing at a DB section
    # and an existing target table
    target = Target( kls='GDcsAdWpHv', target="tmp_offline_db", timefloor  = dt_("2010-01-01 00:00:00"), seed_target_tables=True )
    log.debug( "kls %r with attnames %s " % (target.kls, target.kls.attnames) )

    class MockSource(object):
        """Minimal stand-in for a source xtn: just sitemask and subsite."""
        def __init__(self, sitemask_subsite):
            sitemask, subsite = sitemask_subsite.split()
            self.sitemask = int(sitemask)
            self.subsite  = int(subsite)
        __repr__ = lambda self:"MockSource (%s,%s) " % (self.sitemask, self.subsite)

    ## adhoc query to obtain all sources feeding the table already in low level way 
    from DybPython import DB
    db = DB(target.dbconf.sect)
    sql = "select distinct(concat(sitemask,\" \",subsite)) as ss from %sVld" % target.kls.__name__[1:]
    # list comprehensions rather than map(): under python3 map() returns an
    # iterator, which cannot be concatenated with a list as done below
    sss = [rec['ss'] for rec in db(sql)]

    srcs = [MockSource(ss) for ss in sss + ["4 4","2 2"]]   ## extend sources for new ADs
    for s in srcs:   
        tlv = target.lastvld( s )
        timeend = tlv.contextrange.timeend if tlv else "-"           ## UTC TimeStamp
        log.info( "%r timeend %s  " % (s, timeend) )

    target['seed_timestart'] = dt_("2010-01-01 00:00:00")
    target['seed_timeend']   = dt_("2011-01-01 00:00:00")
    target.seed( srcs, None , dummy=False ) 
00281 
| Classes | Job Modules | Data Objects | Services | Algorithms | Tools | Packages | Directories | Tracs |

Generated on Fri May 16 2014 09:50:03 for Scraper by doxygen 1.7.4