[Checkins] SVN: ZODB/branches/jim-zeo-registerdb/src/ZEO/ Got fan-out tests to pass.

Jim Fulton jim at zope.com
Fri May 11 15:10:40 EDT 2007


Log message for revision 75684:
  Got fan-out tests to pass.
  

Changed:
  U   ZODB/branches/jim-zeo-registerdb/src/ZEO/ClientStorage.py
  U   ZODB/branches/jim-zeo-registerdb/src/ZEO/StorageServer.py
  U   ZODB/branches/jim-zeo-registerdb/src/ZEO/tests/registerDB.test
  U   ZODB/branches/jim-zeo-registerdb/src/ZEO/tests/testZEO.py
  U   ZODB/branches/jim-zeo-registerdb/src/ZEO/tests/zeo-fan-out.test

-=-
Modified: ZODB/branches/jim-zeo-registerdb/src/ZEO/ClientStorage.py
===================================================================
--- ZODB/branches/jim-zeo-registerdb/src/ZEO/ClientStorage.py	2007-05-11 16:29:07 UTC (rev 75683)
+++ ZODB/branches/jim-zeo-registerdb/src/ZEO/ClientStorage.py	2007-05-11 19:10:39 UTC (rev 75684)
@@ -1028,6 +1028,9 @@
         self._server.vote(id(txn))
         return self._check_serials()
 
+    def tpc_transaction(self):
+        return self._transaction
+
     def tpc_begin(self, txn, tid=None, status=' '):
         """Storage API: begin a transaction."""
         if self._is_read_only:

Modified: ZODB/branches/jim-zeo-registerdb/src/ZEO/StorageServer.py
===================================================================
--- ZODB/branches/jim-zeo-registerdb/src/ZEO/StorageServer.py	2007-05-11 16:29:07 UTC (rev 75683)
+++ ZODB/branches/jim-zeo-registerdb/src/ZEO/StorageServer.py	2007-05-11 19:10:39 UTC (rev 75684)
@@ -33,6 +33,7 @@
 
 import ZODB.serialize
 import ZEO.zrpc.error
+
 from ZEO import ClientStub
 from ZEO.CommitLog import CommitLog
 from ZEO.monitor import StorageStats, StatsServer
@@ -627,13 +628,13 @@
                 self.log(msg, logging.ERROR)
                 err = StorageServerError(msg)
             # The exception is reported back as newserial for this oid
-            newserial = err
+            newserial = [(oid, err)]
         else:
             if serial != "\0\0\0\0\0\0\0\0":
                 self.invalidated.append((oid, version))
 
-        if isinstance(newserial, str):
-            newserial = (oid, newserial)
+            if isinstance(newserial, str):
+                newserial = [(oid, newserial)]
 
         if newserial:
             for oid, s in newserial:
@@ -764,10 +765,18 @@
         self.references = ZODB.serialize.referencesf
 
     def invalidate(self, tid, oids, version=''):
+        storage_id = self.storage_id
         self.server.invalidate(
-            None, self.storage_id, tid,
+            None, storage_id, tid,
             [(oid, version) for oid in oids],
             )
+        for zeo_server in self.server.connections.get(storage_id, ())[:]:
+            try:
+                zeo_server.connection.poll()
+            except ZEO.zrpc.error.DisconnectedError:
+                pass
+            else:
+                break # We only need to pull one :)
 
     def invalidateCache(self):
         self.server._invalidateCache(self.storage_id)

Modified: ZODB/branches/jim-zeo-registerdb/src/ZEO/tests/registerDB.test
===================================================================
--- ZODB/branches/jim-zeo-registerdb/src/ZEO/tests/registerDB.test	2007-05-11 16:29:07 UTC (rev 75683)
+++ ZODB/branches/jim-zeo-registerdb/src/ZEO/tests/registerDB.test	2007-05-11 19:10:39 UTC (rev 75684)
@@ -59,6 +59,8 @@
     ...     def should_close(self):
     ...         print 'closed', self.obj.name
     ...         self.mgr.close_conn(self)
