[Zope-CVS] CVS: Products/QueueCatalog/tests - __init__.py:1.1 test_CatalogEventQueueSet.py:1.1

Tres Seaver tseaver@zope.com
Wed, 4 Jun 2003 11:15:22 -0400


Update of /cvs-repository/Products/QueueCatalog/tests
In directory cvs.zope.org:/tmp/cvs-serv22439/tests

Added Files:
	__init__.py test_CatalogEventQueueSet.py 
Log Message:


  - Add a new object, CatalogEventQueueSet, which manages a hashtable
    of CatalogEventQueues, and delegates update events to them.  It
    holds a pointer to a "delegate", which it calls to do the actual
    work during its 'process'.


=== Added File Products/QueueCatalog/tests/__init__.py ===
""" QueueCatalog unit tests package.

$Id: __init__.py,v 1.1 2003/06/04 15:15:21 tseaver Exp $
"""


=== Added File Products/QueueCatalog/tests/test_CatalogEventQueueSet.py ===
from __future__ import generators

import unittest

from Acquisition import aq_base

from Products.QueueCatalog.CatalogEventQueue import ADDED
from Products.QueueCatalog.CatalogEventQueue import CHANGED
from Products.QueueCatalog.CatalogEventQueue import CHANGED_ADDED
from Products.QueueCatalog.CatalogEventQueue import REMOVED

# Sieve of Eratosthenes
# David Eppstein, UC Irvine, 28 Feb 2002
# http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/117119/index_txt

def eratosthenes():
	'''Yields the sequence of prime numbers via the Sieve of Eratosthenes.'''
	D = {}  # map composite integers to primes witnessing their compositeness
	q = 2   # first integer to test for primality
	while 1:
		if q not in D:
			yield q        # not marked composite, must be prime
			D[q*q] = [q]   # first multiple of q not already marked
		else:
			for p in D[q]: # move each witness to its next multiple
				D.setdefault(p+q,[]).append(p)
			del D[q]       # no longer need D[q], free memory
		q += 1

def _isPrime( count ):

    """ Alternate implementation, using Sieve.
    """
    assert type( count ) is type( 0 )

    for prime in eratosthenes():

        if prime == count:
            return 1

        if prime > count:
            return 0



