[Checkins] SVN: mongopersist/trunk/ Implemented optimistic data dumping. See CHANGES.txt for more details.

Stephen Richter cvs-admin at zope.org
Sun Mar 11 07:12:06 UTC 2012


Log message for revision 124564:
  Implemented optimistic data dumping. See CHANGES.txt for more details.
  
  

Changed:
  U   mongopersist/trunk/CHANGES.txt
  U   mongopersist/trunk/src/mongopersist/datamanager.py
  U   mongopersist/trunk/src/mongopersist/interfaces.py
  A   mongopersist/trunk/src/mongopersist/optimistic-data-dumping.txt
  U   mongopersist/trunk/src/mongopersist/serialize.py
  U   mongopersist/trunk/src/mongopersist/tests/test_datamanager.py
  U   mongopersist/trunk/src/mongopersist/tests/test_serialize.py
  U   mongopersist/trunk/src/mongopersist/zope/container.py
  U   mongopersist/trunk/src/mongopersist/zope/tests/test_container.py

-=-
Modified: mongopersist/trunk/CHANGES.txt
===================================================================
--- mongopersist/trunk/CHANGES.txt	2012-03-11 00:34:00 UTC (rev 124563)
+++ mongopersist/trunk/CHANGES.txt	2012-03-11 07:12:01 UTC (rev 124564)
@@ -2,11 +2,62 @@
 CHANGES
 =======
 
-0.5.6 (unreleased)
+0.6.0 (unreleased)
 ------------------
 
-- ...
+- Switched to optimistic data dumping, which approaches transactions by
+  dumping data early and as changes come in. All changes are undone when the
+  transaction fails/aborts. See ``optimistic-data-dumping.txt`` for
+  details. Here are some of the new features:
 
+  * The data manager keeps track of all original docs before their objects
+    are modified, so any change can be undone.
+
+  * Added an API to data manager (``DataManager.insert(obj)``) to insert an
+    object in the database.
+
+  * Added an API to data manager (``DataManager.remove(obj)``) to remove an
+    object from the database.
+
+  * Data can be flushed to Mongo (``DataManager.flush()``) at any point of the
+    transaction retaining the ability to completely undo all changes. Flushing
+    features the following characteristics:
+
+    + During a given transaction, we guarantee that the user will always receive
+      the same Python object. This requires that flush does not reset the object
+      cache.
+
+    + The ``_p_serial`` is increased by one. (Automatically done in object
+      writer.)
+
+    + The object is removed from the registered objects and the ``_p_changed``
+      flag is set to ``False``.
+
+    + Before flushing, potential conflicts are detected.
+
+  * Implemented a flushing policy: Changes are always flushed before any
+    query is made. A simple wrapper for the ``pymongo`` collection
+    (``CollectionWrapper``) ensures that flush is called before the relevant
+    query methods. The new API method ``DataManager.get_collection(obj)``
+    allows one to quickly get a wrapped collection.
+
+- The ``MongoContainer`` class now removes objects from the database upon
+  container removal if ``_m_remove_documents`` is ``True``. The default is
+  ``True``.
+
+- Removed ``fields`` argument from the ``MongoContainer.find(...)`` and
+  ``MongoContainer.find_one(...)`` methods, since it was not used.
+
+- Previously, if a container had N items, it took N+1 queries to load the
+  list of items completely: one query returned all DBRefs and then one query
+  per item loaded its state. Now, the first query loads all full states and
+  an extension to ``DataManager.setstate(obj, doc=None)`` sets the state of
+  each object from the previously queried data.
+
+- Changed ``MongoContainer.get_collection()`` to return a
+  ``CollectionWrapper`` instance.
+
+
 0.5.5 (2012-03-09)
 ------------------
 

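To make the changelog entries above concrete, here is a minimal usage sketch
of the new data manager API. The connection setup and the ``Foo`` document
class are illustrative assumptions; only ``insert()``, ``remove()``,
``flush()`` and ``get_collection()`` come from this commit:

  import persistent
  import pymongo
  import transaction
  from mongopersist import datamanager

  class Foo(persistent.Persistent):
      # Hypothetical document class, similar to the one in the tests below.
      def __init__(self, name=None):
          self.name = name

  conn = pymongo.Connection('localhost')  # assumes a local MongoDB
  dm = datamanager.MongoDataManager(conn)

  foo_ref = dm.insert(Foo('one'))  # written to Mongo right away
  dm.reset()                       # as at the end of a transaction

  foo = dm.load(foo_ref)
  foo.name = '1'                   # registers the object for writing
  dm.flush()                       # dump all registered changes now

  coll = dm.get_collection(foo)    # CollectionWrapper; flushes before queries
  print tuple(coll.find())

  dm.remove(foo)                   # removed immediately, restorable on abort
  dm.abort(transaction.get())      # reverts both the change and the removal
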
Modified: mongopersist/trunk/src/mongopersist/datamanager.py
===================================================================
--- mongopersist/trunk/src/mongopersist/datamanager.py	2012-03-11 00:34:00 UTC (rev 124563)
+++ mongopersist/trunk/src/mongopersist/datamanager.py	2012-03-11 07:12:01 UTC (rev 124564)
@@ -36,6 +36,38 @@
 
     return adapter.process(collection, spec)
 
+class FlushDecorator(object):
+
+    def __init__(self, datamanager, function):
+        self.datamanager = datamanager
+        self.function = function
+
+    def __call__(self, *args, **kwargs):
+        self.datamanager.flush()
+        return self.function(*args, **kwargs)
+
+class CollectionWrapper(object):
+
+    QUERY_METHODS = ['group', 'map_reduce', 'inline_map_reduce', 'find_one',
+                     'find', 'count', 'find_and_modify']
+
+    def __init__(self, collection, datamanager):
+        self.__dict__['collection'] = collection
+        self.__dict__['_datamanager'] = datamanager
+
+    def __getattr__(self, name):
+        attr = getattr(self.collection, name)
+        if name in self.QUERY_METHODS:
+            attr = FlushDecorator(self._datamanager, attr)
+        return attr
+
+    def __setattr__(self, name, value):
+        setattr(self.collection, name, value)
+
+    def __delattr__(self, name):
+        delattr(self.collection, name)
+
+
 class Root(UserDict.DictMixin):
 
     database='mongopersist'
@@ -91,6 +123,9 @@
         self._writer = serialize.ObjectWriter(self)
         self._registered_objects = []
         self._loaded_objects = []
+        self._inserted_objects = []
+        self._removed_objects = []
+        self._original_states = {}
         self._needs_to_join = True
         self._object_cache = {}
         self.annotations = {}
@@ -105,6 +140,44 @@
         self.transaction_manager = transaction.manager
         self.root = Root(self, root_database, root_collection)
 
+    def _get_collection(self, obj):
+        db_name, coll_name = self._writer.get_collection_name(obj)
+        return self._conn[db_name][coll_name]
+
+    def _check_conflicts(self):
+        if not self.detect_conflicts:
+            return
+        # Check each modified object to see whether Mongo has a new version of
+        # the object.
+        for obj in self._registered_objects:
+            # This object is not even added to the database yet, so there
+            # cannot be a conflict.
+            if obj._p_oid is None:
+                continue
+            coll = self._get_collection(obj)
+            new_doc = coll.find_one(obj._p_oid.id, fields=('_py_serial',))
+            if new_doc is None:
+                continue
+            if new_doc.get('_py_serial', 0) != serialize.u64(obj._p_serial):
+                raise self.conflict_error_factory(obj, new_doc)
+
+    def _flush_objects(self):
+        # Now write every registered object, but make sure we write each
+        # object just once.
+        written = []
+        for obj in self._registered_objects:
+            if getattr(obj, '_p_mongo_sub_object', False):
+                # Make sure we write the object representing a document in a
+                # collection and not a sub-object.
+                obj = obj._p_mongo_doc_object
+            if obj in written:
+                continue
+            self._writer.store(obj)
+            written.append(obj)
+
+    def get_collection(self, obj):
+        return CollectionWrapper(self._get_collection(obj), self)
+
     def dump(self, obj):
         return self._writer.store(obj)
 
@@ -116,14 +189,56 @@
         self.__init__(self._conn)
         self.root = root
 
-    def setstate(self, obj):
+    def flush(self):
+        # Check for conflicts.
+        self._check_conflicts()
+        # Now write every registered object, but make sure we write each
+        # object just once.
+        self._flush_objects()
+        # Let's now reset all objects as if they were not modified:
+        for obj in self._registered_objects:
+            obj._p_changed = False
+        self._registered_objects = []
+
+    def insert(self, obj):
+        if obj._p_oid is not None:
+            raise ValueError('Object has already an OID.', obj)
+        res = self._writer.store(obj)
+        obj._p_changed = False
+        self._object_cache[obj._p_oid] = obj
+        self._inserted_objects.append(obj)
+        return res
+
+    def remove(self, obj):
+        if obj._p_oid is None:
+            raise ValueError('Object does not have OID.', obj)
+        # Edge case: The object was just added in this transaction.
+        if obj in self._inserted_objects:
+            self._inserted_objects.remove(obj)
+            return
+        # If the object is still in the ghost state, let's load it, so that we
+        # have the state in case we abort the transaction later.
+        if obj._p_changed is None:
+            self.setstate(obj)
+        # Now we remove the object from Mongo.
+        coll = self._get_collection(obj)
+        coll.remove({'_id': obj._p_oid.id})
+        self._removed_objects.append(obj)
+        # Just in case the object was modified before removal, let's remove it
+        # from the modification list:
+        if obj._p_changed:
+            self._registered_objects.remove(obj)
+        # We are not doing anything fancy here, since the object might be
+        # added again with some different state.
+
+    def setstate(self, obj, doc=None):
         # When reading a state from Mongo, we also need to join the
         # transaction, because we keep an active object cache that gets stale
         # after the transaction is complete and must be cleaned.
         if self._needs_to_join:
             self.transaction_manager.get().join(self)
             self._needs_to_join = False
-        self._reader.set_ghost_state(obj)
+        self._reader.set_ghost_state(obj, doc)
         self._loaded_objects.append(obj)
 
     def oldstate(self, obj, tid):
@@ -140,25 +255,24 @@
             self._registered_objects.append(obj)
 
     def abort(self, transaction):
+        # Aborting the transaction requires three steps:
+        # 1. Remove any inserted objects.
+        for obj in self._inserted_objects:
+            coll = self._get_collection(obj)
+            coll.remove({'_id': obj._p_oid.id})
+        # 2. Re-insert any removed objects.
+        for obj in self._removed_objects:
+            coll = self._get_collection(obj)
+            coll.insert(self._original_states[obj._p_oid])
+            del self._original_states[obj._p_oid]
+        # 3. Reset any changed states.
+        for db_ref, state in self._original_states.items():
+            coll = self._conn[db_ref.database][db_ref.collection]
+            coll.update({'_id': db_ref.id}, state, True)
         self.reset()
 
     def commit(self, transaction):
-        if not self.detect_conflicts:
-            return
-        # Check each modified object to see whether Mongo has a new version of
-        # the object.
-        for obj in self._registered_objects:
-            # This object is not even added to the database yet, so there
-            # cannot be a conflict.
-            if obj._p_oid is None:
-                continue
-            db_name, coll_name = self._writer.get_collection_name(obj)
-            coll = self._conn[db_name][coll_name]
-            new_doc = coll.find_one(obj._p_oid.id, fields=('_py_serial',))
-            if new_doc is None:
-                continue
-            if new_doc.get('_py_serial', 0) != serialize.u64(obj._p_serial):
-                raise self.conflict_error_factory(obj, new_doc)
+        self._check_conflicts()
 
     def tpc_begin(self, transaction):
         pass
@@ -167,14 +281,7 @@
         pass
 
     def tpc_finish(self, transaction):
-        written = []
-        for obj in self._registered_objects:
-            if getattr(obj, '_p_mongo_sub_object', False):
-                obj = obj._p_mongo_doc_object
-            if obj in written:
-                continue
-            self._writer.store(obj)
-            written.append(obj)
+        self._flush_objects()
         self.reset()
 
     def tpc_abort(self, transaction):

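The ``CollectionWrapper`` above is a small proxy: attribute access falls
through to the underlying collection, except that the listed query methods
are wrapped so that the data manager flushes first. A self-contained sketch
of this behavior, using hypothetical stand-ins for the data manager and the
collection:

  from mongopersist.datamanager import CollectionWrapper

  class FakeManager(object):
      # Stand-in for the data manager; it only counts flushes.
      def __init__(self):
          self.flushed = 0
      def flush(self):
          self.flushed += 1

  class FakeCollection(object):
      # Stand-in for a pymongo collection.
      def find(self):
          return [{'name': 'one'}]
      def insert(self, doc):
          pass

  manager = FakeManager()
  wrapper = CollectionWrapper(FakeCollection(), manager)

  wrapper.find()                 # 'find' is a query method: flush runs first
  assert manager.flushed == 1
  wrapper.insert({'name': 'x'})  # 'insert' is not: no flush is triggered
  assert manager.flushed == 1
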
Modified: mongopersist/trunk/src/mongopersist/interfaces.py
===================================================================
--- mongopersist/trunk/src/mongopersist/interfaces.py	2012-03-11 00:34:00 UTC (rev 124563)
+++ mongopersist/trunk/src/mongopersist/interfaces.py	2012-03-11 07:12:01 UTC (rev 124564)
@@ -126,6 +126,12 @@
     root = zope.interface.Attribute(
         """Get the root object, which is a mapping.""")
 
+    detect_conflicts = zope.interface.Attribute(
+        """A flag, when set it enables write conflict detection.""")
+
+    def get_collection(obj):
+        """Return the collection for an object."""
+
     def reset():
         """Reset the datamanager for the next transaction."""
 
@@ -138,7 +144,22 @@
         Note: The returned object is in the ghost state.
         """
 
+    def flush():
+        """Flush all changes to Mongo."""
 
+    def insert(obj):
+        """Insert an object into Mongo.
+
+        The correct collection is determined by object type.
+        """
+
+    def remove(obj):
+        """Remove an object from Mongo.
+
+        The correct collection is determined by object type.
+        """
+
+
 class IMongoConnectionPool(zope.interface.Interface):
     """MongoDB connection pool"""
 
@@ -172,4 +193,4 @@
     """An adapter to process find/update spec's"""
 
     def process(collection, spec):
-        """return the processed spec here"""
\ No newline at end of file
+        """return the processed spec here"""

Added: mongopersist/trunk/src/mongopersist/optimistic-data-dumping.txt
===================================================================
--- mongopersist/trunk/src/mongopersist/optimistic-data-dumping.txt	                        (rev 0)
+++ mongopersist/trunk/src/mongopersist/optimistic-data-dumping.txt	2012-03-11 07:12:01 UTC (rev 124564)
@@ -0,0 +1,37 @@
+=======================
+Optimistic Data Dumping
+=======================
+
+``mongopersist`` tried very hard to delay any state storage as long as
+possible, specifically until the end of the transaction. In some sense that
+was nice, because it avoided multiple writes when multiple attributes were
+updated and minimized the time spent in an inconsistent state. However, it
+also meant that we often had to commit transactions prematurely after
+modifications in order to make multi-object-result queries work. I call this
+approach pessimistic data dumping (PDD).
+
+Now ``mongopersist`` embraces change and dumps data frequently whenever it
+makes logical sense, for example before any multi-object-result query to the
+database. And since it keeps track of the original state, all changes can be
+reverted when the transaction is aborted for some reason. A consequence of this
+approach is that the database might temporarily be in an inconsistent state or
+show data that might be removed again. This is optimistic data dumping (ODD).
+
+The problem with PDD is that it is designed for the rare case that something
+goes wrong late in a transaction and all changes have to be reverted, while
+ODD assumes success and fixes the situation if something goes wrong. It is
+like the old saying: Asking for forgiveness is easier -- and in this case
+computationally cheaper and less complex -- than asking for permission.
+
+Also, while PDD originally seems closer to real transaction safety, it is
+not. Due to the lack of frequent dumping, a single logical transaction gets
+split into many small ones without the ability to properly retrieve the
+original state if something goes wrong. So in the end there is no transaction
+safety or consistency guarantee.
+
+
+With ODD, we can ensure at least partial consistency by flushing after logical
+update units. The implemented flushing policy supports this, since one would
+not query for multi-object result sets if there were not some notion of
+temporary consistency. And in the end, the greatest benefit is the ability to
+completely undo all changes of the transaction.


Property changes on: mongopersist/trunk/src/mongopersist/optimistic-data-dumping.txt
___________________________________________________________________
Added: svn:eol-style
   + native

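The essence of ODD fits in a few lines. The following toy model is only an
illustration: a plain dict stands in for a Mongo collection, and the three
bookkeeping structures mirror ``_inserted_objects``, ``_removed_objects`` and
``_original_states`` in the data manager's ``abort()``:

  def odd_abort(store, inserted, removed, original_states):
      # 1. Drop documents that were inserted during the transaction.
      for oid in inserted:
          store.pop(oid, None)
      # 2. Bring back documents that were removed.
      for oid in removed:
          store[oid] = original_states.pop(oid)
      # 3. Restore the original state of any modified documents.
      for oid, state in original_states.items():
          store[oid] = state

  store = {1: {'name': 'one'}, 2: {'name': 'two'}}
  originals = {1: {'name': 'one'}, 2: {'name': 'two'}}
  store[1] = {'name': '1'}      # modified and dumped optimistically
  del store[2]                  # removed
  store[3] = {'name': 'three'}  # inserted
  odd_abort(store, [3], [2], originals)
  assert store == {1: {'name': 'one'}, 2: {'name': 'two'}}
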
Modified: mongopersist/trunk/src/mongopersist/serialize.py
===================================================================
--- mongopersist/trunk/src/mongopersist/serialize.py	2012-03-11 00:34:00 UTC (rev 124563)
+++ mongopersist/trunk/src/mongopersist/serialize.py	2012-03-11 07:12:01 UTC (rev 124564)
@@ -372,19 +372,23 @@
             return sub_obj
         return state
 
-    def set_ghost_state(self, obj):
+    def set_ghost_state(self, obj, doc=None):
         # Look up the object state by coll_name and oid.
-        coll = self._jar._conn[obj._p_oid.database][obj._p_oid.collection]
-        doc = coll.find_one({'_id': obj._p_oid.id})
-        doc.pop('_id')
-        doc.pop('_py_persistent_type', None)
+        if doc is None:
+            coll = self._jar._conn[obj._p_oid.database][obj._p_oid.collection]
+            doc = coll.find_one({'_id': obj._p_oid.id})
+            doc.pop('_id')
+            doc.pop('_py_persistent_type', None)
         # Store the serial, if conflict detection is enabled.
         if self._jar.detect_conflicts:
             obj._p_serial = p64(doc.pop('_py_serial', 0))
         # Now convert the document to a proper Python state dict.
-        state = self.get_object(doc, obj)
+        state = dict(self.get_object(doc, obj))
+        # Now store the original state. It is assumed that the state dict is
+        # not modified later.
+        self._jar._original_states[obj._p_oid] = doc
         # Set the state.
-        obj.__setstate__(dict(state))
+        obj.__setstate__(state)
 
     def get_ghost(self, dbref):
         # If we can, we return the object from cache.

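Note the copy via ``dict(...)`` above: the document stored in
``_original_states`` must not alias the live state that the object goes on to
mutate. A plain-dict illustration of the hazard the copy avoids (the values
are hypothetical):

  doc = {'name': 'top'}
  state = doc                    # no copy: both names alias one dict
  state['name'] = 'stop'         # mutating the live state ...
  assert doc['name'] == 'stop'   # ... would corrupt the saved original

  doc = {'name': 'top'}
  state = dict(doc)              # a shallow copy, as in set_ghost_state()
  state['name'] = 'stop'
  assert doc['name'] == 'top'    # the original survives for abort()
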
Modified: mongopersist/trunk/src/mongopersist/tests/test_datamanager.py
===================================================================
--- mongopersist/trunk/src/mongopersist/tests/test_datamanager.py	2012-03-11 00:34:00 UTC (rev 124563)
+++ mongopersist/trunk/src/mongopersist/tests/test_datamanager.py	2012-03-11 07:12:01 UTC (rev 124564)
@@ -80,6 +80,56 @@
       []
     """
 
+def doctest_MongoDataManager_get_collection():
+    r"""MongoDataManager: get_collection(obj)
+
+    Get the collection for an object.
+
+      >>> foo = Foo('1')
+      >>> foo_ref = dm.insert(foo)
+      >>> dm.reset()
+
+      >>> coll = dm.get_collection(foo)
+
+    We are returning a collection wrapper instead, so that we can flush the
+    data before any method involving a query.
+
+      >>> coll
+      <mongopersist.datamanager.CollectionWrapper object at 0x19e47d0>
+
+    Let's make sure that modifying attributes is done on the original
+    collection:
+
+      >>> coll.foo = 1
+      >>> coll.collection.foo
+      1
+      >>> coll.foo
+      1
+      >>> del coll.foo
+
+    Let's now try the real functionality behind the wrapper. Say we are in a
+    transaction and modify an object:
+
+      >>> foo_new = dm.load(foo_ref)
+      >>> foo_new.name = '2'
+
+    If we do not use the wrapper, the change is not visible:
+
+      >>> tuple(dm._get_collection(foo_new).find())
+      ({u'_id': ObjectId('4f5c1bf537a08e2ea6000000'), u'name': u'1'},)
+
+    But if we use the wrapper, the change gets flushed first:
+
+      >>> tuple(dm.get_collection(foo_new).find())
+      ({u'_id': ObjectId('4f5c1bf537a08e2ea6000000'), u'name': u'2'},)
+
+    Of course, aborting the transaction gets us back to the original state:
+
+      >>> dm.abort(transaction.get())
+      >>> tuple(dm._get_collection(foo_new).find())
+      ({u'_id': ObjectId('4f5c1bf537a08e2ea6000000'), u'name': u'1'},)
+    """
+
 def doctest_MongoDataManager_object_dump_load_reset():
     r"""MongoDataManager: dump(), load(), reset()
 
@@ -120,9 +170,180 @@
       >>> foo._p_oid = foo2._p_oid
     """
 
-def doctest_MongoDataManager_set_state():
-    r"""MongoDataManager: set_state()
+def doctest_MongoDataManager_flush():
+    r"""MongoDataManager: flush()
 
+    This method writes all registered objects to Mongo. It can be used at any
+    time during the transaction when a dump is necessary, but is also used at
+    the end of the transaction to dump all remaining objects.
+
+    We also want to test the effects of conflict detection:
+
+      >>> dm.detect_conflicts = True
+
+    Let's now add an object to the database and reset the manager like it is
+    done at the end of a transaction:
+
+      >>> foo = Foo('foo')
+      >>> foo_ref = dm.dump(foo)
+      >>> dm.reset()
+
+    Let's now load the object again and make a modification:
+
+      >>> foo_new = dm.load(foo._p_oid)
+      >>> foo_new.name = 'Foo'
+
+    The object is now registered with the data manager:
+
+      >>> dm._registered_objects
+      [<mongopersist.tests.test_datamanager.Foo object at 0x2f7b9b0>]
+      >>> foo_new._p_serial
+      '\x00\x00\x00\x00\x00\x00\x00\x01'
+
+    Let's now flush the registered objects:
+
+      >>> dm.flush()
+
+    There are several side effects that should be observed:
+
+    * During a given transaction, we guarantee that the user will always receive
+      the same Python object. This requires that flush does not reset the object
+      cache.
+
+        >>> id(dm.load(foo._p_oid)) == id(foo_new)
+        True
+
+    * The ``_p_serial`` is increased by one.
+
+        >>> foo_new._p_serial
+        '\x00\x00\x00\x00\x00\x00\x00\x02'
+
+    * The object is removed from the registered objects and the ``_p_changed``
+      flag is set to ``False``.
+
+        >>> dm._registered_objects
+        []
+        >>> foo_new._p_changed
+        False
+
+    * Before flushing, potential conflicts must be detected, just as is done
+      before committing a transaction.
+
+        >>> foo_new._p_serial = '\x00\x00\x00\x00\x00\x00\x00\x01'
+        >>> foo_new.name = 'Foo'
+        >>> dm.flush()
+        Traceback (most recent call last):
+        ...
+        ConflictError: database conflict error
+            (oid DBRef('mongopersist.tests.test_datamanager.Foo',
+                       ObjectId('4f5bfcaf37a08e2849000000'),
+                       'mongopersist_test'),
+             class Foo, start serial 1, current serial 2)
+    """
+
+def doctest_MongoDataManager_insert():
+    r"""MongoDataManager: insert(obj)
+
+    This method inserts an object into the database.
+
+      >>> foo = Foo('foo')
+      >>> foo_ref = dm.insert(foo)
+
+    After insertion, the original is not changed:
+
+      >>> foo._p_changed
+      False
+
+    It is also added to the list of inserted objects:
+
+      >>> dm._inserted_objects
+      [<mongopersist.tests.test_datamanager.Foo object at 0x18d41b8>]
+
+    Let's make sure it is really in Mongo:
+
+      >>> dm.reset()
+      >>> foo_new = dm.load(foo_ref)
+      >>> foo_new
+      <mongopersist.tests.test_datamanager.Foo object at 0x27cade8>
+
+    Notice, that we cannot insert the object again:
+
+      >>> dm.insert(foo_new)
+      Traceback (most recent call last):
+      ...
+      ValueError: ('Object has already an OID.',
+                   <mongopersist.tests.test_datamanager.Foo object at 0x1fecde8>)
+
+    Finally, registering a new object will not trigger an insert, but only
+    schedule the object for writing. This is done because objects are
+    sometimes registered when we only want to store a stub, since we would
+    otherwise end up in endless recursion loops.
+
+      >>> foo2 = Foo('Foo 2')
+      >>> dm.register(foo2)
+
+      >>> dm._registered_objects
+      [<mongopersist.tests.test_datamanager.Foo object at 0x3087b18>]
+
+    But storing works as expected (flush is implicit before find):
+
+      >>> tuple(dm.get_collection(foo2).find())
+      ({u'_id': ObjectId('4f5c443837a08e37bf000000'), u'name': u'foo'},
+       {u'_id': ObjectId('4f5c443837a08e37bf000001'), u'name': u'Foo 2'})
+    """
+
+def doctest_MongoDataManager_remove():
+    r"""MongoDataManager: remove(obj)
+
+    This method removes an object from the database.
+
+      >>> foo = Foo('foo')
+      >>> foo_ref = dm.insert(foo)
+      >>> dm.reset()
+
+    Let's now load the object and remove it.
+
+      >>> foo_new = dm.load(foo_ref)
+      >>> dm.remove(foo_new)
+
+    The object is removed from the collection immediately:
+
+      >>> tuple(dm._get_collection(foo_ref).find())
+      ()
+
+    Also, the object is added to the list of removed objects:
+
+      >>> dm._removed_objects
+      [<mongopersist.tests.test_datamanager.Foo object at 0x1693140>]
+
+    Note that you cannot remove objects that are not in the database:
+
+      >>> dm.remove(Foo('Foo 2'))
+      Traceback (most recent call last):
+      ValueError: ('Object does not have OID.',
+                   <mongopersist.tests.test_datamanager.Foo object at 0x1982ed8>)
+
+    There is an edge case, if the object is inserted and removed in the same
+    transaction:
+
+      >>> dm.reset()
+      >>> foo3 = Foo('Foo 3')
+      >>> foo3_ref = dm.insert(foo3)
+      >>> dm.remove(foo3)
+
+    In this case, the object is removed from Mongo and from the inserted
+    object list, and never added to the removed object list.
+
+      >>> dm._inserted_objects
+      []
+      >>> dm._removed_objects
+      []
+
+    """
+
+def doctest_MongoDataManager_setstate():
+    r"""MongoDataManager: setstate()
+
     This method loads and sets the state of an object and joins the
     transaction.
 
@@ -195,6 +416,49 @@
       True
       >>> len(dm._registered_objects)
       0
+
+    Let's now create a more interesting case with a transaction that inserted,
+    removed and changed objects.
+
+    First let's create an initial state:
+
+      >>> dm.reset()
+      >>> foo_ref = dm.insert(Foo('one'))
+      >>> foo2_ref = dm.insert(Foo('two'))
+      >>> dm.reset()
+
+      >>> coll = dm._get_collection(Foo())
+      >>> tuple(coll.find({}))
+      ({u'_id': ObjectId('4f5c114f37a08e2cac000000'), u'name': u'one'},
+       {u'_id': ObjectId('4f5c114f37a08e2cac000001'), u'name': u'two'})
+
+    Now, in a second transaction we modify the state of objects in all three
+    ways:
+
+      >>> foo = dm.load(foo_ref)
+      >>> foo.name = '1'
+      >>> dm._registered_objects
+      [<mongopersist.tests.test_datamanager.Foo object at 0x187b1b8>]
+
+      >>> foo2 = dm.load(foo2_ref)
+      >>> dm.remove(foo2)
+      >>> dm._removed_objects
+      [<mongopersist.tests.test_datamanager.Foo object at 0x1e5c140>]
+
+      >>> foo3_ref = dm.insert(Foo('three'))
+
+      >>> dm.flush()
+      >>> tuple(coll.find({}))
+      ({u'_id': ObjectId('4f5c114f37a08e2cac000000'), u'name': u'1'},
+       {u'_id': ObjectId('4f5c114f37a08e2cac000002'), u'name': u'three'})
+
+    Let's now abort the transaction and everything should be back to what it
+    was before:
+
+      >>> dm.abort(transaction.get())
+      >>> tuple(coll.find({}))
+      ({u'_id': ObjectId('4f5c114f37a08e2cac000000'), u'name': u'one'},
+       {u'_id': ObjectId('4f5c114f37a08e2cac000001'), u'name': u'two'})
     """
 
 def doctest_MongoDataManager_commit():

Modified: mongopersist/trunk/src/mongopersist/tests/test_serialize.py
===================================================================
--- mongopersist/trunk/src/mongopersist/tests/test_serialize.py	2012-03-11 00:34:00 UTC (rev 124563)
+++ mongopersist/trunk/src/mongopersist/tests/test_serialize.py	2012-03-11 07:12:01 UTC (rev 124564)
@@ -664,6 +664,19 @@
       u'top'
       >>> gobj._p_serial
       '\x00\x00\x00\x00\x00\x00\x00\x01'
+
+    Note that the original state is stored in the data manager:
+
+      >>> gobj._p_jar._original_states
+      {DBRef('Top', ObjectId('4f5bf4e437a08e2614000001'), 'mongopersist_test'):
+         {u'name': u'top'}}
+
+    This state does not change, even when the object is modified:
+
+      >>> gobj.name = 'stop'
+      >>> gobj._p_jar._original_states[gobj._p_oid] != gobj.__getstate__()
+      True
+
     """
 
 

Modified: mongopersist/trunk/src/mongopersist/zope/container.py
===================================================================
--- mongopersist/trunk/src/mongopersist/zope/container.py	2012-03-11 00:34:00 UTC (rev 124563)
+++ mongopersist/trunk/src/mongopersist/zope/container.py	2012-03-11 07:12:01 UTC (rev 124564)
@@ -22,7 +22,7 @@
 
 from mongopersist import interfaces, serialize
 from mongopersist.zope import interfaces as zinterfaces
-from mongopersist.datamanager import processSpec
+from mongopersist.datamanager import processSpec, CollectionWrapper
 
 class MongoContained(contained.Contained):
 
@@ -42,6 +42,7 @@
 
 
 class SimpleMongoContainer(sample.SampleContainer, persistent.Persistent):
+    _m_remove_documents = True
 
     def __getstate__(self):
         state = super(SimpleMongoContainer, self).__getstate__()
@@ -81,12 +82,15 @@
     def values(self):
         return [v for k, v in self.items()]
 
-    def __setitem__(self, key, object):
-        super(SimpleMongoContainer, self).__setitem__(key, object)
+    def __setitem__(self, key, obj):
+        super(SimpleMongoContainer, self).__setitem__(key, obj)
         self._p_changed = True
 
     def __delitem__(self, key):
+        obj = self[key]
         super(SimpleMongoContainer, self).__delitem__(key)
+        if self._m_remove_documents:
+            self._p_jar.remove(obj)
         self._p_changed = True
 
 
@@ -98,6 +102,7 @@
     _m_collection = None
     _m_mapping_key = 'key'
     _m_parent_key = 'parent'
+    _m_remove_documents = True
 
     def __init__(self, collection=None, database=None,
                  mapping_key=None, parent_key=None):
@@ -111,16 +116,6 @@
             self._m_parent_key = parent_key
 
     @property
-    def _added(self):
-        ann = self._m_jar.annotations.setdefault(self._p_oid or id(self), {})
-        return ann.setdefault('added', {})
-
-    @property
-    def _deleted(self):
-        ann = self._m_jar.annotations.setdefault(self._p_oid or id(self), {})
-        return ann.setdefault('deleted', {})
-
-    @property
     def _m_jar(self):
         # If the container is in a Mongo storage hierarchy, then getting the
         # datamanager is easy, otherwise we do an adapter lookup.
@@ -133,7 +128,8 @@
 
     def get_collection(self):
         db_name = self._m_database or self._m_jar.default_database
-        return self._m_jar._conn[db_name][self._m_collection]
+        return CollectionWrapper(
+            self._m_jar._conn[db_name][self._m_collection], self._m_jar)
 
     def _m_get_parent_key_value(self):
         if getattr(self, '_p_jar', None) is None:
@@ -155,62 +151,56 @@
             filter[self._m_parent_key] = gs(self._m_get_parent_key_value())
         return filter
 
-    def __getitem__(self, key):
-        if key in self._added:
-            return self._added[key]
-        if key in self._deleted:
-            raise KeyError(key)
-        filter = self._m_get_items_filter()
-        filter[self._m_mapping_key] = key
-        coll = self.get_collection()
-        doc = coll.find_one(processSpec(coll, filter), fields=())
-        if doc is None:
-            raise KeyError(key)
+    def _load_one(self, doc):
+        # Create a DBRef object and then load the full state of the object.
         dbref = pymongo.dbref.DBRef(
             self._m_collection, doc['_id'],
             self._m_database or self._m_jar.default_database)
         obj = self._m_jar._reader.get_ghost(dbref)
-        obj._v_key = key
+        self._m_jar.setstate(obj, doc)
+        obj._v_key = doc[self._m_mapping_key]
         obj._v_parent = self
         return obj
 
+    def __getitem__(self, key):
+        filter = self._m_get_items_filter()
+        filter[self._m_mapping_key] = key
+        coll = self.get_collection()
+        doc = coll.find_one(processSpec(coll, filter))
+        if doc is None:
+            raise KeyError(key)
+        return self._load_one(doc)
 
     def _real_setitem(self, key, value):
         # This call by itself causes _p_changed to be set to True.
         setattr(value, self._m_mapping_key, key)
         if self._m_parent_key is not None:
             setattr(value, self._m_parent_key, self._m_get_parent_key_value())
-        self._m_jar.register(value)
-        # Temporarily store the added object, so it is immediately available
-        # via the API.
-        self._added[key] = value
-        self._deleted.pop(key, None)
+        self._m_jar.insert(value)
 
     def __setitem__(self, key, value):
         contained.setitem(self, self._real_setitem, key, value)
 
     def __delitem__(self, key):
-        # Deleting the object from the database is not our job. We simply
-        # remove it from the dictionary.
         value = self[key]
+        # First remove the parent and name from the object.
         if self._m_mapping_key is not None:
             delattr(value, self._m_mapping_key)
         if self._m_parent_key is not None:
             delattr(value, self._m_parent_key)
-        self._deleted[key] = value
-        self._added.pop(key, None)
+        # Let's now remove the object from the database.
+        if self._m_remove_documents:
+            self._m_jar.remove(value)
+        # Send the uncontained event.
         contained.uncontained(value, self, key)
 
     def keys(self):
         filter = self._m_get_items_filter()
         filter[self._m_mapping_key] = {'$ne': None}
         coll = self.get_collection()
-        keys = [
-            doc[self._m_mapping_key]
-            for doc in coll.find(processSpec(coll, filter))
-            if not doc[self._m_mapping_key] in self._deleted]
-        keys += self._added.keys()
-        return keys
+        return [doc[self._m_mapping_key]
+                for doc in coll.find(processSpec(coll, filter),
+                                     fields=(self._m_mapping_key,))]
 
     def raw_find(self, spec=None, *args, **kwargs):
         if spec is None:
@@ -219,18 +209,11 @@
         coll = self.get_collection()
         return coll.find(processSpec(coll, spec), *args, **kwargs)
 
-    def find(self, spec=None, fields=None, *args, **kwargs):
-        # If fields were not specified, we only request the oid and the key.
-        fields = tuple(fields or ())
-        fields += (self._m_mapping_key,)
-        result = self.raw_find(spec, fields, *args, **kwargs)
+    def find(self, spec=None, *args, **kwargs):
+        # Search for matching objects.
+        result = self.raw_find(spec, *args, **kwargs)
         for doc in result:
-            dbref = pymongo.dbref.DBRef(
-                self._m_collection, doc['_id'],
-                self._m_database or self._m_jar.default_database)
-            obj = self._m_jar._reader.get_ghost(dbref)
-            obj._v_key = doc[self._m_mapping_key]
-            obj._v_parent = self
+            obj = self._load_one(doc)
             yield obj
 
     def raw_find_one(self, spec_or_id=None, *args, **kwargs):
@@ -242,20 +225,11 @@
         coll = self.get_collection()
         return coll.find_one(processSpec(coll, spec_or_id), *args, **kwargs)
 
-    def find_one(self, spec_or_id=None, fields=None, *args, **kwargs):
-        # If fields were not specified, we only request the oid and the key.
-        fields = tuple(fields or ())
-        fields += (self._m_mapping_key,)
-        doc = self.raw_find_one(spec_or_id, fields, *args, **kwargs)
+    def find_one(self, spec_or_id=None, *args, **kwargs):
+        doc = self.raw_find_one(spec_or_id, *args, **kwargs)
         if doc is None:
             return None
-        dbref = pymongo.dbref.DBRef(
-            self._m_collection, doc['_id'],
-            self._m_database or self._m_jar.default_database)
-        obj = self._m_jar._reader.get_ghost(dbref)
-        obj._v_key = doc[self._m_mapping_key]
-        obj._v_parent = self
-        return obj
+        return self._load_one(doc)
 
 class AllItemsMongoContainer(MongoContainer):
     _m_parent_key = None

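The N+1 fix in ``find()`` above is easy to verify with a counting stand-in.
Everything in this sketch is hypothetical illustration; the real code hands
each full document to ``setstate(obj, doc)`` via ``_load_one()``:

  class CountingCollection(object):
      # Stand-in collection that counts round-trips to the database.
      def __init__(self, docs):
          self.docs = docs
          self.queries = 0
      def find(self, spec=None):
          self.queries += 1
          return list(self.docs)
      def find_one(self, _id):
          self.queries += 1
          return [d for d in self.docs if d['_id'] == _id][0]

  docs = [{'_id': i, 'key': 'k%d' % i} for i in range(10)]
  coll = CountingCollection(docs)

  # Old approach: one query for the references, then one query per object
  # when its ghost state was loaded.
  for ref in coll.find():
      coll.find_one(ref['_id'])
  assert coll.queries == 11       # N + 1

  # New approach: one query returns full documents and setstate(obj, doc)
  # reuses them, so no further round-trips are needed.
  coll.queries = 0
  for doc in coll.find():
      pass                        # state is set directly from doc
  assert coll.queries == 1
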
Modified: mongopersist/trunk/src/mongopersist/zope/tests/test_container.py
===================================================================
--- mongopersist/trunk/src/mongopersist/zope/tests/test_container.py	2012-03-11 00:34:00 UTC (rev 124563)
+++ mongopersist/trunk/src/mongopersist/zope/tests/test_container.py	2012-03-11 07:12:01 UTC (rev 124564)
@@ -138,6 +138,11 @@
       >>> transaction.commit()
       >>> dm.root['c'].keys()
       []
+
+    The object is also removed from Mongo:
+
+      >>> pprint(list(db['person'].find()))
+      []
     """
 
 
@@ -192,7 +197,7 @@
       >>> dm.root['c']['stephan'].__parent__
       <mongopersist.zope.container.MongoContainer object at 0x7fec50f86500>
       >>> dm.root['c']['stephan'].__name__
-      'stephan'
+      u'stephan'
 
     We get a usual key error, if an object does not exist:
 
@@ -536,7 +541,7 @@
 
       >>> stephan = root['app']['people']['stephan']
       >>> stephan.__name__
-      'stephan'
+      u'stephan'
       >>> stephan.__parent__
       <mongopersist.zope.container.MongoContainer object at 0x7f6b6273b7d0>
 


