[Zodb-checkins] SVN: ZODB/branches/gocept-iteration/s Implement copyTransactionsFrom() and restore() for ClientStorages. Remove

Christian Theune ct at gocept.com
Tue Feb 26 01:31:51 EST 2008


Log message for revision 84254:
  Implement copyTransactionsFrom() and restore() for ClientStorages. Remove
  ThreadedAsync reference.
  

Changed:
  U   ZODB/branches/gocept-iteration/setup.py
  U   ZODB/branches/gocept-iteration/src/ZEO/ClientStorage.py
  U   ZODB/branches/gocept-iteration/src/ZEO/CommitLog.py
  U   ZODB/branches/gocept-iteration/src/ZEO/ServerStub.py
  U   ZODB/branches/gocept-iteration/src/ZEO/StorageServer.py
  U   ZODB/branches/gocept-iteration/src/ZEO/tests/testZEO.py
  U   ZODB/branches/gocept-iteration/src/ZODB/BaseStorage.py
  U   ZODB/branches/gocept-iteration/src/ZODB/interfaces.py
  U   ZODB/branches/gocept-iteration/src/ZODB/tests/IteratorStorage.py

-=-
Modified: ZODB/branches/gocept-iteration/setup.py
===================================================================
--- ZODB/branches/gocept-iteration/setup.py	2008-02-26 02:23:14 UTC (rev 84253)
+++ ZODB/branches/gocept-iteration/setup.py	2008-02-26 06:31:49 UTC (rev 84254)
@@ -149,7 +149,6 @@
             "ZODB", "ZODB.FileStorage", "ZODB.tests",
                     "ZODB.scripts",
             "persistent", "persistent.tests",
-            "ThreadedAsync",
             "ZopeUndo", "ZopeUndo.tests",
             ]
 

Modified: ZODB/branches/gocept-iteration/src/ZEO/ClientStorage.py
===================================================================
--- ZODB/branches/gocept-iteration/src/ZEO/ClientStorage.py	2008-02-26 02:23:14 UTC (rev 84253)
+++ ZODB/branches/gocept-iteration/src/ZEO/ClientStorage.py	2008-02-26 06:31:49 UTC (rev 84254)
@@ -1174,6 +1174,25 @@
             return []
         return self._server.undoLog(first, last)
 
+    # Recovery support
+
+    def copyTransactionsFrom(self, other, verbose=0):
+        """Copy transactions from another storage.
+
+        This is typically used for converting data from one storage to
+        another.  `other` must have an .iterator() method.
+        """
+        ZODB.BaseStorage.copy(other, self, verbose)
+
+    def restore(self, oid, serial, data, version, prev_txn, transaction):
+        """Write data already committed in a separate database."""
+        assert not version
+        self._check_trans(transaction)
+        self._server.restorea(oid, serial, data, prev_txn, id(transaction))
+        # XXX I'm not updating the transaction buffer here because I can't
+        # exactly predict how invalidation should work with restore. :/
+        return self._check_serials()
+
     # Below are methods invoked by the StorageServer
 
     def serialnos(self, args):

Modified: ZODB/branches/gocept-iteration/src/ZEO/CommitLog.py
===================================================================
--- ZODB/branches/gocept-iteration/src/ZEO/CommitLog.py	2008-02-26 02:23:14 UTC (rev 84253)
+++ ZODB/branches/gocept-iteration/src/ZEO/CommitLog.py	2008-02-26 06:31:49 UTC (rev 84254)
@@ -35,9 +35,13 @@
         return self.file.tell()
 
     def store(self, oid, serial, data):
-        self.pickler.dump((oid, serial, data))
+        self.pickler.dump(('store', oid, serial, data))
         self.stores += 1
 
+    def restore(self, oid, serial, data, prev_txn):
+        self.pickler.dump(('restore', oid, serial, data, prev_txn))
+        self.stores += 1
+
     def get_loader(self):
         self.read = 1
         self.file.seek(0)

Modified: ZODB/branches/gocept-iteration/src/ZEO/ServerStub.py
===================================================================
--- ZODB/branches/gocept-iteration/src/ZEO/ServerStub.py	2008-02-26 02:23:14 UTC (rev 84253)
+++ ZODB/branches/gocept-iteration/src/ZEO/ServerStub.py	2008-02-26 06:31:49 UTC (rev 84254)
@@ -209,6 +209,10 @@
     def storea(self, oid, serial, data, id):
         self.rpc.callAsync('storea', oid, serial, data, '', id)
 
