# $Id: History.py,v 1.8 2004/01/02 16:47:58 dieter Exp $
'''History extensions.'''

from OFS.History import HystoryJar, Historian, Historical
# "HistorySelectionError" is raised by "manage_historicalComparison" below;
# without this import the raise would fail with a "NameError".
from OFS.History import HistorySelectionError
from struct import pack, unpack
from cStringIO import StringIO
from cPickle import Unpickler
from ZODB.Connection import Connection
from ZODB.POSException import POSKeyError
from ZODB.TimeStamp import TimeStamp
from sys import maxint


# Code mostly stolen from "OFS.History"
def historicalRevision(self, serial):
    '''return the state of *self* current at *serial*
    and set "_p_jar" up in a way that all subobject access
    has the same property.

    The revision is loaded through a "HistoryJar" bound to *serial*.
    When *self* has an "aq_parent" and the revision supports "__of__",
    the revision is returned wrapped in that parent.
    '''
    jar = HistoryJar(self._p_jar, serial)
    rev = jar[self._p_oid]
    parent = getattr(self, 'aq_parent', None)
    of = getattr(rev, '__of__', None)
    if parent is not None and of is not None:
        rev = of(parent)
    return rev


class HistoryJar(HystoryJar):
    '''a relaying connection selecting an object current at a serial
    (representing a time).'''

    def __init__(self, base, serial):
        '''relay to connection *base*, selecting states current at *serial*.'''
        HystoryJar.__init__(self, base)
        self._jar = base
        self._serial = serial
        self._cache = {}

    def __getitem__(self, oid):
        '''return the state for *oid* current for *_serial*.'''
        # determine which history records are available
        # ATT: this information should be cached
        #  even better, when ZODB would support loading an object
        #  valid at serial
        jar = self._jar
        obj = jar[oid]
        # create an uninitialized instance of the same class and let
        # "setstate" fill in the historical state
        rev = obj.__class__.__basicnew__()
        rev._p_jar = self
        rev._p_oid = oid
        self.setstate(rev)
        return rev

    def setstate(self, obj):
        '''load into *obj* the newest state whose serial is <= "_serial".

        Raises "POSKeyError" with "(oid, serial)" when no history record
        satisfies this condition.
        '''
        serial = self._serial
        jar = self._jar
        oid = obj._p_oid
        S = jar._storage
        filter = lambda t, serial=serial: t['serial'] <= serial
        # this might need to become a picklable object, once ZEO
        # supports 'filter'.
        if _hasSmartHistory(S):
            ts = S.history(oid, None, 1, filter=filter)
        else:
            hf = HistoryFetcher(jar, oid, None, last=1, filter=filter)
            ts = hf.next()
        if not ts:
            raise POSKeyError(oid, serial)
        t = ts[0]
        oserial = t['serial']
        state = self.oldstate(obj, oserial)
        obj._p_serial = oserial
        obj.__setstate__(state)
        # remember the requested time (not the time of the found record)
        ht = TimeStamp(unpack('8s', serial)[0]).timeTime()
        try:
            obj._p_historical = ht
        except AttributeError:
            # this may not be possible, e.g. for __dict__ less instances
            # (such as "BTrees")
            pass
        obj._p_changed = 0

    # take over methods from "Connection", because we do not derive
    # from this class, we must use "im_func".
    oldstate = Connection.oldstate.im_func
    _persistent_load = Connection._persistent_load.im_func


class Historian(Historian):
    '''deliver historical revisions.'''

    def __getitem__(self, key):
        '''return the revision of our "aq_parent" for the serial
        encoded in *key* (see "_encodeSerial").'''
        self = self.aq_parent
        serial = _decodeSerial(key)
        # Note: we can not use the shortcut below, because
        #  we would lose history information for subobjects
        #  which may be newer.
        # if serial == self._p_serial: return self
        return historicalRevision(self, serial)


class Historical(Historical):
    HistoricalRevisions = Historian()

    # provide information on whether or not this is a historical version
    def isHistorical(self):
        '''false (in fact 'None'), if *self* is not historical;
        otherwise, the time for which *self* has been requested.'''
        return getattr(self, '_p_historical', None)

    def getHistoricalSubpath(self):
        '''returns an URL subpath to reference a historical version
        corresponding to this time.

        The result is the empty string when *self* is not historical.
        '''
        ht = self.isHistorical()
        if ht is None:
            return ''
        return '/HistoricalRevisions/' + _encodeSerial(self._p_serial)

    # must override to use our "historicalRevision"
    def manage_historicalComparison(self, REQUEST, keys=[]):
        "Compare two selected revisions"
        if not keys:
            raise HistorySelectionError(
                "No historical revision was selected.\n\n")
        if len(keys) > 2:
            raise HistorySelectionError(
                "Only two historical revision can be compared\n\n")

        rev1 = historicalRevision(self, _decodeSerial(keys[-1]))

        if len(keys) == 2:
            rev2 = historicalRevision(self, _decodeSerial(keys[0]))
        else:
            # a single selection is compared against *self*
            rev2 = self

        return self.manage_historyCompare(rev1, rev2, REQUEST)


