[ZODB-Dev] ZEO, FileStorage, and Undo

Jeremy Hylton jeremy@alum.mit.edu
Thu, 22 Aug 2002 12:13:02 -0400


Here's a patch.

Jeremy

Index: ZEO/StorageServer.py
===================================================================
RCS file: /cvs-repository/ZODB3/ZEO/StorageServer.py,v
retrieving revision 1.44
diff -c -r1.44 StorageServer.py
*** ZEO/StorageServer.py	12 Aug 2002 19:42:40 -0000	1.44
--- ZEO/StorageServer.py	22 Aug 2002 16:14:06 -0000
***************
*** 281,286 ****
--- 281,302 ----
              n = 1
          return [self.__storage.new_oid() for i in range(n)]
  
+     # Depending on the search criteria, undoLog() and undoInfo() can
+     # spend a very long time searching through the storage for
+     # undoable transactions.  To avoid locking up the server, run this
+     # in a separate thread just like pack().
+ 
+     def undoInfo(self, first, last, spec):
+         t = SlowMethodThread(self.__storage.undoInfo, first, last, spec)
+         t.start()
+         return t.delay
+ 
+     def undoLog(self, first, last):
+         delay = MTDelay()
+         t = SlowMethodThread(self.__storage.UndoLog, first, last)
+         t.start()
+         return t.delay
+ 
      def undo(self, transaction_id):
          oids = self.__storage.undo(transaction_id)
          if oids:
***************
*** 582,584 ****
--- 598,624 ----
              new_strategy.store(oid, serial, data, version)
          meth = getattr(new_strategy, self.name)
          return meth(*self.args)
+ 
+ class SlowMethodThread(threading. Thread):
+     """Thread to run potentially slow storage methods.
+ 
+     Clients can use the t.delay attribute to access the MTDelay object
+     used to send a zrpc response at the right time.
+     """
+ 
+     # Some storage methods can take a long time to complete.  If we
+     # run these methods via a standard asyncore read handler, they
+     # will block all other server activity until they complete.  To
+     # avoid blocking, we spawn a separate thread, return an MTDelay()
+     # object, and have the thread reply() when it finishes.
+ 
+     def __init__(self, method, *args):
+         threading.Thread.__init__(self)
+         self._method = method
+         self._args = args
+         self.delay = MTDelay()
+ 
+     def run(self):
+         # XXX What happens if this raises an exception?
+         result = self._method(*self._args)
+         self._delay.reply(result)
Index: ZODB/FileStorage.py
===================================================================
RCS file: /cvs-repository/ZODB3/ZODB/FileStorage.py,v
retrieving revision 1.95
diff -c -r1.95 FileStorage.py
*** ZODB/FileStorage.py	14 Aug 2002 22:07:09 -0000	1.95
--- ZODB/FileStorage.py	22 Aug 2002 16:14:06 -0000
***************
*** 1070,1117 ****
      def undoLog(self, first=0, last=-20, filter=None):
          if last < 0:
              last = first - last + 1
!         self._lock_acquire()
!         try:
!             if self._packt is None:
!                 raise UndoError(
!                     'Undo is currently disabled for database maintenance.<p>')
!             pos = self._pos
!             r = []
!             i = 0
!             # BAW: Why 39 please?  This makes no sense (see also below).
!             while i < last and pos > 39:
!                 self._file.seek(pos - 8)
!                 pos = pos - U64(self._file.read(8)) - 8
!                 self._file.seek(pos)
!                 h = self._file.read(TRANS_HDR_LEN)
!                 tid, tl, status, ul, dl, el = struct.unpack(">8s8scHHH", h)
!                 if tid < self._packt or status == 'p':
                      break
!                 if status != ' ':
!                     continue
!                 d = u = ''
!                 if ul:
!                     u = self._file.read(ul)
!                 if dl:
!                     d = self._file.read(dl)
!                 e = {}
!                 if el:
!                     try:
!                         e = loads(read(el))
!                     except:
!                         pass
!                 d = {'id': base64.encodestring(tid).rstrip(),
!                      'time': TimeStamp(tid).timeTime(),
!                      'user_name': u,
!                      'description': d}
!                 d.update(e)
!                 if filter is None or filter(d):
!                     if i >= first:
!                         r.append(d)
!                     i += 1
!             return r
!         finally:
!             self._lock_release()
  
      def transactionalUndo(self, transaction_id, transaction):
          """Undo a transaction, given by transaction_id.
--- 1070,1124 ----
      def undoLog(self, first=0, last=-20, filter=None):
          if last < 0:
              last = first - last + 1
!         if self._packt is None:
!             raise UndoError(
!                 'Undo is currently disabled for database maintenance.<p>')
!         pos = self._pos
!         r = []
!         i = 0
!         # BAW: Why 39 please?  This makes no sense (see also below).
!         while i < last and pos > 39:
!             self._lock_acquire()
!             try:
!                 i, pos = self.undoFindNext(pos, r, first, i, filter)
!                 if i is None:
                      break
!             finally:
!                 self._lock_release()
!         return r
! 
!     def undoFindNext(self, pos, result, first, i, filter):
!         self._file.seek(pos - 8)
!         pos = pos - U64(self._file.read(8)) - 8
!         self._file.seek(pos)
!         h = self._file.read(TRANS_HDR_LEN)
!         tid, tl, status, ul, dl, el = struct.unpack(">8s8scHHH", h)
!         if tid < self._packt or status == 'p':
!             return None, None
!         if status != ' ':
!             return i, pos
!         d = u = ''
!         if ul:
!             u = self._file.read(ul)
!         if dl:
!             d = self._file.read(dl)
!         e = {}
!         if el:
!             try:
!                 e = loads(self._file.read(el))
!             except:
!                 pass
!         d = {'id': base64.encodestring(tid).rstrip(),
!              'time': TimeStamp(tid).timeTime(),
!              'user_name': u,
!              'description': d}
!         d.update(e)
!         if filter is None or filter(d):
!             if i >= first:
!                 result.append(d)
!             i += 1
!         return i, pos
!         
  
      def transactionalUndo(self, transaction_id, transaction):
          """Undo a transaction, given by transaction_id.