[Checkins] SVN: ZODB/trunk/src/ZODB/FileStorage/FileStorage.py Fixed a bug in file pool: it didn't properly handle multiple write locks.

Jim Fulton jim at zope.com
Fri Feb 5 16:55:34 EST 2010


Log message for revision 108805:
  Fixed a bug in file pool: it didn't properly handle multiple write
  locks.
  
  In fixing it, also made the file pool usable with the "with" statement.
  

Changed:
  U   ZODB/trunk/src/ZODB/FileStorage/FileStorage.py

-=-
Modified: ZODB/trunk/src/ZODB/FileStorage/FileStorage.py
===================================================================
--- ZODB/trunk/src/ZODB/FileStorage/FileStorage.py	2010-02-05 21:54:59 UTC (rev 108804)
+++ ZODB/trunk/src/ZODB/FileStorage/FileStorage.py	2010-02-05 21:55:34 UTC (rev 108805)
@@ -14,6 +14,8 @@
 """Storage implementation using a log written to a single file.
 """
 
+from __future__ import with_statement
+
 from cPickle import Pickler, Unpickler, loads
 from persistent.TimeStamp import TimeStamp
 from struct import pack, unpack
@@ -32,6 +34,7 @@
 
 import base64
 import BTrees.OOBTree
+import contextlib
 import errno
 import logging
 import os
@@ -409,8 +412,7 @@
         """Return pickle data and serial number."""
         assert not version
 
-        _file = self._files.get()
-        try:
+        with self._files.get() as _file:
             pos = self._lookup_pos(oid)
             h = self._read_data_header(pos, oid, _file)
             if h.plen:
@@ -423,8 +425,6 @@
                 return data, h.tid
             else:
                 raise POSKeyError(oid)
-        finally:
-            self._files.put(_file)
 
     def loadSerial(self, oid, serial):
         self._lock_acquire()
@@ -445,8 +445,7 @@
             self._lock_release()
 
     def loadBefore(self, oid, tid):
-        _file = self._files.get()
-        try:
+        with self._files.get() as _file:
             pos = self._lookup_pos(oid)
             end_tid = None
             while True:
@@ -464,8 +463,6 @@
                 return data, h.tid, end_tid
             else:
                 return _file.read(h.plen), h.tid, end_tid
-        finally:
-            self._files.put(_file)
 
     def store(self, oid, oldserial, data, version, transaction):
         if self._is_read_only:
@@ -718,12 +715,8 @@
             self._lock_release()
 
     def tpc_finish(self, transaction, f=None):
-
-        # Get write lock
-        self._files.write_lock()
-        try:
-            self._lock_acquire()
-            try:
+        with self._files.write_lock():
+            with self._lock:
                 if transaction is not self._transaction:
                     raise POSException.StorageTransactionError(
                         "tpc_finish called with wrong transaction")
@@ -737,12 +730,7 @@
                     self._ude = None
                     self._transaction = None
                     self._commit_lock_release()
-            finally:
-                self._lock_release()
 
-        finally:
-            self._files.write_unlock()
-
     def _finish(self, tid, u, d, e):
         # If self._nextpos is 0, then the transaction didn't write any
         # data, so we don't bother writing anything to the file.
@@ -1139,26 +1127,22 @@
                 return
             have_commit_lock = True
             opos, index = pack_result
-            self._files.write_lock()
-            self._lock_acquire()
-            try:
-                self._files.empty()
-                self._file.close()
-                try:
-                    os.rename(self._file_name, oldpath)
-                except Exception:
+            with self._files.write_lock():
+                with self._lock:
+                    self._files.empty()
+                    self._file.close()
+                    try:
+                        os.rename(self._file_name, oldpath)
+                    except Exception:
+                        self._file = open(self._file_name, 'r+b')
+                        raise
+
+                    # OK, we're beyond the point of no return
+                    os.rename(self._file_name + '.pack', self._file_name)
                     self._file = open(self._file_name, 'r+b')
-                    raise
+                    self._initIndex(index, self._tindex)
+                    self._pos = opos
 
-                # OK, we're beyond the point of no return
-                os.rename(self._file_name + '.pack', self._file_name)
-                self._file = open(self._file_name, 'r+b')
-                self._initIndex(index, self._tindex)
-                self._pos = opos
-            finally:
-                self._files.write_unlock()
-                self._lock_release()
-
             # We're basically done.  Now we need to deal with removed
             # blobs and removing the .old file (see further down).
 
@@ -2053,6 +2037,7 @@
 
     closed = False
     writing = False
+    writers = 0
 
     def __init__(self, file_name):
         self.name = file_name
@@ -2060,26 +2045,31 @@
         self._out = []
         self._cond = threading.Condition()
 
+    @contextlib.contextmanager
     def write_lock(self):
-        self._cond.acquire()
-        try:
-            self.writing = True
-            while self._out:
+        with self._cond:
+            self.writers += 1
+            while self.writing or self._out:
                 self._cond.wait()
+            if self.closed:
+                raise ValueError('closed')
+            self.writing = True
+
+        try:
+            yield None
         finally:
-            self._cond.release()
+            with self._cond:
+                self.writing = False
+                if self.writers > 0:
+                    self.writers -= 1
+                self._cond.notifyAll()
 
-    def write_unlock(self):
-        self._cond.acquire()
-        self.writing = False
-        self._cond.notifyAll()
-        self._cond.release()
-
+    @contextlib.contextmanager
     def get(self):
-        self._cond.acquire()
-        try:
-            while self.writing:
+        with self._cond:
+            while self.writers:
                 self._cond.wait()
+            assert not self.writing
             if self.closed:
                 raise ValueError('closed')
 
@@ -2088,32 +2078,25 @@
             except IndexError:
                 f = open(self.name, 'rb')
             self._out.append(f)
-            return f
+
+        try:
+            yield f
         finally:
-            self._cond.release()
+            self._out.remove(f)
+            self._files.append(f)
+            if not self._out:
+                with self._cond:
+                    if self.writers and not self._out:
+                        self._cond.notifyAll()
 
-    def put(self, f):
-        self._out.remove(f)
-        self._files.append(f)
-        if not self._out:
-            self._cond.acquire()
-            try:
-                if self.writing and not self._out:
-                    self._cond.notifyAll()
-            finally:
-                self._cond.release()
-
     def empty(self):
         while self._files:
             self._files.pop().close()
 
     def close(self):
-        self._cond.acquire()
-        self.closed = True
-        self._cond.release()
-
-        self.write_lock()
-        try:
+        with self._cond:
+            self.closed = True
+            while self._out:
+                self._out.pop().close()
             self.empty()
-        finally:
-            self.write_unlock()
+            self.writing = self.writers = 0



More information about the checkins mailing list