[Checkins] SVN: zope.index/trunk/src/zope/index/topic/ Tidysrc/zope/index/topic/index.py

Tres Seaver tseaver at palladion.com
Thu Jun 11 04:54:45 EDT 2009


Log message for revision 100847:
  Tidysrc/zope/index/topic/index.py

Changed:
  U   zope.index/trunk/src/zope/index/topic/index.py
  U   zope.index/trunk/src/zope/index/topic/tests/test_index.py

-=-
Modified: zope.index/trunk/src/zope/index/topic/index.py
===================================================================
--- zope.index/trunk/src/zope/index/topic/index.py	2009-06-11 07:54:07 UTC (rev 100846)
+++ zope.index/trunk/src/zope/index/topic/index.py	2009-06-11 08:54:45 UTC (rev 100847)
@@ -48,6 +48,12 @@
         """ remove a filter given by its ID 'id' """
         del self._filters[id]
 
+    def clearFilters(self):
+        """ Clear existing filters of their docids, but leave them in place.
+        """
+        for filter in self._filters.values():
+            filter.clear()
+
     def index_doc(self, docid, obj):
         """index an object"""
 
@@ -86,7 +92,8 @@
                 if not rs:
                     break
         else:
-            raise TypeError('Topic index only supports `and` and `or` operators, not `%s`.' % operator)
+            raise TypeError('Topic index only supports `and` and `or` '
+                            'operators, not `%s`.' % operator)
             
         if rs:
             return rs

