[Checkins] SVN: appendonly/trunk/ Add a conflict-free 'Accumulator' class

Tres Seaver cvs-admin at zope.org
Mon Feb 25 21:52:34 UTC 2013


Log message for revision 129808:
  Add a conflict-free 'Accumulator' class
  
  Instances manage a queue of items which can be iterated, appended to, or
  consumed entirely (no partial / pop).
  

Changed:
  _U  appendonly/trunk/
  U   appendonly/trunk/CHANGES.txt
  U   appendonly/trunk/appendonly/__init__.py
  U   appendonly/trunk/appendonly/tests.py

-=-
Modified: appendonly/trunk/CHANGES.txt
===================================================================
--- appendonly/trunk/CHANGES.txt	2013-02-25 21:52:33 UTC (rev 129807)
+++ appendonly/trunk/CHANGES.txt	2013-02-25 21:52:34 UTC (rev 129808)
@@ -4,6 +4,9 @@
 0.11 (unreleased)
 -----------------
 
+- Added a conflict-free 'Accumulator' class: manages a queue of items which
+  can be iterated, appended to, or consumed entirely (no partial / pop).
+
 - Automated tests for supported Python versions via 'tox'.
 
 - Dropped support for Python 2.4 / 2.5.

Modified: appendonly/trunk/appendonly/__init__.py
===================================================================
--- appendonly/trunk/appendonly/__init__.py	2013-02-25 21:52:33 UTC (rev 129807)
+++ appendonly/trunk/appendonly/__init__.py	2013-02-25 21:52:34 UTC (rev 129808)
@@ -240,3 +240,52 @@
         if committed['_generation'] == new['_generation']:
             return committed
         raise ConflictError('Conflicting generations')
+
+
+class Accumulator(Persistent):
+
+    __slots__ = ('_list',)
+
+    def __init__(self, value=()):
+        self._list = list(value)
+
+    def __iter__(self):
+        return iter(self._list)
+
+    def append(self, v):
+        self._list.append(v)
+
+    def consume(self):
+        result, self._list = self._list[:], []
+        return result
+
+    def __getstate__(self):
+        return self._list
+
+    def __setstate__(self, value):
+        self._list = list(value)
+
+    #
+    # ZODB Conflict resolution
+    #
+    # The only allowable mutations append or clear the list (or clear
+    # followed by one or more appends).  If either the committed or the
+    # new state has cleared, concatenate the suffixes from both.  Otherwise,
+    # concatenate the new suffix to committed.
+    #
+    def _p_resolveConflict(self, old, committed, new):
+        if committed[:len(old)] == old:
+            c_clear = False
+            c_sfx = committed[len(old):]
+        else:
+            c_clear = True
+            c_sfx = committed[:]
+        if new[:len(old)] == old:
+            n_clear = False
+            n_sfx = new[len(old):]
+        else:
+            n_clear = True
+            n_sfx = new[:]
+        if c_clear or n_clear:
+            return c_sfx + n_sfx
+        return committed + n_sfx

Modified: appendonly/trunk/appendonly/tests.py
===================================================================
--- appendonly/trunk/appendonly/tests.py	2013-02-25 21:52:33 UTC (rev 129807)
+++ appendonly/trunk/appendonly/tests.py	2013-02-25 21:52:34 UTC (rev 129808)
@@ -572,3 +572,140 @@
         archive = self._makeOne()
         self.assertRaises(ConflictError, archive._p_resolveConflict,
                           O_STATE, C_STATE, N_STATE)
