[Zope-CVS] CVS: Products/QueueCatalog/tests - test_QueueCatalog.py:1.1 test_CatalogEventQueueSet.py:1.2

Shane Hathaway shane@zope.com
Wed, 18 Jun 2003 15:37:30 -0400


Update of /cvs-repository/Products/QueueCatalog/tests
In directory cvs.zope.org:/tmp/cvs-serv16520/tests

Modified Files:
	test_CatalogEventQueueSet.py 
Added Files:
	test_QueueCatalog.py 
Log Message:
Added minimal unit tests and brought in sync with work done for a customer.

- The processing limit is now enforced with more precision.  Until
  now, you could get stuck in a situation where you had to process
  thousands of queue entries in a single transaction.  Fixed.

- Add to the catalog immediately only if there are indexes to update
  immediately.

- After processing the queue, show how many items were processed.

- Minor reformatting.



=== Added File Products/QueueCatalog/tests/test_QueueCatalog.py ===
##############################################################################
#
# Copyright (c) 2002 Zope Corporation and Contributors.
# All Rights Reserved.
# 
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
# 
##############################################################################
"""QueueCatalog tests.

$Id: test_QueueCatalog.py,v 1.1 2003/06/18 19:37:30 shane Exp $
"""

import unittest
import Testing
import Zope
Zope.startup()

from Products.ZCatalog.ZCatalog import ZCatalog
from Products.QueueCatalog.QueueCatalog import QueueCatalog
from OFS.Folder import Folder


class QueueCatalogTests(unittest.TestCase):

    def setUp(self):
        app = Zope.app()
        self.app = app
        app.real_cat = ZCatalog('real_cat')
        app.real_cat.addIndex('id', 'FieldIndex')
        app.real_cat.addIndex('meta_type', 'FieldIndex')
        app.queue_cat = QueueCatalog(3)  # 3 buckets
        app.queue_cat.id = 'queue_cat'
        app.queue_cat.manage_edit(location='/real_cat',
                                  immediate_indexes=['id'])

    def tearDown(self):
        get_transaction().abort()
        del self.app


    def testAddObject(self):
        app = self.app
        app.f1 = Folder()
        app.f1.id = 'f1'
        self.assertEqual(app.queue_cat.manage_size(), 0)
        self.assertEqual(len(app.real_cat), 0)
        app.queue_cat.catalog_object(app.f1)
        self.assertEqual(app.queue_cat.manage_size(), 1)
        self.assertEqual(len(app.real_cat), 1)


    def testDeferMostIndexes(self):
        app = self.app
        app.f1 = Folder()
        app.f1.id = 'f1'
        app.queue_cat.catalog_object(app.f1)
        # The 'id' index gets updated immediately.
        res = app.queue_cat.searchResults(id='f1')
        self.assertEqual(len(res), 1)
        # In this test, the 'meta_type' index gets updated later.
        res = app.queue_cat.searchResults(meta_type='Folder')
        self.assertEqual(len(res), 0)
        # Process the queue.
        app.queue_cat.process()
        # Now that the queue has been processed, the item shows up.
        res = app.queue_cat.searchResults(meta_type='Folder')
        self.assertEqual(len(res), 1)


    def testQueueProcessingLimit(self):
        # Don't try to process too many items at once.
        app = self.app
        for n in range(100):
            f = Folder()
            f.id = 'f%d' % n
            setattr(app, f.id, f)
            f = getattr(app, f.id)
            app.queue_cat.catalog_object(f)
        # None of the items should be in the meta_type index yet.
        res = app.queue_cat.searchResults(meta_type='Folder')
        self.assertEqual(len(res), 0)
        # Process only 10 of the items.
        app.queue_cat.process(max=10)
        # There should now be 10 items in the results.
        res = app.queue_cat.searchResults(meta_type='Folder')
        self.assertEqual(len(res), 10)
        # Process another 25.
        app.queue_cat.process(max=25)
        # There should now be 35 items in the results.
        res = app.queue_cat.searchResults(meta_type='Folder')
        self.assertEqual(len(res), 35)
        # Finish.
        app.queue_cat.process()
        res = app.queue_cat.searchResults(meta_type='Folder')
        self.assertEqual(len(res), 100)


    def testGetIndexInfo(self):
        info = self.app.queue_cat.getIndexInfo()
        self.assertEqual(len(info), 2)
        self.assert_({'id': 'id', 'meta_type': 'FieldIndex'} in info)
        self.assert_({'id': 'meta_type', 'meta_type': 'FieldIndex'} in info)


# Enable this test when DemoStorage supports conflict resolution.

##    def testSimpleConflict(self):
##        # Verifies that the queue resolves conflicts.
##        # Prepare the data.
##        app = self.app
##        for n in range(10):
##            f = Folder()
##            f.id = 'f%d' % n
##            setattr(app, f.id, f)
##            g = Folder()
##            g.id = 'g%d' % n
##            setattr(app, g.id, g)
##        get_transaction().commit()
##        # Queue some events.
##        for n in range(10):
##            f = getattr(app, 'f%d' % n) 
##            app.queue_cat.catalog_object(f)
##        # Let another thread add events that change the same queues.
##        from thread import start_new_thread, allocate_lock
##        lock = allocate_lock()
##        lock.acquire()
##        start_new_thread(self._simpleConflictThread, (lock,))
##        lock.acquire()
##        # Verify the other thread did its work.
##        app2 = Zope.app()
##        try:
##            self.assertEqual(len(app2.queue_cat()), 10)
##        finally:
##            del app2
##        # Commit the conflicting changes.
##        get_transaction().commit()
##        # Did it work?
##        self.assertEqual(len(app.queue_cat()), 20)

##    def _simpleConflictThread(self, lock):
##        try:
##            app = Zope.app()
##            for n in range(10):
##                g = getattr(app, 'g%d' % n) 
##                app.queue_cat.catalog_object(g)
##            get_transaction().commit()
##            del app
##        finally:
##            lock.release()


if __name__ == '__main__':
    unittest.main()



=== Products/QueueCatalog/tests/test_CatalogEventQueueSet.py 1.1 => 1.2 ===
--- Products/QueueCatalog/tests/test_CatalogEventQueueSet.py:1.1	Wed Jun  4 11:15:21 2003
+++ Products/QueueCatalog/tests/test_CatalogEventQueueSet.py	Wed Jun 18 15:37:30 2003
@@ -14,18 +14,18 @@
 # http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/117119/index_txt
 
 def eratosthenes():
-	'''Yields the sequence of prime numbers via the Sieve of Eratosthenes.'''
-	D = {}  # map composite integers to primes witnessing their compositeness
-	q = 2   # first integer to test for primality
-	while 1:
-		if q not in D:
-			yield q        # not marked composite, must be prime
-			D[q*q] = [q]   # first multiple of q not already marked
-		else:
-			for p in D[q]: # move each witness to its next multiple
-				D.setdefault(p+q,[]).append(p)
-			del D[q]       # no longer need D[q], free memory
-		q += 1
+    '''Yields the sequence of prime numbers via the Sieve of Eratosthenes.'''
+    D = {}  # map composite integers to primes witnessing their compositeness
+    q = 2   # first integer to test for primality
+    while 1:
+        if q not in D:
+            yield q        # not marked composite, must be prime
+            D[q*q] = [q]   # first multiple of q not already marked
+        else:
+            for p in D[q]: # move each witness to its next multiple
+                D.setdefault(p+q,[]).append(p)
+            del D[q]       # no longer need D[q], free memory
+        q += 1
 
 def _isPrime( count ):