Public Member Functions | Public Attributes
Scraper::base::averager::Averager Class Reference
Inheritance diagram for Scraper::base::averager::Averager (graph omitted).
Collaboration diagram for Scraper::base::averager::Averager (graph omitted).

List of all members.

Public Member Functions

def changed
def propagate
def __call__

Public Attributes

 maxiter

Detailed Description

Base class holding common averager features, such as the scrape logic,
which assumes:

#. source instances correspond to fixed-time measurement *snapshots*
#. target instances represent source measurements over time ranges
#. two source instances are required to form one target instance; the target validity is derived from the datetimes of the two source instances

Initialisation is handled in the `Propagator` superclass.
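The snapshot-to-range rule above can be sketched as follows. This is illustrative only: the function name and the `(datetime, value)` pair shape are assumptions, not taken from averager.py.

```python
from datetime import datetime

def target_validity(snap_a, snap_b):
    """Derive one target validity range from two source snapshots.

    Each snapshot is assumed to be a (datetime, measurement) pair;
    the target is valid from the earlier snapshot to the later one.
    """
    (ta, va), (tb, vb) = snap_a, snap_b
    start, end = min(ta, tb), max(ta, tb)
    return dict(timestart=start, timeend=end, value=(va + vb) / 2.0)

rng = target_validity(
    (datetime(2014, 5, 1), 10.0),
    (datetime(2014, 5, 2), 14.0),
)
```

Here two one-day-apart snapshots yield a target valid over that one-day range.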

Definition at line 10 of file averager.py.


Member Function Documentation

def Scraper::base::averager::Averager::changed(self, sv)
Override in subclasses to return whether a significant change in the
source instances is observed. Together with age checks, this is used
to decide if the propagate method is called.

:param sv:  source vector containing two source instances to interrogate for changes

Definition at line 21 of file averager.py.

00022                           :
00023         """
00024         Override in subclasses to return whether a significant 
00025         change in the source instances is observed.  Together
00026         with age checks, this is used to decide if the propagate method is called.
00027 
00028         :param sv:  source vector containing two source instances to interrogate for changes
00029         """
00030         return True
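As a hedged illustration of overriding the default `changed` (which always returns True), a subclass might compare an attribute of the two source instances against a threshold. The `temperature` attribute and the threshold value are hypothetical, not part of averager.py:

```python
class TemperatureAverager:  # stands in for an Averager subclass
    threshold = 0.5  # hypothetical significance threshold

    def changed(self, sv):
        """Return True when the two source instances in the vector
        differ by more than the threshold."""
        prev, curr = sv[0], sv[1]
        return abs(curr.temperature - prev.temperature) > self.threshold
```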

def Scraper::base::averager::Averager::propagate(self, sv)
Override this method in subclasses to yield one or more write-ready
target dicts derived from the `sv[-1]` source instance or the `sv[-1].aggd` aggregate dict.

:param sv:  source vector containing two source instances to propagate into one target write

Definition at line 31 of file averager.py.

00032                             :
00033         """
00034         Override this method in subclasses to yield one or more 
00035         write-ready target dicts derived from the `sv[-1]` source instance or the `sv[-1].aggd` aggregate dict.
00036 
00037         :param sv:  source vector containing two source instances to propagate to one target write  
00038         """
00039         yield {}
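A subclass's `propagate` might then yield one write-ready dict per entry of the aggregate dict, as in this sketch. The class name and the `aggd` key layout (`"avg"`, `"n"` per channel) are assumptions for illustration:

```python
class PowerAverager:  # stands in for an Averager subclass
    def propagate(self, sv):
        """Yield write-ready target dicts derived from the aggregate
        dict of the latest source instance, sv[-1].aggd."""
        aggd = sv[-1].aggd
        for key, row in sorted(aggd.items()):
            yield dict(channel=key, mean=row["avg"], nsamples=row["n"])
```

Because `propagate` is a generator, the caller in `__call__` can open a separate writer for each yielded dict.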

def Scraper::base::averager::Averager::__call__(self)
Note the differences between averager and scraper:

#. a single pass rather than an infinite loop is typically used

#. `group_by` aggregation is used, causing potentially many aggregated dicts to be propagated

#. in an averager the writer must be instantiated within the propagate loop, as
   each aggregate dict corresponds to a different time range

#. time bins of typically 1 day are externally applied by setting tcursor to a 2-element list
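A minimal sketch of the last point, splitting a time range into 1-day bins expressed as 2-element lists of the kind assigned to `tcursor` (the helper name is illustrative, not from averager.py):

```python
from datetime import datetime, timedelta

def day_bins(start, end):
    """Split [start, end) into 1-day bins, each a 2-element
    [binstart, binend] list suitable for assigning to tcursor."""
    bins, cursor = [], start
    while cursor < end:
        nxt = min(cursor + timedelta(days=1), end)
        bins.append([cursor, nxt])
        cursor = nxt
    return bins

bins = day_bins(datetime(2014, 5, 1), datetime(2014, 5, 4))
```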

Definition at line 40 of file averager.py.

00041                       :
00042         """
00043         Note the differences between averager and scraper:
00044 
00045         #. a single pass rather than an infinite loop is typically used
00046 
00047         #. `group_by` aggregation is used, causing potentially many aggregated dicts to be propagated
00048 
00049         #. in an averager the writer must be instantiated within the propagate loop, as 
00050            each aggregate dict corresponds to a different time range.
00051 
00052         #. time bins of typically 1 day are externally applied by setting tcursor to a 2-element list
00053 
00054         """
00055         i,w = 0,0
00056         while i<self.maxiter or self.maxiter==0:
00057             i += 1
00058             log.debug("i %s " % i )
00059             for sv in self:
00060                 log.info("sv.source.xtn %r", sv.source.xtn ) 
00061                 floor = sv.tcursor     ## tcursor is initialized based on target last validity endtime
00062 
00063                 ## assumes single shot running
00064                 if not getattr(sv,'territory',None) and self.aggregate_group_by_territory:
00065                     sv.territory = Territory( sv.source , by=self.aggregate_group_by_territory , suby=self.aggregate_group_by, att=self.aggregate_group_by_att , floor=floor  )
00066                     sv.territory.dump()
00067                     if len(sv.territory) == 0:
00068                         log.warn("no bins : insufficient time range for period\n%s\n" % sv.territory )
00069                     else:
00070                         log.info("looping over %s territory bins performing grouped aggregate queries in each bin  " % len(sv.territory) )
00071 
00072                 ## this assumes independence of sources, maybe need to align territorial time ranges 
00073                 ## so clients have option of mangling them together 
00074                 nbin = len(sv.territory)
00075                 for ibin,bin in enumerate(sv.territory): 
00076                     sv.tcursor = bin
00077                     log.debug( "averager ibin %s bin %s " % ( ibin, map(lambda _:_.ctime(),bin)) )
00078                     proceed = sv()  
00079                     if not proceed:
00080                         continue 
00081                     writes = []
00082                     for td in self.propagate( sv ):           ## iterate over "yield" supplier of target dicts   
00083 
00084                         localstart = td.get( "_%s_min" % self.aggregate_group_by_att , None)  # typically date_time
00085                         localend   = td.get( "_%s_max" % self.aggregate_group_by_att , None )
00086                         log.debug( "averager within propagate loop writer for range %s %s " % ( localstart, localend ))
00087                         wrt = self.target.writer( sv , localstart=localstart, localend=localend ) 
00088 
00089                         tdi = self.target.instance(**td)   
00090                         log.debug("tdi %r " % tdi )
00091                         wrt.Write(tdi)    
00092 
00093                         seqno = wrt.Close() ## hits target DB here
00094                         if seqno:
00095                             log.debug("write %s succeeded %s seqno %s  " % ( w, sv, seqno ) )
00096                             writes.append(dict(seqno=seqno,timestart=wrt.ctx.contextrange.timestart,timeend=wrt.ctx.contextrange.timeend,localstart=localstart,localend=localend))
00097                             w += 1
00098                         else:
00099                             log.fatal("writing failed %s " %  sv )
00100                             assert 0
00101                     pass
00102                     if len(writes)>0:
00103                         starts = map(lambda _:_['localstart'], writes)    
00104                         ends   = map(lambda _:_['localend'], writes)
00105                         seqnos = map(lambda _:_['seqno'], writes)
00106                         log.debug("propagated %s with seqnos %s:%s time range %s %s  " % ( len(writes), min(seqnos),max(seqnos), min(starts), max(ends) ))
00107                     else:
00108                         log.debug("no writes for bin %s %s " % ( ibin, map(lambda _:_.ctime(),bin)))
00109                     pass 
00110                     # end bin
00111                 pass
00112                 # end sv  
00113             pass 
00114             # end while
00115             self.handle_signal()
00116         pass


Member Data Documentation

maxiter

Definition at line 52 of file averager.py.


The documentation for this class was generated from the following file: averager.py

Generated on Fri May 16 2014 09:50:03 for Scraper by doxygen 1.7.4