+
+
+class AccumulatorTests(unittest.TestCase):
+
+    def _getTargetClass(self):
+        from appendonly import Accumulator
+        return Accumulator
+
+    def _makeOne(self, value=None):
+        if value is None:
+            return self._getTargetClass()()
+        return self._getTargetClass()(value)
+
+    def test_ctor_wo_value(self):
+        aclist = self._makeOne()
+        self.assertEqual(aclist._list, [])
+
+    def test_ctor_w_value(self):
+        VALUE = [0, 1, 2]
+        aclist = self._makeOne(VALUE)
+        self.assertEqual(aclist._list, VALUE)
+        self.assertFalse(aclist._list is VALUE)
+
+    def test___iter___empty(self):
+        aclist = self._makeOne()
+        self.assertEqual(list(aclist), [])
+
+    def test___iter___filled(self):
+        VALUE = [0, 1, 2]
+        aclist = self._makeOne(VALUE)
+        self.assertEqual(list(aclist), VALUE)
+
+    def test_append(self):
+        VALUE = [0, 1, 2]
+        aclist = self._makeOne()
+        for value in VALUE:
+            aclist.append(value)
+        self.assertEqual(list(aclist), VALUE)
+
+    def test_consume(self):
+        VALUE = [0, 1, 2]
+        aclist = self._makeOne(VALUE)
+        result = aclist.consume()
+        self.assertEqual(result, VALUE)
+        self.assertEqual(list(aclist), [])
+
+    def test___getstate___empty(self):
+        aclist = self._makeOne()
+        self.assertEqual(aclist.__getstate__(), [])
+
+    def test___getstate___filled(self):
+        VALUE = [0, 1, 2]
+        aclist = self._makeOne(VALUE)
+        self.assertEqual(aclist.__getstate__(), VALUE)
+
+    def test___setstate___empty(self):
+        aclist = self._makeOne([0, 1, 2])
+        aclist.__setstate__([])
+        self.assertEqual(list(aclist), [])
+
+    def test___setstate___filled(self):
+        VALUE = [0, 1, 2]
+        aclist = self._makeOne()
+        aclist.__setstate__(VALUE)
+        self.assertEqual(list(aclist), VALUE)
+
+    def test__p_resolveConflict_w_both_append(self):
+        O_STATE = [1, 2, 3]
+        C_STATE = [1, 2, 3, 4]
+        N_STATE = [1, 2, 3, 5]
+        aclist = self._makeOne()
+        resolved = aclist._p_resolveConflict(O_STATE, C_STATE, N_STATE)
+        self.assertEqual(resolved, [1, 2, 3, 4, 5])
+
+    def test__p_resolveConflict_w_c_consume_n_append(self):
+        O_STATE = [1, 2, 3]
+        C_STATE = []
+        N_STATE = [1, 2, 3, 4]
+        aclist = self._makeOne()
+        resolved = aclist._p_resolveConflict(O_STATE, C_STATE, N_STATE)
+        self.assertEqual(resolved, [4])
+
+    def test__p_resolveConflict_w_n_consume_c_append(self):
+        O_STATE = [1, 2, 3]
+        C_STATE = [1, 2, 3, 4]
+        N_STATE = []
+        aclist = self._makeOne()
+        resolved = aclist._p_resolveConflict(O_STATE, C_STATE, N_STATE)
+        self.assertEqual(resolved, [4])
+
+    def test__p_resolveConflict_w_both_consume(self):
+        O_STATE = [1, 2, 3]
+        C_STATE = []
+        N_STATE = []
+        aclist = self._makeOne()
+        resolved = aclist._p_resolveConflict(O_STATE, C_STATE, N_STATE)
+        self.assertEqual(resolved, [])
+
+    def test__p_resolveConflict_w_c_append_n_consume_append(self):
+        O_STATE = [1, 2, 3]
+        C_STATE = [1, 2, 3, 4]
+        N_STATE = [5]
+        aclist = self._makeOne()
+        resolved = aclist._p_resolveConflict(O_STATE, C_STATE, N_STATE)
+        self.assertEqual(resolved, [4, 5])
+
+    def test__p_resolveConflict_w_n_append_c_consume_append(self):
+        O_STATE = [1, 2, 3]
+        C_STATE = [1, 2, 3, 4]
+        N_STATE = [5]
+        aclist = self._makeOne()
+        resolved = aclist._p_resolveConflict(O_STATE, C_STATE, N_STATE)
+        self.assertEqual(resolved, [4, 5])
+
+    def test__p_resolveConflict_w_c_consume_n_consume_append(self):
+        O_STATE = [1, 2, 3]
+        C_STATE = []
+        N_STATE = [4]
+        aclist = self._makeOne()
+        resolved = aclist._p_resolveConflict(O_STATE, C_STATE, N_STATE)
+        self.assertEqual(resolved, [4])
+
+    def test__p_resolveConflict_w_n_consume_c_consume_append(self):
+        O_STATE = [1, 2, 3]
+        C_STATE = [4]
+        N_STATE = []
+        aclist = self._makeOne()
+        resolved = aclist._p_resolveConflict(O_STATE, C_STATE, N_STATE)
+        self.assertEqual(resolved, [4])
+
+    def test__p_resolveConflict_w_both_consume_append(self):
+        O_STATE = [1, 2, 3]
+        C_STATE = [4, 5]
+        N_STATE = [6]
+        aclist = self._makeOne()
+        resolved = aclist._p_resolveConflict(O_STATE, C_STATE, N_STATE)
+        self.assertEqual(resolved, [4, 5, 6])



More information about the checkins mailing list