+    ...     def poll(self):
+    ...         pass
 
     >>> class ZEOStorage:
     ...     def __init__(self, server, name):

Modified: ZODB/branches/jim-zeo-registerdb/src/ZEO/tests/testZEO.py
===================================================================
--- ZODB/branches/jim-zeo-registerdb/src/ZEO/tests/testZEO.py	2007-05-11 16:29:07 UTC (rev 75683)
+++ ZODB/branches/jim-zeo-registerdb/src/ZEO/tests/testZEO.py	2007-05-11 19:10:39 UTC (rev 75684)
@@ -27,6 +27,7 @@
 import shutil
 
 import zope.testing.setupstack
+from zope.testing import doctest
 
 # ZODB test support
 import ZODB

Modified: ZODB/branches/jim-zeo-registerdb/src/ZEO/tests/zeo-fan-out.test
===================================================================
--- ZODB/branches/jim-zeo-registerdb/src/ZEO/tests/zeo-fan-out.test	2007-05-11 16:29:07 UTC (rev 75683)
+++ ZODB/branches/jim-zeo-registerdb/src/ZEO/tests/zeo-fan-out.test	2007-05-11 19:10:39 UTC (rev 75684)
@@ -30,6 +30,7 @@
     ...    '<zeoclient 1>\n  server %s\n</zeoclient>\n' % port0,
     ...     zconf2, port2)
 
+
 Now, let's create some client storages that connect to these:
 
     >>> import ZEO.ClientStorage
@@ -55,21 +56,27 @@
     >>> r2
     {}
 
+    >>> db2 = DB(cs2)
+    >>> tm2 = transaction.TransactionManager()
+    >>> c2 = db2.open(transaction_manager=tm2)
+    >>> r2 = c2.root()
+    >>> r2
+    {}
+
 If we update c1, we'll eventually see the change in c2:
 
-    >>> import persistent
-    >>> class P(persistent.Persistent):
-    ...     pass
+    >>> import persistent.mapping
 
-    >>> r1[1] = P()
+    >>> r1[1] = persistent.mapping.PersistentMapping()
     >>> r1[1].v = 1000
-    >>> r1[2] = P()
+    >>> r1[2] = persistent.mapping.PersistentMapping()
     >>> r1[2].v = -1000
+    >>> tm1.commit()
 
     >>> import time
     >>> for i in range(100):
-    ...     c2.sync()
-    ...     if r2:
+    ...     t = tm2.begin()
+    ...     if 1 in r2:
     ...         break
     ...     time.sleep(0.01)
     
@@ -82,26 +89,30 @@
 Now, let's see if we can break it. :)
 
     >>> def f():
-    ...     for in in range(100):
-    ...         r1[1] -= 1
-    ...         r1[2] += 1
+    ...     for i in range(100):
+    ...         r1[1].v -= 1
+    ...         r1[2].v += 1
     ...         tm1.commit()
     ...         time.sleep(0.01)
-    >>> import thread
-    >>> thread.start_new_thread(f, ())
+    >>> import threading
+    >>> thread = threading.Thread(target=f)
+    >>> thread.start()
 
     >>> for i in range(1000):
-    ...     c2.sync()
-    ...     if c2[1] + c2[2]:
-    ...         print 'oops', c2[1], c2[2]
-    ...     if not c2[1]:
-    ...         break
+    ...     t = tm2.begin()
+    ...     if r2[1].v + r2[2].v:
+    ...         print 'oops', r2[1], r2[2]
+    ...     if r1[1].v == 900:
+    ...         break # we caught up
     ...     time.sleep(0.01)
+
+    >>> thread.join()
     
+    
 If we shutdown and restart the source server, the variables will be
 invalidated:
 
-    >>> forker.shutdown_zeo_server(adminaddr0)
+    >>> ZEO.tests.forker.shutdown_zeo_server(adminaddr0)
     >>> zport0, adminaddr0, pid0, path0 = ZEO.tests.forker.start_zeo_server(
     ...    '<filestorage 1>\n  path fs\n</filestorage>\n', zconf0, port0)
     



More information about the Checkins mailing list