Modified: zope.index/trunk/src/zope/index/topic/tests/test_index.py
===================================================================
--- zope.index/trunk/src/zope/index/topic/tests/test_index.py	2009-06-11 07:54:07 UTC (rev 100846)
+++ zope.index/trunk/src/zope/index/topic/tests/test_index.py	2009-06-11 08:54:45 UTC (rev 100847)
@@ -15,11 +15,10 @@
 
 $Id$
 """
-from unittest import TestCase, TestSuite, main, makeSuite 
+import unittest
 
 import BTrees
 
-from zope.index.topic.index import TopicIndex
 from zope.index.topic.filter import PythonFilteredSet
 from zope.interface.verify import verifyClass
 from zope.interface.interface import implementedBy
@@ -30,114 +29,375 @@
     def __init__(self, meta_type):
         self.meta_type = meta_type
 
-class TopicIndexTest(TestCase):
+_marker = object()
 
-    family = BTrees.family32
+class TopicIndexTest(unittest.TestCase):
 
-    def setUp(self):
-        self.index = TopicIndex(family=self.family)
-        self.index.addFilter(
-            PythonFilteredSet('doc1', "context.meta_type == 'doc1'",
-                              self.family))
-        self.index.addFilter(
-            PythonFilteredSet('doc2', "context.meta_type == 'doc2'",
-                              self.family))
-        self.index.addFilter(
-            PythonFilteredSet('doc3', "context.meta_type == 'doc3'",
-                              self.family))
+    def _getTargetClass(self):
+        from zope.index.topic.index import TopicIndex
+        return TopicIndex
 
-        self.index.index_doc(0 , O('doc0'))
-        self.index.index_doc(1 , O('doc1'))
-        self.index.index_doc(2 , O('doc1'))
-        self.index.index_doc(3 , O('doc2'))
-        self.index.index_doc(4 , O('doc2'))
-        self.index.index_doc(5 , O('doc3'))
-        self.index.index_doc(6 , O('doc3'))
+    def _get_family(self):
+        import BTrees
+        return BTrees.family32
 
-    def _search(self, query, expected, operator='and'):
+    def _makeOne(self, family=_marker):
+        if family is _marker:
+            family = self._get_family()
+        if family is None:
+            return self._getTargetClass()()
+        return self._getTargetClass()(family)
+
+    def _search(self, index, query, expected, operator='and'):
         
-        result = self.index.search(query, operator)
+        result = index.search(query, operator)
         self.assertEqual(result.keys(), expected)
 
-    def _search_or(self, query, expected):
-        return self._search(query, expected, 'or')
+    def _search_or(self, index, query, expected):
+        return self._search(index, query, expected, 'or')
          
-    def _search_and(self, query, expected):
-        return self._search(query, expected, 'and')
+    def _search_and(self, index, query, expected):
+        return self._search(index, query, expected, 'and')
 
-    def _apply(self, query, expected, operator='and'):
-        result = self.index.apply(query)
+    def _apply(self, index, query, expected, operator='and'):
+        result = index.apply(query)
         self.assertEqual(result.keys(), expected)
 
-    def _apply_or(self, query, expected):
-        result = self.index.apply({'query': query, 'operator': 'or'})
+    def _apply_or(self, index, query, expected):
+        result = index.apply({'query': query, 'operator': 'or'})
         self.assertEqual(result.keys(), expected)
          
-    def _apply_and(self, query, expected):
-        result = self.index.apply({'query': query, 'operator': 'and'})
+    def _apply_and(self, index, query, expected):
+        result = index.apply({'query': query, 'operator': 'and'})
         self.assertEqual(result.keys(), expected)
 
-    def testInterfaces(self):
-        for iface in implementedBy(PythonFilteredSet):
-            verifyClass(iface, PythonFilteredSet)
+    def test_class_conforms_to_IInjection(self):
+        from zope.interface.verify import verifyClass
+        from zope.index.interfaces import IInjection
+        verifyClass(IInjection, self._getTargetClass())
 
-        for iface in implementedBy(TopicIndex):
-            verifyClass(iface, TopicIndex)
+    def test_instance_conforms_to_IInjection(self):
+        from zope.interface.verify import verifyObject
+        from zope.index.interfaces import IInjection
+        verifyObject(IInjection, self._makeOne())
 
+    def test_class_conforms_to_IIndexSearch(self):
+        from zope.interface.verify import verifyClass
+        from zope.index.interfaces import IIndexSearch
+        verifyClass(IIndexSearch, self._getTargetClass())
+
+    def test_instance_conforms_to_IIndexSearch(self):
+        from zope.interface.verify import verifyObject
+        from zope.index.interfaces import IIndexSearch
+        verifyObject(IIndexSearch, self._makeOne())
+
+    def test_class_conforms_to_ITopicQuerying(self):
+        from zope.interface.verify import verifyClass
+        from zope.index.topic.interfaces import ITopicQuerying
+        verifyClass(ITopicQuerying, self._getTargetClass())
+
+    def test_instance_conforms_to_ITopicQuerying(self):
+        from zope.interface.verify import verifyObject
+        from zope.index.topic.interfaces import ITopicQuerying
+        verifyObject(ITopicQuerying, self._makeOne())
+
+    def test_ctor_defaults(self):
+        import BTrees
+        index = self._makeOne(family=None)
+        self.failUnless(index.family is BTrees.family32)
+
+    def test_ctor_explicit_family(self):
+        import BTrees
+        index = self._makeOne(family=BTrees.family64)
+        self.failUnless(index.family is BTrees.family64)
+
+    def test_clear_erases_filters(self):
+        index = self._makeOne()
+        foo = DummyFilter('foo')
+        index.addFilter(foo)
+        index.clear()
+        self.assertEqual(list(index._filters), [])
+
+    def test_addFilter(self):
+        index = self._makeOne()
+        foo = DummyFilter('foo')
+        index.addFilter(foo)
+        self.assertEqual(list(index._filters), ['foo'])
+        self.failUnless(index._filters['foo'] is foo)
+
+    def test_addFilter_duplicate_replaces(self):
+        index = self._makeOne()
+        foo = DummyFilter('foo')
+        index.addFilter(foo)
+        foo2 = DummyFilter('foo')
+        index.addFilter(foo2)
+        self.assertEqual(list(index._filters), ['foo'])
+        self.failUnless(index._filters['foo'] is foo2)
+
+    def test_delFilter_nonesuch_raises_KeyError(self):
+        index = self._makeOne()
+        self.assertRaises(KeyError, index.delFilter, 'nonesuch')
+
+    def test_delFilter(self):
+        index = self._makeOne()
+        foo = DummyFilter('foo')
+        index.addFilter(foo)
+        bar = DummyFilter('bar')
+        index.addFilter(bar)
+        index.delFilter('foo')
+        self.assertEqual(list(index._filters), ['bar'])
+        self.failUnless(index._filters['bar'] is bar)
+
+    def test_clearFilters_empty(self):
+        index = self._makeOne()
+        index.clearFilters() # doesn't raise
+
+    def test_clearFilters_non_empty(self):
+        index = self._makeOne()
+        foo = DummyFilter('foo')
+        index.addFilter(foo)
+        bar = DummyFilter('bar')
+        index.addFilter(bar)
+        index.clearFilters()
+        self.failUnless(foo._cleared)
+        self.failUnless(bar._cleared)
+
+    def test_index_doc(self):
+        index = self._makeOne()
+        foo = DummyFilter('foo')
+        index.addFilter(foo)
+        bar = DummyFilter('bar')
+        index.addFilter(bar)
+        obj = object()
+        index.index_doc(1, obj)
+        self.assertEqual(foo._indexed, [(1, obj)])
+        self.assertEqual(bar._indexed, [(1, obj)])
+
+    def test_unindex_doc(self):
+        index = self._makeOne()
+        foo = DummyFilter('foo')
+        index.addFilter(foo)
+        bar = DummyFilter('bar')
+        index.addFilter(bar)
+        index.unindex_doc(1)
+        self.assertEqual(foo._unindexed, [1])
+        self.assertEqual(bar._unindexed, [1])
+
+    def test_search_non_tuple_list_query(self):
+        index = self._makeOne()
+        self.assertRaises(TypeError, index.search, {'nonesuch': 'ugh'})
+
+    def test_search_bad_operator(self):
+        index = self._makeOne()
+        self.assertRaises(TypeError, index.search, ['whatever'], 'maybe')
+
+    def test_search_no_filters_list_query(self):
+        index = self._makeOne()
+        result = index.search(['nonesuch'])
+        self.assertEqual(set(result), set())
+
+    def test_search_no_filters_tuple_query(self):
+        index = self._makeOne()
+        result = index.search(('nonesuch',))
+        self.assertEqual(set(result), set())
+
+    def test_search_no_filters_string_query(self):
+        index = self._makeOne()
+        result = index.search('nonesuch')
+        self.assertEqual(set(result), set())
+
+    def test_search_query_matches_one_filter(self):
+        index = self._makeOne()
+        foo = DummyFilter('foo', [1, 2, 3], self._get_family())
+        index.addFilter(foo)
+        bar = DummyFilter('bar', [2, 3, 4], self._get_family())
+        index.addFilter(bar)
+        result = index.search(['foo'])
+        self.assertEqual(set(result), set([1, 2, 3]))
+
+    def test_search_query_matches_multiple_implicit_operator(self):
+        index = self._makeOne()
+        foo = DummyFilter('foo', [1, 2, 3], self._get_family())
+        index.addFilter(foo)
+        bar = DummyFilter('bar', [2, 3, 4], self._get_family())
+        index.addFilter(bar)
+        result = index.search(['foo', 'bar'])
+        self.assertEqual(set(result), set([2, 3]))
+
+    def test_search_query_matches_multiple_implicit_op_no_intersect(self):
+        index = self._makeOne()
+        foo = DummyFilter('foo', [1, 2, 3], self._get_family())
+        index.addFilter(foo)
+        bar = DummyFilter('bar', [4, 5, 6], self._get_family())
+        index.addFilter(bar)
+        result = index.search(['foo', 'bar'])
+        self.assertEqual(set(result), set())
+
+    def test_search_query_matches_multiple_explicit_and(self):
+        index = self._makeOne()
+        foo = DummyFilter('foo', [1, 2, 3], self._get_family())
+        index.addFilter(foo)
+        bar = DummyFilter('bar', [2, 3, 4], self._get_family())
+        index.addFilter(bar)
+        result = index.search(['foo', 'bar'], operator='and')
+        self.assertEqual(set(result), set([2, 3]))
+
+    def test_search_query_matches_multiple_explicit_or(self):
+        index = self._makeOne()
+        foo = DummyFilter('foo', [1, 2, 3], self._get_family())
+        index.addFilter(foo)
+        bar = DummyFilter('bar', [2, 3, 4], self._get_family())
+        index.addFilter(bar)
+        result = index.search(['foo', 'bar'], operator='or')
+        self.assertEqual(set(result), set([1, 2, 3, 4]))
+
+    def test_apply_query_matches_multiple_non_dict_query(self):
+        index = self._makeOne()
+        foo = DummyFilter('foo', [1, 2, 3], self._get_family())
+        index.addFilter(foo)
+        bar = DummyFilter('bar', [2, 3, 4], self._get_family())
+        index.addFilter(bar)
+        result = index.apply(['foo', 'bar'])
+        self.assertEqual(set(result), set([2, 3]))
+
+    def test_apply_query_matches_multiple_implicit_op(self):
+        index = self._makeOne()
+        foo = DummyFilter('foo', [1, 2, 3], self._get_family())
+        index.addFilter(foo)
+        bar = DummyFilter('bar', [2, 3, 4], self._get_family())
+        index.addFilter(bar)
+        result = index.apply({'query': ['foo', 'bar']})
+        self.assertEqual(set(result), set([2, 3]))
+
+    def test_apply_query_matches_multiple_explicit_and(self):
+        index = self._makeOne()
+        foo = DummyFilter('foo', [1, 2, 3], self._get_family())
+        index.addFilter(foo)
+        bar = DummyFilter('bar', [2, 3, 4], self._get_family())
+        index.addFilter(bar)
+        result = index.apply({'query': ['foo', 'bar'], 'operator': 'and'})
+        self.assertEqual(set(result), set([2, 3]))
+
+    def test_apply_query_matches_multiple_explicit_or(self):
+        index = self._makeOne()
+        foo = DummyFilter('foo', [1, 2, 3], self._get_family())
+        index.addFilter(foo)
+        bar = DummyFilter('bar', [2, 3, 4], self._get_family())
+        index.addFilter(bar)
+        result = index.apply({'query': ['foo', 'bar'], 'operator': 'or'})
+        self.assertEqual(set(result), set([1, 2, 3, 4]))
+
+
+class _NotYet:
+
+    def _addFilters(self, index):
+        index.addFilter(
+            PythonFilteredSet('doc1', "context.meta_type == 'doc1'",
+                              index.family))
+        index.addFilter(
+            PythonFilteredSet('doc2', "context.meta_type == 'doc2'",
+                              index.family))
+        index.addFilter(
+            PythonFilteredSet('doc3', "context.meta_type == 'doc3'",
+                              index.family))
+
+    def _populate(self, index):
+        index.index_doc(0 , O('doc0'))
+        index.index_doc(1 , O('doc1'))
+        index.index_doc(2 , O('doc1'))
+        index.index_doc(3 , O('doc2'))
+        index.index_doc(4 , O('doc2'))
+        index.index_doc(5 , O('doc3'))
+        index.index_doc(6 , O('doc3'))
+
     def test_unindex(self):
-        self.index.unindex_doc(-99)         # should not raise 
-        self.index.unindex_doc(3)  
-        self.index.unindex_doc(4)  
-        self.index.unindex_doc(5)  
-        self._search_or('doc1',  [1,2])
-        self._search_or('doc2',  [])
-        self._search_or('doc3',  [6])
-        self._search_or('doc4',  [])
+        index = self._makeOne()
+        index.unindex_doc(-99)         # should not raise 
+        index.unindex_doc(3)  
+        index.unindex_doc(4)  
+        index.unindex_doc(5)  
+        self._search_or(index, 'doc1',  [1,2])
+        self._search_or(index, 'doc2',  [])
+        self._search_or(index, 'doc3',  [6])
+        self._search_or(index, 'doc4',  [])
 
     def test_or(self):
-        self._search_or('doc1',  [1,2])
-        self._search_or(['doc1'],[1,2])
-        self._search_or('doc2',  [3,4]),
-        self._search_or(['doc2'],[3,4])
-        self._search_or(['doc1','doc2'], [1,2,3,4])
+        index = self._makeOne()
+        self._search_or(index, 'doc1',  [1,2])
+        self._search_or(index, ['doc1'],[1,2])
+        self._search_or(index, 'doc2',  [3,4]),
+        self._search_or(index, ['doc2'],[3,4])
+        self._search_or(index, ['doc1','doc2'], [1,2,3,4])
 
     def test_and(self):
-        self._search_and('doc1',   [1,2])
-        self._search_and(['doc1'], [1,2])
-        self._search_and('doc2',   [3,4])
-        self._search_and(['doc2'], [3,4])
-        self._search_and(['doc1','doc2'], [])
+        index = self._makeOne()
+        self._search_and(index, 'doc1',   [1,2])
+        self._search_and(index, ['doc1'], [1,2])
+        self._search_and(index, 'doc2',   [3,4])
+        self._search_and(index, ['doc2'], [3,4])
+        self._search_and(index, ['doc1','doc2'], [])
 
     def test_apply_or(self):
-        self._apply_or('doc1',  [1,2])
-        self._apply_or(['doc1'],[1,2])
-        self._apply_or('doc2',  [3,4]),
-        self._apply_or(['doc2'],[3,4])
-        self._apply_or(['doc1','doc2'], [1,2,3,4])
+        index = self._makeOne()
+        self._apply_or(index, 'doc1',  [1,2])
+        self._apply_or(index, ['doc1'],[1,2])
+        self._apply_or(index, 'doc2',  [3,4]),
+        self._apply_or(index, ['doc2'],[3,4])
+        self._apply_or(index, ['doc1','doc2'], [1,2,3,4])
 
     def test_apply_and(self):
-        self._apply_and('doc1',   [1,2])
-        self._apply_and(['doc1'], [1,2])
-        self._apply_and('doc2',   [3,4])
-        self._apply_and(['doc2'], [3,4])
-        self._apply_and(['doc1','doc2'], [])
+        index = self._makeOne()
+        self._apply_and(index, 'doc1',   [1,2])
+        self._apply_and(index, ['doc1'], [1,2])
+        self._apply_and(index, 'doc2',   [3,4])
+        self._apply_and(index, ['doc2'], [3,4])
+        self._apply_and(index, ['doc1','doc2'], [])
 
     def test_apply(self):
-        self._apply('doc1',   [1,2])
-        self._apply(['doc1'], [1,2])
-        self._apply('doc2',   [3,4])
-        self._apply(['doc2'], [3,4])
-        self._apply(['doc1','doc2'], [])
+        index = self._makeOne()
+        self._apply(index, 'doc1',   [1,2])
+        self._apply(index, ['doc1'], [1,2])
+        self._apply(index, 'doc2',   [3,4])
+        self._apply(index, ['doc2'], [3,4])
+        self._apply(index, ['doc1','doc2'], [])
 
 class TopicIndexTest64(TopicIndexTest):
 
-    family = BTrees.family64
+    def _get_family(self):
+        import BTrees
+        return BTrees.family64
 
 
+class DummyFilter:
+
+    _cleared = False
+
+    def __init__(self, id, ids=(), family=None):
+        self._id = id
+        self._indexed = []
+        self._unindexed = []
+        self._family = family
+        self._ids = ids
+
+    def getId(self):
+        return self._id
+
+    def clear(self):
+        self._cleared = True
+
+    def index_doc(self, docid, obj):
+        self._indexed.append((docid, obj))
+
+    def unindex_doc(self, docid):
+        self._unindexed.append(docid)
+
+    def getIds(self):
+        if self._family is not None:
+            return self._family.IF.TreeSet(self._ids)
+        return set(self._ids)
+
 def test_suite():
-    return TestSuite((makeSuite(TopicIndexTest),
-                      makeSuite(TopicIndexTest64),
-                      ))
-
-if __name__=='__main__':
-    main(defaultTest='test_suite')
+    return unittest.TestSuite((
+        unittest.makeSuite(TopicIndexTest),
+        unittest.makeSuite(TopicIndexTest64),
+    ))



More information about the Checkins mailing list