class HistoryFetcher:
    '''auxiliary class to incrementally fetch history.'''
    _curr = 0       # index of the first record held in "_buffer"
    _inc = 1        # size of the next chunk; doubled after each fetch
    _complete = 0   # true once no further records are available
    _buffer = None  # the most recently fetched chunk of records

    def __init__(self, jar, oid, version=None, first=0, last=None,
                 filter=None):
        '''prepare fetching historical records for *oid* in *version*.

        Tries to find records satisfying *filter*.
        Records *first* through *last* are returned.
        '''
        S = jar._storage
        history = S.history
        if _hasSmartHistory(S):
            # the storage filters itself: shadow our emulation
            self._smartHistory = history
        else:
            self._getRawHistory = history
        self._oid = oid
        self._version = version
        self._first = first
        self._last = last
        self._filter = _Filter(filter)

    def fetch(self):
        '''return the relevant history records.'''
        self.reset()
        n = self._last
        if n is None:
            n = maxint
        return self.next(n - self._first)

    def next(self, n=1):
        '''return up to n more relevant history records.'''
        l = []
        while n > 0:
            x = self._next()
            if x is None:
                break
            l.append(x)
            n -= 1
        return l

    def reset(self):
        '''restart delivery at record *first*.'''
        self._bi = self._complete = self._filter.stopped = 0
        if (self._buffer is None or self._last is not None
                or self._curr == self._first):
            # the buffer (if any) starts at *first*:
            # just reset the buffer index
            return
        del self._buffer  # delete the buffer as well

    def _next(self):
        '''return the next record or 'None' when there are no more.'''
        if self._complete:
            return
        buffer = self._buffer
        if buffer is None or self._bi >= len(buffer):
            buffer = self._prefetch()
            if not buffer:
                self._complete = 1
                return
        x = buffer[self._bi]
        self._bi += 1
        return x

    def _prefetch(self):
        '''prefetch history information.'''
        buffer = self._buffer
        self._bi = 0
        # if we know last, we fetch them in a single step
        last = self._last
        if last is not None:
            if buffer is not None:
                return  # we already read everything
            self._buffer = buffer = self._fetchHistory(self._first, last)
            return buffer
        if buffer is None:
            # the first round: record where the buffer starts, such that
            # the following rounds continue from there (and not from a
            # stale "_curr" left over from before a "reset")
            curr = self._curr = self._first
        else:
            curr = self._curr = self._curr + len(buffer)
        inc = self._inc
        buffer = self._fetchHistory(curr, curr + inc)
        self._inc <<= 1
        self._buffer = buffer
        return buffer

    def _fetchHistory(self, first, last):
        '''return the filtered records *first* up to (excluding) *last*;
        'None' when the filter is already marked as stopped.'''
        filter = self._filter
        if filter.stopped:
            return
        filter.setIgnore(first)
        inc = last - first
        r = self._smartHistory(self._oid, self._version, inc, filter)
        if len(r) != inc:
            # fewer records than requested: there are no more
            filter.stopped = 1
        return r

    def _smartHistory(self, oid, version, size, filter):
        '''emulate a smart (i.e. filtered) history fetching for
        storages that provide only a simple one.'''
        r = []  # the result list
        if size <= 0:
            return r
        l = 0
        h = size + filter._igncount
        # ATT: this is a very crude approximation -- we may do much
        #  better when we cache a bit.
        history = self._getRawHistory
        while 1:
            ts = history(oid, version, h)
            if len(ts) <= l:
                break
            for t in ts[l:h]:
                if filter(t):
                    r.append(t)
                    if len(r) >= size:
                        return r
                    # ATT: we may want to cache the remaining elements
                    #  and how far we had to fetch ahead
            if len(ts) < h:
                break
            l = h
            h <<= 1
        filter.stopped = 1
        return r


class _Filter:
    '''A filter ignoring an initial segment of a sequence
    accepted by a *baseFilter*.'''
    stopped = None
    _igncount = 0  # ignore that many events

    def __init__(self, baseFilter):
        self._base = baseFilter

    def setIgnore(self, no):
        '''ignore the next *no* objects accepted by the base filter.'''
        self._igncount = no

    def __call__(self, obj):
        '''check whether *obj* should pass.'''
        base = self._base
        # without a base filter, everything is accepted
        fd = base is None or base(obj)
        if fd:
            if self._igncount:
                # accepted by the base filter but still to be ignored
                self._igncount -= 1
                return
        return fd


###########################################################################
## HistoryProducer
##   an incremental producer of history records
class HistoryProducer:
    '''a history producer to be used together with an incremental merger.

    The elements are compared with respect to time and contain both
    the objects as well as the history record.'''

    def __init__(self, obj):
        self._obj = obj
        self._fetcher = HistoryFetcher(obj._p_jar, obj._p_oid)

    def pop(self):
        '''return the next history record or 'None' when exhausted.'''
        h = self._fetcher.next()
        if not h:
            return
        return _HistoryRecord(h[0], self._obj)


class _HistoryRecord:
    '''a history record *data* together with the object *obj*
    it belongs to.'''

    def __init__(self, data, obj):
        self._data = data
        self._obj = obj

    def __cmp__(self, other):
        # reversed operands: the record with the later time compares smaller
        return cmp(other._data['time'], self._data['time'])


###########################################################################
## Auxiliaries
def _encodeSerial(serial):
    '''encode a serial into a '.' separated sequence of ints.'''
    return '.'.join(map(str, unpack(">HHHH", serial)))


def _decodeSerial(code):
    '''decode a serial encoded as a '.' separated sequence of ints.'''
    return pack('>HHHH', *map(int, code.split('.')))


###########################################################################
## non standard ZODB extension handling
try:
    from ZODB.BaseStorage import _hasSmartHistory
except ImportError:
    def _hasSmartHistory(storage):
        '''fallback: no storage is assumed to filter history itself.'''
        return 0