class CatalogEventQueueSetTests( unittest.TestCase ):

    def _getTargetClass( self ):

        from Products.QueueCatalog.CatalogEventQueueSet \
            import CatalogEventQueueSet

        return CatalogEventQueueSet

    def _makeOne( self, *args, **kw ):

        return self._getTargetClass()( *args, **kw )

    def _makeDelegate( self ):

        from Products.QueueCatalog.CatalogEventQueueSet \
            import ICatalogEventQueueSetDelegate

        class DummyCatalogEventQueueSetDelegate:

            __implements__ = ( ICatalogEventQueueSetDelegate, )

            def __init__( self ):
                self._uids = {}

            def hasUID( self, uid ):

                return self._uids.get( uid ) is not None

            def add( self, uid ):

                if self._uids.get( uid ) is not None:
                    raise AssertionError, 'Duplicate'
                self._uids[ uid ] = 0

            def change( self, uid ):

                self._uids[ uid ] = self._uids.setdefault( uid, 0 ) + 1

            def remove( self, uid ):

                try:
                    del self._uids[ uid ]
                except KeyError:
                    pass

            def getUIDCount( self, uid ):
                return self._uids[ uid ]

        return DummyCatalogEventQueueSetDelegate()

    def test_ctor_default( self ):

        set = self._makeOne()

        self.failUnless( _isPrime( set.getBucketCount() ) )
        self.assertEquals( set.getBucketCount(), len( set._queues ) )

        self.assertEquals( set.getDelegate(), None )

        events = [ x for x in set.listEvents() ]
        self.assertEquals( len( events ), 0 )

    def test_ctor_explicit_delegate( self ):

        delegate = self._makeDelegate()
        set = self._makeOne( delegate=delegate )

        self.failUnless( aq_base( set.getDelegate() ) is delegate )

    def test_ctor_nonconforming_delegate( self ):

        class NotDelegate:
            pass

        self.assertRaises( ValueError, self._makeOne, delegate=NotDelegate() )

    def test_ctor_explicit_bucket_count( self ):

        set = self._makeOne( bucket_count=11 )
        self.assertEquals( set.getBucketCount(), 11 )
        self.assertEquals( len( set._queues ), 11 )

    def test_ctor_bucket_count_not_prime( self ):

        self.assertRaises( ValueError, self._makeOne, bucket_count=1008 )

    def test_setDelegate( self ):

        set = self._makeOne()
        delegate = self._makeDelegate()

        set.setDelegate( delegate )
        self.failUnless( aq_base( set.getDelegate() ) is delegate )

    def test_setDelegate_nonconforming( self ):

        class NotDelegate:
            pass

        set = self._makeOne()

        self.assertRaises( ValueError, set.setDelegate, NotDelegate() )

    def test_setBucketCount( self ):

        set = self._makeOne()

        set.setBucketCount( 11 )
        self.assertEquals( set.getBucketCount(), 11 )
        self.assertEquals( len( set._queues ), 11 )

    def test_setBucketCount_non_prime( self ):

        set = self._makeOne()

        self.assertRaises( ValueError, set.setBucketCount, 9 )

    def test_update_non_event( self ):

        set = self._makeOne()

        self.assertRaises( ValueError, set.update, 'foo', 42 )

    def test_update( self ):

        set = self._makeOne()

        set.update( 'foo', ADDED )

        events = [ x for x in set.listEvents() ]
        self.assertEquals( len( events ), 1 )

        uid, event = events[0]

        self.assertEqual( uid, 'foo' )
        self.assertEqual( event, ADDED )

    def test_update_collapsing_add_change( self ):

        set = self._makeOne()

        set.update( 'foo', ADDED )
        set.update( 'foo', CHANGED )

        events = [ x for x in set.listEvents() ]
        self.assertEquals( len( events ), 1 )

        uid, event = events[0]

        self.assertEqual( uid, 'foo' )
        self.assertEqual( event, CHANGED_ADDED )

    def test_update_collapsing_add_change_remove( self ):

        set = self._makeOne()

        set.update( 'bar', ADDED )
        set.update( 'bar', CHANGED )
        set.update( 'bar', REMOVED )

        events = [ x for x in set.listEvents() ]
        self.assertEquals( len( events ), 1, events )

        uid, event = events[0]

        #  REMOVED is still there, because it may be needed to clean up
        #  after "immediate" indexing.
        self.assertEqual( uid, 'bar' )
        self.assertEqual( event, REMOVED )

    def test_update_remove_then_change( self ):

        set = self._makeOne()

        set.update( 'bar', ADDED )
        set.update( 'bar', REMOVED )

        self.assertRaises( TypeError, set.update, 'bar', CHANGED )

    def test_process_add( self ):

        delegate = self._makeDelegate()
        set = self._makeOne( delegate=delegate )

        self.failIf( delegate.hasUID( 'added' ) )

        set.update( 'added', ADDED )

        set.process()

        self.failUnless( delegate.hasUID( 'added' ) )
        self.assertEqual( delegate.getUIDCount( 'added' ), 0 )

    def test_process_change( self ):

        delegate = self._makeDelegate()
        set = self._makeOne( delegate=delegate )

        self.failIf( delegate.hasUID( 'changed' ) )

        set.update( 'changed', CHANGED )
        set.update( 'changed', CHANGED )
        set.update( 'changed', CHANGED )

        set.process()

        self.failUnless( delegate.hasUID( 'changed' ) )
        self.assertEqual( delegate.getUIDCount( 'changed' ), 1 )

    def test_process_add_and_change( self ):

        delegate = self._makeDelegate()
        set = self._makeOne( delegate=delegate )

        self.failIf( delegate.hasUID( 'added_changed' ) )

        set.update( 'added_changed', ADDED )
        self.failIf( delegate.hasUID( 'added_changed' ) )

        set.update( 'added_changed', CHANGED )
        self.failIf( delegate.hasUID( 'added_changed' ) )

        set.update( 'added_changed', CHANGED )
        self.failIf( delegate.hasUID( 'added_changed' ) )

        set.process()

        self.failUnless( delegate.hasUID( 'added_changed' ) )
        self.assertEqual( delegate.getUIDCount( 'added_changed' ), 0 )

    def test_process_remove( self ):

        delegate = self._makeDelegate()
        set = self._makeOne( delegate=delegate )

        self.failIf( delegate.hasUID( 'removed' ) )

        set.update( 'removed', REMOVED )
        self.failIf( delegate.hasUID( 'removed' ) )

        set.process()

        self.failIf( delegate.hasUID( 'removed' ) )

    def test_process_add_and_remove( self ):

        delegate = self._makeDelegate()
        set = self._makeOne( delegate=delegate )

        self.failIf( delegate.hasUID( 'added_removed' ) )

        set.update( 'added_removed', ADDED )
        self.failIf( delegate.hasUID( 'added_removed' ) )

        set.update( 'added_removed', REMOVED )
        self.failIf( delegate.hasUID( 'added_removed' ) )

        set.process()

        self.failIf( delegate.hasUID( 'added_removed' ) )


# TODO:
#  test_process_*