+    def restorea(self, oid, serial, data, prev_txn, id):
+        self.rpc.callAsync('restorea', oid, serial, data, prev_txn, id)
+
+
     def storeBlob(self, oid, serial, data, blobfilename, txn):
 
         # Store a blob to the server.  We don't want to real all of

Modified: ZODB/branches/gocept-iteration/src/ZEO/StorageServer.py
===================================================================
--- ZODB/branches/gocept-iteration/src/ZEO/StorageServer.py	2008-02-26 02:23:14 UTC (rev 84253)
+++ ZODB/branches/gocept-iteration/src/ZEO/StorageServer.py	2008-02-26 06:31:49 UTC (rev 84254)
@@ -479,6 +479,11 @@
         self.stats.stores += 1
         self.txnlog.store(oid, serial, data)
 
+    def restorea(self, oid, serial, data, prev_txn, id):
+        self._check_tid(id, exc=StorageTransactionError)
+        self.stats.stores += 1
+        self.txnlog.restore(oid, serial, data, prev_txn)
+
     def storeBlobStart(self):
         assert self.blob_tempfile is None
         self.blob_tempfile = tempfile.mkstemp(
@@ -577,6 +582,33 @@
 
         return err is None
 
+    def _restore(self, oid, serial, data, prev_txn):
+        err = None
+        try:
+            self.storage.restore(oid, serial, data, '', prev_txn, self.transaction)
+        except (SystemExit, KeyboardInterrupt):
+            raise
+        except Exception, err:
+            self.store_failed = 1
+            if not isinstance(err, TransactionError):
+                # Unexpected errors are logged and passed to the client
+                self.log("store error: %s, %s" % sys.exc_info()[:2],
+                         logging.ERROR, exc_info=True)
+            # Try to pickle the exception.  If it can't be pickled,
+            # the RPC response would fail, so use something else.
+            pickler = cPickle.Pickler()
+            pickler.fast = 1
+            try:
+                pickler.dump(err, 1)
+            except:
+                msg = "Couldn't pickle storage exception: %s" % repr(err)
+                self.log(msg, logging.ERROR)
+                err = StorageServerError(msg)
+            # The exception is reported back as newserial for this oid
+            self.serials.append((oid, err))
+
+        return err is None
+
     def _vote(self):
         if not self.store_failed:
             # Only call tpc_vote of no store call failed, otherwise
@@ -629,8 +661,16 @@
         self._tpc_begin(self.transaction, self.tid, self.status)
         loads, loader = self.txnlog.get_loader()
         for i in range(loads):
-            # load oid, serial, data, version
-            if not self._store(*loader.load()):
+            store = loader.load()
+            store_type = store[0]
+            store_args = store[1:]
+
+            if store_type == 'store':
+                do_store = self._store
+            elif store_type == 'restore':
+                do_store = self._restore
+
+            if not do_store(*store_args):
                 break
 
         # Blob support

Modified: ZODB/branches/gocept-iteration/src/ZEO/tests/testZEO.py
===================================================================
--- ZODB/branches/gocept-iteration/src/ZEO/tests/testZEO.py	2008-02-26 02:23:14 UTC (rev 84253)
+++ ZODB/branches/gocept-iteration/src/ZEO/tests/testZEO.py	2008-02-26 06:31:49 UTC (rev 84254)
@@ -41,7 +41,7 @@
 from ZODB.tests import StorageTestBase, BasicStorage,  \
      TransactionalUndoStorage,  \
      PackableStorage, Synchronization, ConflictResolution, RevisionStorage, \
-     MTStorage, ReadOnlyStorage, IteratorStorage
+     MTStorage, ReadOnlyStorage, IteratorStorage, RecoveryStorage
 
 from ZODB.tests.testDemoStorage import DemoStorageWrappedBase
 
@@ -254,6 +254,70 @@
     ):
     """Extend GenericTests with tests that MappingStorage can't pass."""
 
