[Zodb-checkins] CVS: ZODB3/ZEO - runzeo.py:1.15.6.2 StorageServer.py:1.98.4.1 CommitLog.py:1.4.72.1 ClientStorage.py:1.106.4.2 ClientCache.py:1.47.4.1

Jeremy Hylton jeremy at zope.com
Mon Sep 15 14:03:41 EDT 2003


Update of /cvs-repository/ZODB3/ZEO
In directory cvs.zope.org:/tmp/cvs-serv13599/ZEO

Modified Files:
      Tag: Zope-2_7-branch
	runzeo.py StorageServer.py CommitLog.py ClientStorage.py 
	ClientCache.py 
Log Message:
Take two: Merge changes from ZODB3-3_2-branch to Zope-2_7-branch.

Please make all future changes on the Zope-2_7-branch instead of the
ZODB3-3_2-branch.

The previous attempt used "cvs up -j ZODB3-3_2-branch", but it appeared
to pick up only a small fraction of the changes.  This attempt is based
on copying a checkout of ZODB3-3_2-branch over the top of a checkout of
Zope-2_7-branch.


=== ZODB3/ZEO/runzeo.py 1.15.6.1 => 1.15.6.2 ===
--- ZODB3/ZEO/runzeo.py:1.15.6.1	Mon Jul 21 12:37:12 2003
+++ ZODB3/ZEO/runzeo.py	Mon Sep 15 14:02:59 2003
@@ -196,7 +196,7 @@
             transaction_timeout=self.options.transaction_timeout,
             monitor_address=self.options.monitor_address,
             auth_protocol=self.options.auth_protocol,
-            auth_filename=self.options.auth_database,  # XXX option spelling
+            auth_database=self.options.auth_database,
             auth_realm=self.options.auth_realm)
 
     def loop_forever(self):


=== ZODB3/ZEO/StorageServer.py 1.98 => 1.98.4.1 ===
--- ZODB3/ZEO/StorageServer.py:1.98	Fri Jun 13 15:50:05 2003
+++ ZODB3/ZEO/StorageServer.py	Mon Sep 15 14:02:59 2003
@@ -82,6 +82,7 @@
         self.read_only = read_only
         self.locked = 0
         self.verifying = 0
+        self.store_failed = 0
         self.log_label = _label
         self.authenticated = 0
         self.auth_realm = auth_realm
@@ -367,6 +368,7 @@
         self.txnlog = CommitLog()
         self.tid = tid
         self.status = status
+        self.store_failed = 0
         self.stats.active_txns += 1
 
     def tpc_finish(self, id):
@@ -401,9 +403,9 @@
             self.timeout.end(self)
             self.stats.lock_time = None
             self.log("Transaction released storage lock")
-        # _handle_waiting() can start another transaction (by
-        # restarting a waiting one) so must be done last
-        self._handle_waiting()
+            # _handle_waiting() can start another transaction (by
+            # restarting a waiting one) so must be done last
+            self._handle_waiting()
 
     def _abort(self):
         # called when a connection is closed unexpectedly
@@ -471,12 +473,14 @@
         self.storage.tpc_begin(txn, tid, status)
 
     def _store(self, oid, serial, data, version):
+        err = None
         try:
             newserial = self.storage.store(oid, serial, data, version,
                                            self.transaction)
         except (SystemExit, KeyboardInterrupt):
             raise
         except Exception, err:
+            self.store_failed = 1
             if isinstance(err, ConflictError):
                 self.stats.conflicts += 1
             if not isinstance(err, TransactionError):
@@ -503,9 +507,15 @@
         if newserial == ResolvedSerial:
             self.stats.conflicts_resolved += 1
         self.serials.append((oid, newserial))
+        return err is None
 
     def _vote(self):
         self.client.serialnos(self.serials)
+        # If a store call failed, then return to the client immediately.
+        # The serialnos() call will deliver an exception that will be
+        # handled by the client in its tpc_vote() method.
+        if self.store_failed:
+            return
         return self.storage.tpc_vote(self.transaction)
 
     def _abortVersion(self, src):
@@ -554,11 +564,18 @@
 
     def _restart(self, delay=None):
         # Restart when the storage lock is available.
+        if self.txnlog.stores == 1:
+            template = "Preparing to commit transaction: %d object, %d bytes"
+        else:
+            template = "Preparing to commit transaction: %d objects, %d bytes"
+        self.log(template % (self.txnlog.stores, self.txnlog.size()),
+                 level=zLOG.BLATHER)
         self._tpc_begin(self.transaction, self.tid, self.status)
         loads, loader = self.txnlog.get_loader()
         for i in range(loads):
             # load oid, serial, data, version
-            self._store(*loader.load())
+            if not self._store(*loader.load()):
+                break
         resp = self._thunk()
         if delay is not None:
             delay.reply(resp)
@@ -612,7 +629,7 @@
                  transaction_timeout=None,
                  monitor_address=None,
                  auth_protocol=None,
