[Checkins] SVN: ZODB/trunk/src/ZODB/tests/util.py Removed ConflictResolvingMappingStorage. The one test that uses it can

Jim Fulton jim at zope.com
Sat Oct 25 20:36:15 EDT 2008


Log message for revision 92557:
  Removed ConflictResolvingMappingStorage. The one test that uses it can
  as easily use FileStorage.
  
  Removed commit. It should be gotten from transaction.
  
  Removed some test set-up and tear-down facilities that can now be found
  in zope.testing.setupstack.
  

Changed:
  U   ZODB/trunk/src/ZODB/tests/util.py

-=-
Modified: ZODB/trunk/src/ZODB/tests/util.py
===================================================================
--- ZODB/trunk/src/ZODB/tests/util.py	2008-10-26 00:36:12 UTC (rev 92556)
+++ ZODB/trunk/src/ZODB/tests/util.py	2008-10-26 00:36:14 UTC (rev 92557)
@@ -16,73 +16,10 @@
 $Id$
 """
 
-import os
-import shutil
-import sys
-import tempfile
 import time
-
 import persistent
-import transaction
-from ZODB.MappingStorage import MappingStorage
-from ZODB.ConflictResolution import ConflictResolvingStorage
-from ZODB.DB import DB as _DB
-from ZODB import POSException
+from ZODB.MappingStorage import DB
 
-def DB(name='Test', **dbargs):
-    return _DB(MappingStorage(name), **dbargs)
-
-class ConflictResolvingMappingStorage(
-    MappingStorage, ConflictResolvingStorage):
-
-    def __init__(self, name='ConflictResolvingMappingStorage'):
-        MappingStorage.__init__(self, name)
-        self._old = {}
-
-    def loadSerial(self, oid, serial):
-        self._lock_acquire()
-        try:
-            old_info = self._old[oid]
-            try:
-                return old_info[serial]
-            except KeyError:
-                raise POSException.POSKeyError(oid)
-        finally:
-            self._lock_release()
-
-    def store(self, oid, serial, data, version, transaction):
-        if transaction is not self._transaction:
-            raise POSException.StorageTransactionError(self, transaction)
-
-        if version:
-            raise POSException.Unsupported("Versions aren't supported")
-
-        self._lock_acquire()
-        try:
-            if oid in self._index:
-                oserial = self._index[oid][:8]
-                if serial != oserial:
-                    rdata = self.tryToResolveConflict(
-                        oid, oserial, serial, data)
-                    if rdata is None:
-                        raise POSException.ConflictError(
-                            oid=oid, serials=(oserial, serial), data=data)
-                    else:
-                        data = rdata
-            self._tindex[oid] = self._tid + data
-        finally:
-            self._lock_release()
-        return self._tid
-
-    def _finish(self, tid, user, desc, ext):
-        self._index.update(self._tindex)
-        self._ltid = self._tid
-        for oid, record in self._tindex.items():
-            self._old.setdefault(oid, {})[self._tid] = record[8:]
-
-def commit():
-    transaction.commit()
-
 def pack(db):
     db.pack(time.time()+1)
 
@@ -93,33 +30,3 @@
 
     def __repr__(self):
         return 'P(%s)' % self.name
-
-def setUp(test):
-    test.globs['__teardown_stack__'] = []
-    tmp = tempfile.mkdtemp('test')
-    registerTearDown(test, lambda : rmtree(tmp))
-    here = os.getcwd()
-    registerTearDown(test, lambda : os.chdir(here))
-    os.chdir(tmp)
-
-if sys.platform == 'win32':    
-    # On windows, we can't remove a directory of there are files upen.
-    # We may need to wait a while for processes to exit.
-    def rmtree(path):
-        for i in range(1000):
-            try:
-                shutil.rmtree(path)
-            except OSError:
-                time.sleep(0.01)
-            else:
-                break
-
-else:
-    rmtree = shutil.rmtree
-            
-def registerTearDown(test, func):
-    test.globs['__teardown_stack__'].append(func)    
-    
-def tearDown(test):
-    for f in test.globs['__teardown_stack__']:
-        f()



More information about the Checkins mailing list