+class FileStorageRecoveryTests(StorageTestBase.StorageTestBase,
+                               RecoveryStorage.RecoveryStorage):
+
+    level = 2
+
+    def setUp(self):
+        self._storage = ZODB.FileStorage.FileStorage("Source.fs", create=True)
+        self._dst = ZODB.FileStorage.FileStorage("Dest.fs", create=True)
+
+    def getConfig(self):
+        filename = self.__fs_base = tempfile.mktemp()
+        return """\
+        <filestorage 1>
+        path %s
+        </filestorage>
+        """ % filename
+
+    def _new_storage(self):
+        port = get_port()
+        zconf = forker.ZEOConfig(('', port))
+        zport, adminaddr, pid, path = forker.start_zeo_server(self.getConfig(),
+                                                              zconf, port)
+        blob_cache_dir = tempfile.mkdtemp()
+
+        self._pids.append(pid)
+        self._servers.append(adminaddr)
+        self._conf_paths.append(path)
+        self.blob_cache_dirs.append(blob_cache_dir)
+
+        storage = ClientStorage(
+            zport, '1', cache_size=20000000,
+            min_disconnect_poll=0.5, wait=1,
+            wait_timeout=60, blob_dir=blob_cache_dir)
+        storage.registerDB(DummyDB())
+        return storage
+
+    def setUp(self):
+        self._pids = []
+        self._servers = []
+        self._conf_paths = []
+        self.blob_cache_dirs = []
+
+        self._storage = self._new_storage()
+        self._dst = self._new_storage()
+
+    def tearDown(self):
+        self._storage.close()
+        self._dst.close()
+
+        for p in self._conf_paths:
+            os.remove(p)
+        for p in self.blob_cache_dirs:
+            ZODB.blob.remove_committed_dir(p)
+        for server in self._servers:
+            forker.shutdown_zeo_server(server)
+        if hasattr(os, 'waitpid'):
+            # Not in Windows Python until 2.3
+            for pid in self._pids:
+                os.waitpid(pid, 0)
+
+    def new_dest(self):
+        return self._new_storage()
+
+
 class FileStorageTests(FullGenericTests):
     """Test ZEO backed by a FileStorage."""
     level = 2
@@ -889,7 +953,8 @@
     """
 
 
-test_classes = [FileStorageTests, MappingStorageTests, DemoStorageTests,
+test_classes = [FileStorageTests, FileStorageRecoveryTests,
+                MappingStorageTests, DemoStorageTests,
                 BlobAdaptedFileStorageTests, BlobWritableCacheTests]
 
 def test_suite():

Modified: ZODB/branches/gocept-iteration/src/ZODB/BaseStorage.py
===================================================================
--- ZODB/branches/gocept-iteration/src/ZODB/BaseStorage.py	2008-02-26 02:23:14 UTC (rev 84253)
+++ ZODB/branches/gocept-iteration/src/ZODB/BaseStorage.py	2008-02-26 06:31:49 UTC (rev 84254)
@@ -339,6 +339,7 @@
             if verbose:
                 print oid_repr(oid), r.version, len(r.data)
             if restoring:
+                print r.data_txn
                 dest.restore(oid, r.tid, r.data, r.version,
                              r.data_txn, transaction)
             else:
@@ -349,7 +350,10 @@
         dest.tpc_vote(transaction)
         dest.tpc_finish(transaction)
 
-    fiter.close()
+    if hasattr(fiter, 'close'):
+        # XXX close is not part of the iterator interface but FileStorage's
+        # iterator has this method to get rid of a file handle.
+        fiter.close()
 
 
 class TransactionRecord(object):

Modified: ZODB/branches/gocept-iteration/src/ZODB/interfaces.py
===================================================================
--- ZODB/branches/gocept-iteration/src/ZODB/interfaces.py	2008-02-26 02:23:14 UTC (rev 84253)
+++ ZODB/branches/gocept-iteration/src/ZODB/interfaces.py	2008-02-26 06:31:49 UTC (rev 84254)
@@ -744,7 +744,7 @@
         #   failed to take into account records after the pack time.
         
 
-    def restore(oid, serial, data, prev_txn, transaction):
+    def restore(oid, serial, data, version, prev_txn, transaction):
         """Write data already committed in a separate database
 
         The restore method is used when copying data from one database

Modified: ZODB/branches/gocept-iteration/src/ZODB/tests/IteratorStorage.py
===================================================================
--- ZODB/branches/gocept-iteration/src/ZODB/tests/IteratorStorage.py	2008-02-26 02:23:14 UTC (rev 84253)
+++ ZODB/branches/gocept-iteration/src/ZODB/tests/IteratorStorage.py	2008-02-26 06:31:49 UTC (rev 84254)
@@ -225,5 +225,7 @@
         # they were the same length
         self.assertRaises(StopIteration, iter1.next)
         self.assertRaises(StopIteration, iter2.next)
-        iter1.close()
-        iter2.close()
+        if hasattr(iter1, 'close'):
+            iter1.close()
+        if hasattr(iter2, 'close'):
+            iter2.close()



More information about the Zodb-checkins mailing list