[Checkins] SVN: mongopersist/trunk/ Implemented optimistic data dumping. See CHANGES.txt for more details.
Stephen Richter
cvs-admin at zope.org
Sun Mar 11 07:12:06 UTC 2012
Log message for revision 124564:
Implemented optimistic data dumping. See CHANGES.txt for more details.
Changed:
U mongopersist/trunk/CHANGES.txt
U mongopersist/trunk/src/mongopersist/datamanager.py
U mongopersist/trunk/src/mongopersist/interfaces.py
A mongopersist/trunk/src/mongopersist/optimistic-data-dumping.txt
U mongopersist/trunk/src/mongopersist/serialize.py
U mongopersist/trunk/src/mongopersist/tests/test_datamanager.py
U mongopersist/trunk/src/mongopersist/tests/test_serialize.py
U mongopersist/trunk/src/mongopersist/zope/container.py
U mongopersist/trunk/src/mongopersist/zope/tests/test_container.py
-=-
Modified: mongopersist/trunk/CHANGES.txt
===================================================================
--- mongopersist/trunk/CHANGES.txt 2012-03-11 00:34:00 UTC (rev 124563)
+++ mongopersist/trunk/CHANGES.txt 2012-03-11 07:12:01 UTC (rev 124564)
@@ -2,11 +2,62 @@
CHANGES
=======
-0.5.6 (unreleased)
+0.6.0 (unreleased)
------------------
-- ...
+- Switched to optimistic data dumping, which approaches transactions by
+ dumping early and as the data comes. All changes are undone when the
+ transaction fails/aborts. See ``optimistic-data-dumping.txt`` for
+ details. Here are some of the new features:
+ * Data manager keeps track of all original docs before their objects are
+ modified, so any change can be done.
+
+ * Added an API to data manager (``DataManager.insert(obj)``) to insert an
+ object in the database.
+
+ * Added an API to data manager (``DataManager.remove(obj)``) to remove an
+ object from the database.
+
+ * Data can be flushed to Mongo (``DataManager.flush()``) at any point of the
+ transaction retaining the ability to completely undo all changes. Flushing
+ features the following characteristics:
+
+ + During a given transaction, we guarantee that the user will always receive
+ the same Python object. This requires that flush does not reset the object
+ cache.
+
+ + The ``_p_serial`` is increased by one. (Automatically done in object
+ writer.)
+
+ + The object is removed from the registered objects and the ``_p_changed``
+ flag is set to ``False``.
+
+ + Before flushing, potential conflicts are detected.
+
+ * Implemented a flushing policy: Changes are always flushed before any query
+ is made. A simple wrapper for the ``pymongo`` collection
+ (``CollectionWrapper``) ensures that flush is called before the correct
+ method calls. The new API method ``DataManager.get_collection(obj)``
+ allows one to quickly get a wrapped collection.
+
+- The ``MongoContainer`` class now removes objects from the database upon
+ container removal if ``_m_remove_documents`` is ``True``. The default is
+ ``True``.
+
+- Removed ``fields`` argument from the ``MongoContainer.find(...)`` and
+ ``MongoContainer.find_one(...)`` methods, since it was not used.
+
+- If a container has N items, it took N+1 queries to load the list of items
+ completely. This was due to one query returning all DBRefs and then using
+ one query to load the state for each. Now, the first query loads all full
+ states and uses an extension to ``DataManager.setstate(obj, doc=None)`` to
+ load the state of the object with the previously queried data.
+
+- Changed ``MongoContainer.get_collection()`` to return a
+ ``CollectionWrapper`` instance.
+
+
0.5.5 (2012-03-09)
------------------
Modified: mongopersist/trunk/src/mongopersist/datamanager.py
===================================================================
--- mongopersist/trunk/src/mongopersist/datamanager.py 2012-03-11 00:34:00 UTC (rev 124563)
+++ mongopersist/trunk/src/mongopersist/datamanager.py 2012-03-11 07:12:01 UTC (rev 124564)
@@ -36,6 +36,38 @@
return adapter.process(collection, spec)
+class FlushDecorator(object):
+
+ def __init__(self, datamanager, function):
+ self.datamanager = datamanager
+ self.function = function
+
+ def __call__(self, *args, **kwargs):
+ self.datamanager.flush()
+ return self.function(*args, **kwargs)
+
+class CollectionWrapper(object):
+
+ QUERY_METHODS = ['group', 'map_reduce', 'inline_map_reduce', 'find_one',
+ 'find', 'count', 'find_and_modify']
+
+ def __init__(self, collection, datamanager):
+ self.__dict__['collection'] = collection
+ self.__dict__['_datamanager'] = datamanager
+
+ def __getattr__(self, name):
+ attr = getattr(self.collection, name)
+ if name in self.QUERY_METHODS:
+ attr = FlushDecorator(self._datamanager, attr)
+ return attr
+
+ def __setattr__(self, name, value):
+ setattr(self.collection, name, value)
+
+ def __delattr__(self, name):
+ delattr(self.collection, name)
+
+
class Root(UserDict.DictMixin):
database='mongopersist'
@@ -91,6 +123,9 @@
self._writer = serialize.ObjectWriter(self)
self._registered_objects = []
self._loaded_objects = []
+ self._inserted_objects = []
+ self._removed_objects = []
+ self._original_states = {}
self._needs_to_join = True
self._object_cache = {}
self.annotations = {}
@@ -105,6 +140,44 @@
self.transaction_manager = transaction.manager
self.root = Root(self, root_database, root_collection)
+ def _get_collection(self, obj):
+ db_name, coll_name = self._writer.get_collection_name(obj)
+ return self._conn[db_name][coll_name]
+
+ def _check_conflicts(self):
+ if not self.detect_conflicts:
+ return
+ # Check each modified object to see whether Mongo has a new version of
+ # the object.
+ for obj in self._registered_objects:
+ # This object is not even added to the database yet, so there
+ # cannot be a conflict.
+ if obj._p_oid is None:
+ continue
+ coll = self._get_collection(obj)
+ new_doc = coll.find_one(obj._p_oid.id, fields=('_py_serial',))
+ if new_doc is None:
+ continue
+ if new_doc.get('_py_serial', 0) != serialize.u64(obj._p_serial):
+ raise self.conflict_error_factory(obj, new_doc)
+
+ def _flush_objects(self):
+ # Now write every registered object, but make sure we write each
+ # object just once.
+ written = []
+ for obj in self._registered_objects:
+ if getattr(obj, '_p_mongo_sub_object', False):
+ # Make sure we write the object representing a document in a
+ # collection and not a sub-object.
+ obj = obj._p_mongo_doc_object
+ if obj in written:
+ continue
+ self._writer.store(obj)
+ written.append(obj)
+
+ def get_collection(self, obj):
+ return CollectionWrapper(self._get_collection(obj), self)
+
def dump(self, obj):
return self._writer.store(obj)
@@ -116,14 +189,56 @@
self.__init__(self._conn)
self.root = root
- def setstate(self, obj):
+ def flush(self):
+ # Check for conflicts.
+ self._check_conflicts()
+ # Now write every registered object, but make sure we write each
+ # object just once.
+ self._flush_objects()
+ # Let's now reset all objects as if they were not modified:
+ for obj in self._registered_objects:
+ obj._p_changed = False
+ self._registered_objects = []
+
+ def insert(self, obj):
+ if obj._p_oid is not None:
+ raise ValueError('Object has already an OID.', obj)
+ res = self._writer.store(obj)
+ obj._p_changed = False
+ self._object_cache[obj._p_oid] = obj
+ self._inserted_objects.append(obj)
+ return res
+
+ def remove(self, obj):
+ if obj._p_oid is None:
+ raise ValueError('Object does not have OID.', obj)
+ # Edge case: The object was just added in this transaction.
+ if obj in self._inserted_objects:
+ self._inserted_objects.remove(obj)
+ return
+ # If the object is still in the ghost state, let's load it, so that we
+ # have the state in case we abort the transaction later.
+ if obj._p_changed is None:
+ self.setstate(obj)
+ # Now we remove the object from Mongo.
+ coll = self._get_collection(obj)
+ coll.remove({'_id': obj._p_oid.id})
+ self._removed_objects.append(obj)
+ # Just in case the object was modified before removal, let's remove it
+ # from the modification list:
+ if obj._p_changed:
+ self._registered_objects.remove(obj)
+ # We are not doing anything fancy here, since the object might be
+ # added again with some different state.
+
+ def setstate(self, obj, doc=None):
# When reading a state from Mongo, we also need to join the
# transaction, because we keep an active object cache that gets stale
# after the transaction is complete and must be cleaned.
if self._needs_to_join:
self.transaction_manager.get().join(self)
self._needs_to_join = False
- self._reader.set_ghost_state(obj)
+ self._reader.set_ghost_state(obj, doc)
self._loaded_objects.append(obj)
def oldstate(self, obj, tid):
@@ -140,25 +255,24 @@
self._registered_objects.append(obj)
def abort(self, transaction):
+ # Aborting the transaction requires three steps:
+ # 1. Remove any inserted objects.
+ for obj in self._inserted_objects:
+ coll = self._get_collection(obj)
+ coll.remove({'_id': obj._p_oid.id})
+ # 2. Re-insert any removed objects.
+ for obj in self._removed_objects:
+ coll = self._get_collection(obj)
+ coll.insert(self._original_states[obj._p_oid])
+ del self._original_states[obj._p_oid]
+ # 3. Reset any changed states.
+ for db_ref, state in self._original_states.items():
+ coll = self._conn[db_ref.database][db_ref.collection]
+ coll.update({'_id': db_ref.id}, state, True)
self.reset()
def commit(self, transaction):
- if not self.detect_conflicts:
- return
- # Check each modified object to see whether Mongo has a new version of
- # the object.
- for obj in self._registered_objects:
- # This object is not even added to the database yet, so there
- # cannot be a conflict.
- if obj._p_oid is None:
- continue
- db_name, coll_name = self._writer.get_collection_name(obj)
- coll = self._conn[db_name][coll_name]
- new_doc = coll.find_one(obj._p_oid.id, fields=('_py_serial',))
- if new_doc is None:
- continue
- if new_doc.get('_py_serial', 0) != serialize.u64(obj._p_serial):
- raise self.conflict_error_factory(obj, new_doc)
+ self._check_conflicts()
def tpc_begin(self, transaction):
pass
@@ -167,14 +281,7 @@
pass
def tpc_finish(self, transaction):
- written = []
- for obj in self._registered_objects:
- if getattr(obj, '_p_mongo_sub_object', False):
- obj = obj._p_mongo_doc_object
- if obj in written:
- continue
- self._writer.store(obj)
- written.append(obj)
+ self._flush_objects()
self.reset()
def tpc_abort(self, transaction):
Modified: mongopersist/trunk/src/mongopersist/interfaces.py
===================================================================
--- mongopersist/trunk/src/mongopersist/interfaces.py 2012-03-11 00:34:00 UTC (rev 124563)
+++ mongopersist/trunk/src/mongopersist/interfaces.py 2012-03-11 07:12:01 UTC (rev 124564)
@@ -126,6 +126,12 @@
root = zope.interface.Attribute(
"""Get the root object, which is a mapping.""")
+ detect_conflicts = zope.interface.Attribute(
+ """A flag, when set it enables write conflict detection.""")
+
+ def get_collection(obj):
+ """Return the collection for an object."""
+
def reset():
"""Reset the datamanager for the next transaction."""
@@ -138,7 +144,22 @@
Note: The returned object is in the ghost state.
"""
+ def flush():
+ """Flush all changes to Mongo."""
+ def insert(obj):
+ """Insert an object into Mongo.
+
+ The correct collection is determined by object type.
+ """
+
+ def remove(obj):
+ """Remove an object from Mongo.
+
+ The correct collection is determined by object type.
+ """
+
+
class IMongoConnectionPool(zope.interface.Interface):
"""MongoDB connection pool"""
@@ -172,4 +193,4 @@
"""An adapter to process find/update spec's"""
def process(collection, spec):
- """return the processed spec here"""
\ No newline at end of file
+ """return the processed spec here"""
Added: mongopersist/trunk/src/mongopersist/optimistic-data-dumping.txt
===================================================================
--- mongopersist/trunk/src/mongopersist/optimistic-data-dumping.txt (rev 0)
+++ mongopersist/trunk/src/mongopersist/optimistic-data-dumping.txt 2012-03-11 07:12:01 UTC (rev 124564)
@@ -0,0 +1,37 @@
+=======================
+Optimistic Data Dumping
+=======================
+
+``mongopersist`` tried very hard to delay any state storage as long as
+possible, specifically the end of the transaction. And in some sense, that's
+nice, because it avoided multiple writes when multiple attributes are updated
+and minimized the times of inconsistent state. However, it also meant that we
+often had to commit transactions prematurely after modifications in order to
+make multi-object-result queries work. I call this approach pessimistic data
+dumping (PDD).
+
+Now ``mongopersist`` embraces change and dumps data frequently whenever it
+makes logical sense, for example before any multi-object-result query to the
+database. And since it keeps track of the original state, you can revert all
+changes when the transaction is aborted for some reason. A consequence of this
+approach is that the database might temporarily be in an inconsistent state or
+show data that might be removed again. This is optimistic data dumping (ODD).
+
+The problem of PDD is that it is designed for the rare case that something
+goes wrong late in a transaction and that all changes have to be reverted,
+while ODD assumes success and fixes the situation if something went wrong. It
+is like the old saying: Asking for forgiveness is easier -- and in this case
+computationally cheaper and less complex -- than asking for permission.
+
+Also, while PDD seems originally closer to real transaction safety, it is
+not. Due to the lack of frequent dumping, a single logical transaction gets
+split into many small ones without the ability to properly retrieve the
+original state if something goes wrong. So at the end there is no transaction
+safety or consistency guarantee.
+
+
+With ODD, we can ensure at least partial consistency by flushing after logical
+update units. The implemented flushing policy supports this, since one would
+not query for multi-object result sets if there would not be some notion of
+temporary consistency. And at the end, the greatest benefit is the ability to
+completely undo all changes of the transaction.
Property changes on: mongopersist/trunk/src/mongopersist/optimistic-data-dumping.txt
___________________________________________________________________
Added: svn:eol-style
+ native
Modified: mongopersist/trunk/src/mongopersist/serialize.py
===================================================================
--- mongopersist/trunk/src/mongopersist/serialize.py 2012-03-11 00:34:00 UTC (rev 124563)
+++ mongopersist/trunk/src/mongopersist/serialize.py 2012-03-11 07:12:01 UTC (rev 124564)
@@ -372,19 +372,23 @@
return sub_obj
return state
- def set_ghost_state(self, obj):
+ def set_ghost_state(self, obj, doc=None):
# Look up the object state by coll_name and oid.
- coll = self._jar._conn[obj._p_oid.database][obj._p_oid.collection]
- doc = coll.find_one({'_id': obj._p_oid.id})
- doc.pop('_id')
- doc.pop('_py_persistent_type', None)
+ if doc is None:
+ coll = self._jar._conn[obj._p_oid.database][obj._p_oid.collection]
+ doc = coll.find_one({'_id': obj._p_oid.id})
+ doc.pop('_id')
+ doc.pop('_py_persistent_type', None)
# Store the serial, if conflict detection is enabled.
if self._jar.detect_conflicts:
obj._p_serial = p64(doc.pop('_py_serial', 0))
# Now convert the document to a proper Python state dict.
- state = self.get_object(doc, obj)
+ state = dict(self.get_object(doc, obj))
+ # Now store the original state. It is assumed that the state dict is
+ # not modified later.
+ self._jar._original_states[obj._p_oid] = doc
# Set the state.
- obj.__setstate__(dict(state))
+ obj.__setstate__(state)
def get_ghost(self, dbref):
# If we can, we return the object from cache.
Modified: mongopersist/trunk/src/mongopersist/tests/test_datamanager.py
===================================================================
--- mongopersist/trunk/src/mongopersist/tests/test_datamanager.py 2012-03-11 00:34:00 UTC (rev 124563)
+++ mongopersist/trunk/src/mongopersist/tests/test_datamanager.py 2012-03-11 07:12:01 UTC (rev 124564)
@@ -80,6 +80,56 @@
[]
"""
+def doctest_MongoDataManager_get_collection():
+ r"""MongoDataManager: get_collection(obj)
+
+ Get the collection for an object.
+
+ >>> foo = Foo('1')
+ >>> foo_ref = dm.insert(foo)
+ >>> dm.reset()
+
+ >>> coll = dm.get_collection(foo)
+
+ We are returning a collection wrapper instead, so that we can flush the
+ data before any method involving a query.
+
+ >>> coll
+ <mongopersist.datamanager.CollectionWrapper object at 0x19e47d0>
+
+ Let's make sure that modifying attributes is done on the original
+ collection:
+
+ >>> coll.foo = 1
+ >>> coll.collection.foo
+ 1
+ >>> coll.foo
+ 1
+ >>> del coll.foo
+
+ Let's now try the real functionality behind the wrapper. So we are in a
+ transaction and modify an object:
+
+ >>> foo_new = dm.load(foo_ref)
+ >>> foo_new.name = '2'
+
+ If we do not use the wrapper, the change is not visible:
+
+ >>> tuple(dm._get_collection(foo_new).find())
+ ({u'_id': ObjectId('4f5c1bf537a08e2ea6000000'), u'name': u'1'},)
+
+ But if we use the wrapper, the change gets flushed first:
+
+ >>> tuple(dm.get_collection(foo_new).find())
+ ({u'_id': ObjectId('4f5c1bf537a08e2ea6000000'), u'name': u'2'},)
+
+ Of course, aborting the transaction gets us back to the original state:
+
+ >>> dm.abort(transaction.get())
+ >>> tuple(dm._get_collection(foo_new).find())
+ ({u'_id': ObjectId('4f5c1bf537a08e2ea6000000'), u'name': u'1'},)
+ """
+
def doctest_MongoDataManager_object_dump_load_reset():
r"""MongoDataManager: dump(), load(), reset()
@@ -120,9 +170,180 @@
>>> foo._p_oid = foo2._p_oid
"""
-def doctest_MongoDataManager_set_state():
- r"""MongoDataManager: set_state()
+def doctest_MongoDataManager_flush():
+ r"""MongoDataManager: flush()
+ This method writes all registered objects to Mongo. It can be used at any
+ time during the transaction when a dump is necessary, but is also used at
+ the end of the transaction to dump all remaining objects.
+
+ We also want to test the effects of conflict detection:
+
+ >>> dm.detect_conflicts = True
+
+ Let's now add an object to the database and reset the manager like it is
+ done at the end of a transaction:
+
+ >>> foo = Foo('foo')
+ >>> foo_ref = dm.dump(foo)
+ >>> dm.reset()
+
+ Let's now load the object again and make a modification:
+
+ >>> foo_new = dm.load(foo._p_oid)
+ >>> foo_new.name = 'Foo'
+
+ The object is now registered with the data manager:
+
+ >>> dm._registered_objects
+ [<mongopersist.tests.test_datamanager.Foo object at 0x2f7b9b0>]
+ >>> foo_new._p_serial
+ '\x00\x00\x00\x00\x00\x00\x00\x01'
+
+ Let's now flush the registered objects:
+
+ >>> dm.flush()
+
+ There are several side effects that should be observed:
+
+ * During a given transaction, we guarantee that the user will always receive
+ the same Python object. This requires that flush does not reset the object
+ cache.
+
+ >>> id(dm.load(foo._p_oid)) == id(foo_new)
+ True
+
+ * The ``_p_serial`` is increased by one.
+
+ >>> foo_new._p_serial
+ '\x00\x00\x00\x00\x00\x00\x00\x02'
+
+ * The object is removed from the registered objects and the ``_p_changed``
+ flag is set to ``False``.
+
+ >>> dm._registered_objects
+ []
+ >>> foo_new._p_changed
+ False
+
+ * Before flushing, potential conflicts must be detected as it is done before
+ committing a transaction.
+
+ >>> foo_new._p_serial = '\x00\x00\x00\x00\x00\x00\x00\x01'
+ >>> foo_new.name = 'Foo'
+ >>> dm.flush()
+ Traceback (most recent call last):
+ ...
+ ConflictError: database conflict error
+ (oid DBRef('mongopersist.tests.test_datamanager.Foo',
+ ObjectId('4f5bfcaf37a08e2849000000'),
+ 'mongopersist_test'),
+ class Foo, start serial 1, current serial 2)
+ """
+
+def doctest_MongoDataManager_insert():
+ r"""MongoDataManager: insert(obj)
+
+ This method inserts an object into the database.
+
+ >>> foo = Foo('foo')
+ >>> foo_ref = dm.insert(foo)
+
+ After insertion, the original is not changed:
+
+ >>> foo._p_changed
+ False
+
+ It is also added to the list of inserted objects:
+
+ >>> dm._inserted_objects
+ [<mongopersist.tests.test_datamanager.Foo object at 0x18d41b8>]
+
+ Let's make sure it is really in Mongo:
+
+ >>> dm.reset()
+ >>> foo_new = dm.load(foo_ref)
+ >>> foo_new
+ <mongopersist.tests.test_datamanager.Foo object at 0x27cade8>
+
+ Notice, that we cannot insert the object again:
+
+ >>> dm.insert(foo_new)
+ Traceback (most recent call last):
+ ...
+ ValueError: ('Object has already an OID.',
+ <mongopersist.tests.test_datamanager.Foo object at 0x1fecde8>)
+
+ Finally, registering a new object will not trigger an insert, but only
+ schedule the object for writing. This is done, since sometimes objects are
+ registered when we only want to store a stub since we otherwise end up in
+ endless recursion loops.
+
+ >>> foo2 = Foo('Foo 2')
+ >>> dm.register(foo2)
+
+ >>> dm._registered_objects
+ [<mongopersist.tests.test_datamanager.Foo object at 0x3087b18>]
+
+ But storing works as expected (flush is implicit before find):
+
+ >>> tuple(dm.get_collection(foo2).find())
+ ({u'_id': ObjectId('4f5c443837a08e37bf000000'), u'name': u'foo'},
+ {u'_id': ObjectId('4f5c443837a08e37bf000001'), u'name': u'Foo 2'})
+ """
+
+def doctest_MongoDataManager_remove():
+ r"""MongoDataManager: remove(obj)
+
+ This method removes an object from the database.
+
+ >>> foo = Foo('foo')
+ >>> foo_ref = dm.insert(foo)
+ >>> dm.reset()
+
+ Let's now load the object and remove it.
+
+ >>> foo_new = dm.load(foo_ref)
+ >>> dm.remove(foo_new)
+
+ The object is removed from the collection immediately:
+
+ >>> tuple(dm._get_collection(foo_ref).find())
+ ()
+
+ Also, the object is added to the list of removed objects:
+
+ >>> dm._removed_objects
+ [<mongopersist.tests.test_datamanager.Foo object at 0x1693140>]
+
+ Note that you cannot remove objects that are not in the database:
+
+ >>> dm.remove(Foo('Foo 2'))
+ Traceback (most recent call last):
+ ValueError: ('Object does not have OID.',
+ <mongopersist.tests.test_datamanager.Foo object at 0x1982ed8>)
+
+ There is an edge case, if the object is inserted and removed in the same
+ transaction:
+
+ >>> dm.reset()
+ >>> foo3 = Foo('Foo 3')
+ >>> foo3_ref = dm.insert(foo3)
+ >>> dm.remove(foo3)
+
+ In this case, the object removed from Mongo and from the inserted object
+ list and never added to the removed object list.
+
+ >>> dm._inserted_objects
+ []
+ >>> dm._removed_objects
+ []
+
+ """
+
+def doctest_MongoDataManager_setstate():
+ r"""MongoDataManager: setstate()
+
This method loads and sets the state of an object and joins the
transaction.
@@ -195,6 +416,49 @@
True
>>> len(dm._registered_objects)
0
+
+ Let's now create a more interesting case with a transaction that inserted,
+ removed and changed objects.
+
+ First let's create an initial state:
+
+ >>> dm.reset()
+ >>> foo_ref = dm.insert(Foo('one'))
+ >>> foo2_ref = dm.insert(Foo('two'))
+ >>> dm.reset()
+
+ >>> coll = dm._get_collection(Foo())
+ >>> tuple(coll.find({}))
+ ({u'_id': ObjectId('4f5c114f37a08e2cac000000'), u'name': u'one'},
+ {u'_id': ObjectId('4f5c114f37a08e2cac000001'), u'name': u'two'})
+
+ Now, in a second transaction we modify the state of objects in all three
+ ways:
+
+ >>> foo = dm.load(foo_ref)
+ >>> foo.name = '1'
+ >>> dm._registered_objects
+ [<mongopersist.tests.test_datamanager.Foo object at 0x187b1b8>]
+
+ >>> foo2 = dm.load(foo2_ref)
+ >>> dm.remove(foo2)
+ >>> dm._removed_objects
+ [<mongopersist.tests.test_datamanager.Foo object at 0x1e5c140>]
+
+ >>> foo3_ref = dm.insert(Foo('three'))
+
+ >>> dm.flush()
+ >>> tuple(coll.find({}))
+ ({u'_id': ObjectId('4f5c114f37a08e2cac000000'), u'name': u'1'},
+ {u'_id': ObjectId('4f5c114f37a08e2cac000002'), u'name': u'three'})
+
+ Let's now abort the transaction and everything should be back to what it
+ was before:
+
+ >>> dm.abort(transaction.get())
+ >>> tuple(coll.find({}))
+ ({u'_id': ObjectId('4f5c114f37a08e2cac000000'), u'name': u'one'},
+ {u'_id': ObjectId('4f5c114f37a08e2cac000001'), u'name': u'two'})
"""
def doctest_MongoDataManager_commit():
Modified: mongopersist/trunk/src/mongopersist/tests/test_serialize.py
===================================================================
--- mongopersist/trunk/src/mongopersist/tests/test_serialize.py 2012-03-11 00:34:00 UTC (rev 124563)
+++ mongopersist/trunk/src/mongopersist/tests/test_serialize.py 2012-03-11 07:12:01 UTC (rev 124564)
@@ -664,6 +664,19 @@
u'top'
>>> gobj._p_serial
'\x00\x00\x00\x00\x00\x00\x00\x01'
+
+ Note that the original state is stored in the data manager:
+
+ >>> gobj._p_jar._original_states
+ {DBRef('Top', ObjectId('4f5bf4e437a08e2614000001'), 'mongopersist_test'):
+ {u'name': u'top'}}
+
+ This state does not change, even when the object is modified:
+
+ >>> gobj.name = 'stop'
+ >>> gobj._p_jar._original_states[gobj._p_oid] != gobj.__getstate__()
+ True
+
"""
Modified: mongopersist/trunk/src/mongopersist/zope/container.py
===================================================================
--- mongopersist/trunk/src/mongopersist/zope/container.py 2012-03-11 00:34:00 UTC (rev 124563)
+++ mongopersist/trunk/src/mongopersist/zope/container.py 2012-03-11 07:12:01 UTC (rev 124564)
@@ -22,7 +22,7 @@
from mongopersist import interfaces, serialize
from mongopersist.zope import interfaces as zinterfaces
-from mongopersist.datamanager import processSpec
+from mongopersist.datamanager import processSpec, CollectionWrapper
class MongoContained(contained.Contained):
@@ -42,6 +42,7 @@
class SimpleMongoContainer(sample.SampleContainer, persistent.Persistent):
+ _m_remove_documents = True
def __getstate__(self):
state = super(SimpleMongoContainer, self).__getstate__()
@@ -81,12 +82,15 @@
def values(self):
return [v for k, v in self.items()]
- def __setitem__(self, key, object):
- super(SimpleMongoContainer, self).__setitem__(key, object)
+ def __setitem__(self, key, obj):
+ super(SimpleMongoContainer, self).__setitem__(key, obj)
self._p_changed = True
def __delitem__(self, key):
+ obj = self[key]
super(SimpleMongoContainer, self).__delitem__(key)
+ if self._m_remove_documents:
+ self._p_jar.remove(obj)
self._p_changed = True
@@ -98,6 +102,7 @@
_m_collection = None
_m_mapping_key = 'key'
_m_parent_key = 'parent'
+ _m_remove_documents = True
def __init__(self, collection=None, database=None,
mapping_key=None, parent_key=None):
@@ -111,16 +116,6 @@
self._m_parent_key = parent_key
@property
- def _added(self):
- ann = self._m_jar.annotations.setdefault(self._p_oid or id(self), {})
- return ann.setdefault('added', {})
-
- @property
- def _deleted(self):
- ann = self._m_jar.annotations.setdefault(self._p_oid or id(self), {})
- return ann.setdefault('deleted', {})
-
- @property
def _m_jar(self):
# If the container is in a Mongo storage hierarchy, then getting the
# datamanager is easy, otherwise we do an adapter lookup.
@@ -133,7 +128,8 @@
def get_collection(self):
db_name = self._m_database or self._m_jar.default_database
- return self._m_jar._conn[db_name][self._m_collection]
+ return CollectionWrapper(
+ self._m_jar._conn[db_name][self._m_collection], self._m_jar)
def _m_get_parent_key_value(self):
if getattr(self, '_p_jar', None) is None:
@@ -155,62 +151,56 @@
filter[self._m_parent_key] = gs(self._m_get_parent_key_value())
return filter
- def __getitem__(self, key):
- if key in self._added:
- return self._added[key]
- if key in self._deleted:
- raise KeyError(key)
- filter = self._m_get_items_filter()
- filter[self._m_mapping_key] = key
- coll = self.get_collection()
- doc = coll.find_one(processSpec(coll, filter), fields=())
- if doc is None:
- raise KeyError(key)
+ def _load_one(self, doc):
+ # Create a DBRef object and then load the full state of the object.
dbref = pymongo.dbref.DBRef(
self._m_collection, doc['_id'],
self._m_database or self._m_jar.default_database)
obj = self._m_jar._reader.get_ghost(dbref)
- obj._v_key = key
+ self._m_jar.setstate(obj, doc)
+ obj._v_key = doc[self._m_mapping_key]
obj._v_parent = self
return obj
+ def __getitem__(self, key):
+ filter = self._m_get_items_filter()
+ filter[self._m_mapping_key] = key
+ coll = self.get_collection()
+ doc = coll.find_one(processSpec(coll, filter))
+ if doc is None:
+ raise KeyError(key)
+ return self._load_one(doc)
def _real_setitem(self, key, value):
+ # This call by itself causes the state to change _p_changed to True.
setattr(value, self._m_mapping_key, key)
if self._m_parent_key is not None:
setattr(value, self._m_parent_key, self._m_get_parent_key_value())
- self._m_jar.register(value)
- # Temporarily store the added object, so it is immediately available
- # via the API.
- self._added[key] = value
- self._deleted.pop(key, None)
+ self._m_jar.insert(value)
def __setitem__(self, key, value):
contained.setitem(self, self._real_setitem, key, value)
def __delitem__(self, key):
- # Deleting the object from the database is not our job. We simply
- # remove it from the dictionary.
value = self[key]
+ # First remove the parent and name from the object.
if self._m_mapping_key is not None:
delattr(value, self._m_mapping_key)
if self._m_parent_key is not None:
delattr(value, self._m_parent_key)
- self._deleted[key] = value
- self._added.pop(key, None)
+ # Let's now remove the object from the database.
+ if self._m_remove_documents:
+ self._m_jar.remove(value)
+ # Send the uncontained event.
contained.uncontained(value, self, key)
def keys(self):
filter = self._m_get_items_filter()
filter[self._m_mapping_key] = {'$ne': None}
coll = self.get_collection()
- keys = [
- doc[self._m_mapping_key]
- for doc in coll.find(processSpec(coll, filter))
- if not doc[self._m_mapping_key] in self._deleted]
- keys += self._added.keys()
- return keys
+ return [doc[self._m_mapping_key]
+ for doc in coll.find(processSpec(coll, filter),
+ fields=(self._m_mapping_key,))]
def raw_find(self, spec=None, *args, **kwargs):
if spec is None:
@@ -219,18 +209,11 @@
coll = self.get_collection()
return coll.find(processSpec(coll, spec), *args, **kwargs)
- def find(self, spec=None, fields=None, *args, **kwargs):
- # If fields were not specified, we only request the oid and the key.
- fields = tuple(fields or ())
- fields += (self._m_mapping_key,)
- result = self.raw_find(spec, fields, *args, **kwargs)
+ def find(self, spec=None, *args, **kwargs):
+ # Search for matching objects.
+ result = self.raw_find(spec, *args, **kwargs)
for doc in result:
- dbref = pymongo.dbref.DBRef(
- self._m_collection, doc['_id'],
- self._m_database or self._m_jar.default_database)
- obj = self._m_jar._reader.get_ghost(dbref)
- obj._v_key = doc[self._m_mapping_key]
- obj._v_parent = self
+ obj = self._load_one(doc)
yield obj
def raw_find_one(self, spec_or_id=None, *args, **kwargs):
@@ -242,20 +225,11 @@
coll = self.get_collection()
return coll.find_one(processSpec(coll, spec_or_id), *args, **kwargs)
- def find_one(self, spec_or_id=None, fields=None, *args, **kwargs):
- # If fields were not specified, we only request the oid and the key.
- fields = tuple(fields or ())
- fields += (self._m_mapping_key,)
- doc = self.raw_find_one(spec_or_id, fields, *args, **kwargs)
+ def find_one(self, spec_or_id=None, *args, **kwargs):
+ doc = self.raw_find_one(spec_or_id, *args, **kwargs)
if doc is None:
return None
- dbref = pymongo.dbref.DBRef(
- self._m_collection, doc['_id'],
- self._m_database or self._m_jar.default_database)
- obj = self._m_jar._reader.get_ghost(dbref)
- obj._v_key = doc[self._m_mapping_key]
- obj._v_parent = self
- return obj
+ return self._load_one(doc)
class AllItemsMongoContainer(MongoContainer):
_m_parent_key = None
Modified: mongopersist/trunk/src/mongopersist/zope/tests/test_container.py
===================================================================
--- mongopersist/trunk/src/mongopersist/zope/tests/test_container.py 2012-03-11 00:34:00 UTC (rev 124563)
+++ mongopersist/trunk/src/mongopersist/zope/tests/test_container.py 2012-03-11 07:12:01 UTC (rev 124564)
@@ -138,6 +138,11 @@
>>> transaction.commit()
>>> dm.root['c'].keys()
[]
+
+ The object is also removed from Mongo:
+
+ >>> pprint(list(db['person'].find()))
+ []
"""
@@ -192,7 +197,7 @@
>>> dm.root['c']['stephan'].__parent__
<mongopersist.zope.container.MongoContainer object at 0x7fec50f86500>
>>> dm.root['c']['stephan'].__name__
- 'stephan'
+ u'stephan'
We get a usual key error, if an object does not exist:
@@ -536,7 +541,7 @@
>>> stephan = root['app']['people']['stephan']
>>> stephan.__name__
- 'stephan'
+ u'stephan'
>>> stephan.__parent__
<mongopersist.zope.container.MongoContainer object at 0x7f6b6273b7d0>
More information about the checkins
mailing list