-                 auth_filename=None,
+                 auth_database=None,
                  auth_realm=None):
         """StorageServer constructor.
 
@@ -659,7 +676,7 @@
         auth_protocol -- The name of the authentication protocol to use.
             Examples are "digest" and "srp".
             
-        auth_filename -- The name of the password database filename.
+        auth_database -- The name of the password database filename.
             It should be in a format compatible with the authentication
             protocol used; for instance, "sha" and "srp" require different
             formats.
@@ -685,7 +702,7 @@
             s._waiting = []
         self.read_only = read_only
         self.auth_protocol = auth_protocol
-        self.auth_filename = auth_filename
+        self.auth_database = auth_database
         self.auth_realm = auth_realm
         self.database = None
         if auth_protocol:
@@ -739,7 +756,7 @@
         # storages, avoiding the need to bloat each with a new authenticator
         # Database that would contain the same info, and also avoiding any
         # possibly synchronization issues between them.
-        self.database = db_class(self.auth_filename)
+        self.database = db_class(self.auth_database)
         if self.database.realm != self.auth_realm:
             raise ValueError("password database realm %r "
                              "does not match storage realm %r"


=== ZODB3/ZEO/CommitLog.py 1.4 => 1.4.72.1 ===
--- ZODB3/ZEO/CommitLog.py:1.4	Thu Aug 29 15:00:21 2002
+++ ZODB3/ZEO/CommitLog.py	Mon Sep 15 14:02:59 2003
@@ -31,6 +31,9 @@
         self.stores = 0
         self.read = 0
 
+    def size(self):
+        return self.file.tell()
+
     def store(self, oid, serial, data, version):
         self.pickler.dump((oid, serial, data, version))
         self.stores += 1


=== ZODB3/ZEO/ClientStorage.py 1.106.4.1 => 1.106.4.2 ===
--- ZODB3/ZEO/ClientStorage.py:1.106.4.1	Mon Jul 21 12:37:12 2003
+++ ZODB3/ZEO/ClientStorage.py	Mon Sep 15 14:02:59 2003
@@ -56,7 +56,7 @@
     the argument.
     """
     t = time.time()
-    t = TimeStamp(*time.gmtime(t)[:5] + (t % 60,))
+    t = apply(TimeStamp, (time.gmtime(t)[:5] + (t % 60,)))
     if prev_ts is not None:
         t = t.laterThan(prev_ts)
     return t
@@ -78,7 +78,7 @@
 
 MB = 1024**2
 
-class ClientStorage:
+class ClientStorage(object):
 
     """A Storage class that is a network client to a remote storage.
 
@@ -129,6 +129,7 @@
 
         client -- A name used to construct persistent cache filenames.
             Defaults to None, in which case the cache is not persistent.
+            See ClientCache for more info.
 
         debug -- Ignored.  This is present only for backwards
             compatibility with ZEO 1.
@@ -232,6 +233,11 @@
         self._username = username
         self._password = password
         self._realm = realm
+
+        # Flag tracking disconnections in the middle of a transaction.  This
+        # is reset in tpc_begin() and set in notifyDisconnected().
+        self._midtxn_disconnect = 0
+
         # _server_addr is used by sortKey()
         self._server_addr = None
         self._tfile = None
@@ -514,7 +520,7 @@
         if self._server_addr is None:
             raise ClientDisconnected
         else:
-            return self._server_addr
+            return '%s:%s' % (self._storage, self._server_addr)
 
     def verify_cache(self, server):
         """Internal routine called to verify the cache.
@@ -583,6 +589,7 @@
         self._connection = None
         self._ready.clear()
         self._server = disconnected_stub
+        self._midtxn_disconnect = 1
 
     def __len__(self):
         """Return the size of the storage."""
@@ -821,6 +828,7 @@
         if self._is_read_only:
             raise POSException.ReadOnlyError()
         self._tpc_cond.acquire()
+        self._midtxn_disconnect = 0
         while self._transaction is not None:
             # It is allowable for a client to call two tpc_begins in a
             # row with the same transaction, and the second of these
@@ -891,6 +899,12 @@
             return
         self._load_lock.acquire()
         try:
+            if self._midtxn_disconnect:
+                raise ClientDisconnected(
+                       'Calling tpc_finish() on a disconnected transaction')
+
+            tid = self._server.tpc_finish(self._serial)
+
             self._lock.acquire()  # for atomic processing of invalidations
             try:
                 self._update_cache()
@@ -898,8 +912,6 @@
                     f()
             finally:
                 self._lock.release()
-
-            tid = self._server.tpc_finish(self._serial)
             self._cache.setLastTid(tid)
 
             r = self._check_serials()


=== ZODB3/ZEO/ClientCache.py 1.47 => 1.47.4.1 ===
--- ZODB3/ZEO/ClientCache.py:1.47	Mon Jun 16 14:27:51 2003
+++ ZODB3/ZEO/ClientCache.py	Mon Sep 15 14:02:59 2003
@@ -367,8 +367,14 @@
                     data = read(dlen)
                     self._trace(0x2A, oid, version, h[19:], dlen)
                     if (p < 0) != self._current:
+                        # If the cache read we are copying has version info,
+                        # we need to pass the header to copytocurrent().
+                        if vlen:
+                            vheader = read(vlen + 4)
+                        else:
+                            vheader = None
                         self._copytocurrent(ap, oidlen, tlen, dlen, vlen, h,
-                                            oid, data)
+                                            oid, data, vheader)
                     return data, h[19:]
                 else:
                     self._trace(0x26, oid, version)
@@ -412,12 +418,13 @@
         """
         if self._pos + tlen > self._limit:
             return # Don't let this cause a cache flip
-        assert len(header) == 27
+        assert len(header) == 27, len(header)
         if header[8] == 'n':
             # Rewrite the header to drop the version data.
             # This shortens the record.
             tlen = 31 + oidlen + dlen
             vlen = 0
+            vheader = None
             # (oidlen:2, reserved:6, status:1, tlen:4,
             #  vlen:2, dlen:4, serial:8)
             header = header[:9] + pack(">IHI", tlen, vlen, dlen) + header[-8:]
@@ -446,7 +453,8 @@
             l.append(vdata)
             l.append(vserial)
         else:
-            assert None is vheader is vdata is vserial
+            assert None is vheader is vdata is vserial, (
+                vlen, vheader, vdata, vserial)
         l.append(header[9:13]) # copy of tlen
         g = self._f[self._current]
         g.seek(self._pos)




More information about the Zodb-checkins mailing list