[Checkins] SVN: Products.QueueCatalog/trunk/ moved
Andreas Jung
andreas at andreas-jung.com
Tue May 13 06:38:29 EDT 2008
Log message for revision 86692:
moved
Changed:
D Products.QueueCatalog/trunk/DEPENDENCIES.txt
D Products.QueueCatalog/trunk/Processor.py
A Products.QueueCatalog/trunk/Products/QueueCatalog/DEPENDENCIES.txt
A Products.QueueCatalog/trunk/Products/QueueCatalog/Processor.py
A Products.QueueCatalog/trunk/Products/QueueCatalog/QueueCatalog.py
A Products.QueueCatalog/trunk/Products/QueueCatalog/tests/
D Products.QueueCatalog/trunk/QueueCatalog.py
D Products.QueueCatalog/trunk/tests/
-=-
Deleted: Products.QueueCatalog/trunk/DEPENDENCIES.txt
===================================================================
--- Products.QueueCatalog/trunk/DEPENDENCIES.txt 2008-05-13 10:37:50 UTC (rev 86691)
+++ Products.QueueCatalog/trunk/DEPENDENCIES.txt 2008-05-13 10:38:28 UTC (rev 86692)
@@ -1 +0,0 @@
-Zope >= 2.8.0
Deleted: Products.QueueCatalog/trunk/Processor.py
===================================================================
--- Products.QueueCatalog/trunk/Processor.py 2008-05-13 10:37:50 UTC (rev 86691)
+++ Products.QueueCatalog/trunk/Processor.py 2008-05-13 10:38:28 UTC (rev 86692)
@@ -1,61 +0,0 @@
-##############################################################################
-#
-# Copyright (c) 2002-2006 Zope Corporation and Contributors.
-# All Rights Reserved.
-#
-# This software is subject to the provisions of the Zope Public License,
-# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
-# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
-# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
-# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
-# FOR A PARTICULAR PURPOSE.
-#
-##############################################################################
-"""
-$Id$
-"""
-
-import thread
-import Zope
-from time import sleep
-import sys
-from zLOG import LOG, ERROR, PANIC, INFO
-
-class Processor:
- """Simple thread that processes queued catalog events
- """
-
- def __init__(self, queue_catalog_paths, interval=60):
- self._queue_catalog_paths = queue_catalog_paths
- self._interval = interval
- thread.start_new_thread(self.live, ())
-
- def live(self):
- LOG('QueuedCatalog', INFO, 'Set up to process queue entries')
- while 1:
- sleep(self._interval)
- for queue_catalog_path in self._queue_catalog_paths:
- try:
- application = Zope.app()
- except:
- LOG('QueuedCatalog', PANIC,
- "Couldn't connect to database",
- error=sys.exc_info())
- break # No point in doing any more paths right now
- else:
-
- try:
- queue_catalog = application.unrestrictedTraverse(
- queue_catalog_path)
- queue_catalog.process()
- except:
- LOG('QueuedCatalog', ERROR, 'Queue processing failed',
- error=sys.exc_info())
-
- else:
- LOG('QueuedCatalog', INFO, 'Processed queue')
-
- application._p_jar.close()
-
-__doc__ = Processor.__doc__ + __doc__
-
Copied: Products.QueueCatalog/trunk/Products/QueueCatalog/DEPENDENCIES.txt (from rev 86689, Products.QueueCatalog/trunk/DEPENDENCIES.txt)
===================================================================
--- Products.QueueCatalog/trunk/Products/QueueCatalog/DEPENDENCIES.txt (rev 0)
+++ Products.QueueCatalog/trunk/Products/QueueCatalog/DEPENDENCIES.txt 2008-05-13 10:38:28 UTC (rev 86692)
@@ -0,0 +1 @@
+Zope >= 2.8.0
Copied: Products.QueueCatalog/trunk/Products/QueueCatalog/Processor.py (from rev 86689, Products.QueueCatalog/trunk/Processor.py)
===================================================================
--- Products.QueueCatalog/trunk/Products/QueueCatalog/Processor.py (rev 0)
+++ Products.QueueCatalog/trunk/Products/QueueCatalog/Processor.py 2008-05-13 10:38:28 UTC (rev 86692)
@@ -0,0 +1,61 @@
+##############################################################################
+#
+# Copyright (c) 2002-2006 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""
+$Id$
+"""
+
+import thread
+import Zope
+from time import sleep
+import sys
+from zLOG import LOG, ERROR, PANIC, INFO
+
+class Processor:
+ """Simple thread that processes queued catalog events
+ """
+
+ def __init__(self, queue_catalog_paths, interval=60):
+ self._queue_catalog_paths = queue_catalog_paths
+ self._interval = interval
+ thread.start_new_thread(self.live, ())
+
+ def live(self):
+ LOG('QueuedCatalog', INFO, 'Set up to process queue entries')
+ while 1:
+ sleep(self._interval)
+ for queue_catalog_path in self._queue_catalog_paths:
+ try:
+ application = Zope.app()
+ except:
+ LOG('QueuedCatalog', PANIC,
+ "Couldn't connect to database",
+ error=sys.exc_info())
+ break # No point in doing any more paths right now
+ else:
+
+ try:
+ queue_catalog = application.unrestrictedTraverse(
+ queue_catalog_path)
+ queue_catalog.process()
+ except:
+ LOG('QueuedCatalog', ERROR, 'Queue processing failed',
+ error=sys.exc_info())
+
+ else:
+ LOG('QueuedCatalog', INFO, 'Processed queue')
+
+ application._p_jar.close()
+
+__doc__ = Processor.__doc__ + __doc__
+
Copied: Products.QueueCatalog/trunk/Products/QueueCatalog/QueueCatalog.py (from rev 86689, Products.QueueCatalog/trunk/QueueCatalog.py)
===================================================================
--- Products.QueueCatalog/trunk/Products/QueueCatalog/QueueCatalog.py (rev 0)
+++ Products.QueueCatalog/trunk/Products/QueueCatalog/QueueCatalog.py 2008-05-13 10:38:28 UTC (rev 86692)
@@ -0,0 +1,596 @@
+##############################################################################
+#
+# Copyright (c) 2002-2006 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""
+$Id$
+"""
+# Python std. lib
+import logging
+import sets
+import sys
+from time import time
+from types import StringType
+
+# Other packages
+from ZODB.POSException import ConflictError
+from ZEO.Exceptions import ClientDisconnected
+from zExceptions import Unauthorized
+from ExtensionClass import Base
+from OFS.SimpleItem import SimpleItem
+from AccessControl.SecurityManagement import getSecurityManager
+from AccessControl.SecurityInfo import ClassSecurityInformation
+from AccessControl.Permissions \
+ import manage_zcatalog_entries, view_management_screens
+from OFS.SimpleItem import SimpleItem
+from BTrees.OOBTree import OOBTree
+from Products.PageTemplates.PageTemplateFile import PageTemplateFile
+from Globals import DTMLFile
+from Acquisition import Implicit, aq_base, aq_inner, aq_parent
+
+# Local
+from CatalogEventQueue import CatalogEventQueue, EVENT_TYPES, ADDED_EVENTS
+from CatalogEventQueue import ADDED, CHANGED, CHANGED_ADDED, REMOVED
+from CatalogEventQueue import SAFE_POLICY, ALTERNATIVE_POLICY
+
+logger = logging.getLogger('event.QueueCatalog')
+
+_zcatalog_methods = {
+ 'catalog_object': 1,
+ 'uncatalog_object': 1,
+ 'uniqueValuesFor': 1,
+ 'getpath': 1,
+ 'getrid': 1,
+ 'getobject': 1,
+ 'schema': 1,
+ 'indexes': 1,
+ 'index_objects': 1,
+ 'searchResults': 1,
+ '__call__': 1,
+ 'refreshCatalog': 1,
+ 'Indexes': 1,
+ 'unrestrictedSearchResults': 1,
+ 'manage_addIndex': 1,
+ 'manage_addColumn': 1,
+ 'manage_catalogClear': 1,
+ 'getIndexObjects': 1,
+ }
+
+_is_zcatalog_method = _zcatalog_methods.has_key
+
+_views = {}
+
+
+class QueueConfigurationError(Exception):
+ pass
+
+
+class QueueCatalog(Implicit, SimpleItem):
+ """Queued ZCatalog (Proxy)
+
+ A QueueCatalog delegates most requests to a ZCatalog that is named
+ as part of the QueueCatalog configuration.
+
+ Requests to catalog or uncatalog objects are queued. They must be
+ processed by a separate process (or thread). The queuing provides
+ benefits:
+
+ - Content-management operations, performed by humans, complete
+ much faster, thus making the content-management system more
+ efficient for its users.
+
+ - Catalog updates are batched, which makes indexing much more
+ efficient.
+
+ - Indexing is performed by a single thread, allowing more
+ efficient catalog document generation and avoiding conflict
+ errors from occurring during indexing.
+
+ - When used with ZEO, indexing might be performed on the same
+ machine as the storage server, making updates faster.
+
+ """
+
+ security = ClassSecurityInformation()
+
+ _immediate_indexes = () # The names of indexes to update immediately
+ _location = None
+ _immediate_removal = 1 # Flag: don't queue removal
+ _immediate_metadata_update = 1 # Flag: don't queue metadata creation
+ _process_all_indexes = 0 # Flag: queue-process all, not just non-immediate,
+ # indexes
+ title = ''
+
+
+ # When set, _v_catalog_cache is a tuple containing the wrapped ZCatalog
+ # and the REQUEST it is bound to.
+ _v_catalog_cache = None
+
+ # As an alternative to the original queue conflict handling there is now
+ # a policy which will reduce conflicts, but at the cost of possibly having
+ # situations where items get cataloged unnecessarily. YMMV.
+ _conflict_policy = SAFE_POLICY
+
+ def __init__(self, buckets=1009, conflict_policy=SAFE_POLICY):
+ self._buckets = buckets
+ self._conflict_policy = conflict_policy
+ self._clearQueues()
+
+ def _clearQueues(self):
+ self._queues = [ CatalogEventQueue(self.getConflictPolicy())
+ for i in range(self._buckets) ]
+
+ def getTitle(self):
+ return self.title
+
+ security.declareProtected(view_management_screens, 'setLocation')
+ def setLocation(self, location):
+ if self._location is not None:
+ try:
+ self.process()
+ except QueueConfigurationError:
+ self._clearQueues()
+ self._location = location
+
+ security.declareProtected(view_management_screens, 'getIndexInfo')
+ def getIndexInfo(self):
+ try:
+ c = self.getZCatalog()
+ except QueueConfigurationError:
+ return None
+ else:
+ items = [(ob.id, ob.meta_type) for ob in c.getIndexObjects()]
+ items.sort()
+ res = []
+ for id, meta_type in items:
+ res.append({'id': id, 'meta_type': meta_type})
+ return res
+
+
+ security.declareProtected(view_management_screens, 'getImmediateIndexes')
+ def getImmediateIndexes(self):
+ return self._immediate_indexes
+
+ security.declareProtected(view_management_screens, 'setImmediateIndexes')
+ def setImmediateIndexes(self, indexes):
+ self._immediate_indexes = tuple(map(str, indexes))
+
+ security.declareProtected(view_management_screens, 'getImmediateRemoval')
+ def getImmediateRemoval(self):
+ return self._immediate_removal
+
+ security.declareProtected(view_management_screens, 'setImmediateRemoval')
+ def setImmediateRemoval(self, flag):
+ self._immediate_removal = bool(flag)
+
+ security.declareProtected(view_management_screens,
+ 'getImmediateMetadataUpdate')
+ def getImmediateMetadataUpdate(self):
+ return self._immediate_metadata_update
+
+ security.declareProtected(view_management_screens,
+ 'setImmediateMetadataUpdate')
+ def setImmediateMetadataUpdate(self, flag):
+ self._immediate_metadata_update = bool(flag)
+
+ security.declareProtected(view_management_screens, 'getProcessAllIndexes')
+ def getProcessAllIndexes(self):
+ return self._process_all_indexes
+
+ security.declareProtected(view_management_screens, 'setProcessAllIndexes')
+ def setProcessAllIndexes(self, flag):
+ self._process_all_indexes = bool(flag)
+
+ security.declareProtected(view_management_screens, 'getBucketCount')
+ def getBucketCount(self):
+ return self._buckets
+
+ security.declareProtected(view_management_screens, 'setBucketCount')
+ def setBucketCount(self, count):
+ if self._location:
+ self.process()
+ self._buckets = int(count)
+ self._clearQueues()
+
+ security.declareProtected(view_management_screens, 'getConflictPolicy')
+ def getConflictPolicy(self):
+ """ Return the currently-used conflict policy
+ """
+ return self._conflict_policy
+
+ security.declareProtected(view_management_screens, 'setConflictPolicy')
+ def setConflictPolicy(self, policy=SAFE_POLICY):
+ """ Set the conflic policy to be used
+ """
+ try:
+ policy = int(policy)
+ except ValueError:
+ return
+
+ if ( policy in (SAFE_POLICY, ALTERNATIVE_POLICY) and
+ policy != self.getConflictPolicy() ):
+ self._conflict_policy = policy
+ self._clearQueues()
+
+ security.declareProtected(manage_zcatalog_entries, 'getZCatalog')
+ def getZCatalog(self, method=''):
+ ZC = None
+ REQUEST = getattr(self, 'REQUEST', None)
+ cache = self._v_catalog_cache
+ if cache is not None:
+ # The cached catalog may be wrapped with an earlier
+ # request. Before using it, check the request.
+ (ZC, req) = cache
+ if req is not REQUEST:
+ # It's an old wrapper. Discard.
+ ZC = None
+
+ if ZC is None:
+ if self._location is None:
+ raise QueueConfigurationError(
+ "This QueueCatalog hasn't been "
+ "configured with a ZCatalog location."
+ )
+ parent = aq_parent(aq_inner(self))
+ try:
+ ZC = parent.unrestrictedTraverse(self._location)
+ except (KeyError, AttributeError):
+ raise QueueConfigurationError(
+ "ZCatalog not found at %s." % self._location
+ )
+ if not hasattr(ZC, 'getIndexObjects'): # XXX need a better check
+ raise QueueConfigurationError(
+ "The object at %s does not implement the "
+ "IZCatalog interface." % self._location
+ )
+
+ security_manager = getSecurityManager()
+ if not security_manager.validate(self, self, self._location, ZC):
+ raise Unauthorized(self._location, ZC)
+
+ ZC = aq_base(ZC).__of__(parent)
+ self._v_catalog_cache = (ZC, REQUEST)
+
+ if method:
+ if not _is_zcatalog_method(method):
+ raise AttributeError(method)
+ m = getattr(ZC, method)
+ # Note that permission to access the method may be checked
+ # later on. This isn't the right place to check permission.
+ return m
+ else:
+ return ZC
+
+ def __getattr__(self, name):
+ # The original object must be wrapped, but self isn't, so
+ # we return a special object that will do the attribute access
+ # on a wrapped object.
+ if _is_zcatalog_method(name):
+ return AttrWrapper(name)
+
+ raise AttributeError(name)
+
+ def _update(self, uid, etype):
+ t = time()
+ self._queues[hash(uid) % self._buckets].update(uid, etype)
+
+ security.declareProtected(manage_zcatalog_entries, 'catalog_object')
+ def catalog_object(self, obj, uid=None, idxs=None, update_metadata=1):
+ # update_metadata=0 is ignored if the queued catalog is set to
+ # update metadata during queue processing, rather than immediately
+
+ # similarly, limiting the idxs only limits the immediate indexes. If
+ # any work needs to be done in the queue processing, it will all be
+ # done: we have not implemented partial indexing during queue
+ # processing. The only way to avoid any of it is to avoid all of it
+ # (i.e., update metadata immediately and don't have any indexes to
+ # update on the queued side).
+
+ # Make sure the current context is allowed to do this:
+ catalog_object = self.getZCatalog('catalog_object')
+
+ if uid is None:
+ uid = '/'.join(obj.getPhysicalPath())
+ elif not isinstance(uid, StringType):
+ uid = '/'.join(uid)
+
+ catalog = self.getZCatalog()
+ cat_indexes = sets.Set(catalog.indexes())
+ immediate_indexes = sets.Set(self._immediate_indexes)
+ cat_indexes -= immediate_indexes
+
+ # The ZCatalog API doesn't allow us to distinguish between
+ # adds and updates, so we have to try to figure this out
+ # ourselves.
+
+ # There's a risk of a race here. What if there is a previously
+ # unprocessed add event? If so, then this should be a changed
+ # event. If we undo this transaction later, we'll generate a
+ # remove event, when we should generate an add changed event.
+ # To avoid this, we need to make sure we see consistent values
+ # of the event queue. We also need to avoid resolving
+ # (non-undo) conflicts of add events. This will slow things
+ # down a bit, but adds should be relatively infrequent.
+
+ # Now, try to decide if the catalog has the uid (path).
+ already_cataloged = cataloged(catalog, uid)
+ if not already_cataloged:
+ # Looks like we should add, but maybe there's already a
+ # pending add event. We'd better check the event queue:
+ already_cataloged = (
+ self._queues[hash(uid) % self._buckets].getEvent(uid) in
+ ADDED_EVENTS)
+
+ if idxs and already_cataloged:
+ # if not already_cataloged, we index the whole thing
+ idxs = sets.Set(idxs)
+ immediate_indexes.intersection_update(idxs)
+ cat_indexes.intersection_update(idxs)
+
+ immediate_metadata = self.getImmediateMetadataUpdate()
+ if cat_indexes or update_metadata and not immediate_metadata:
+ self._update(uid, already_cataloged and CHANGED or ADDED)
+
+ if immediate_indexes:
+ # Update some of the indexes immediately.
+ catalog.catalog_object(
+ obj, uid, immediate_indexes,
+ update_metadata=update_metadata and immediate_metadata)
+ elif update_metadata and immediate_metadata:
+ # if it is added, no point in doing the metadata, and it will be
+ # done in the queue process anyway
+ catalog._catalog.updateMetadata(obj, uid)
+
+ security.declareProtected(manage_zcatalog_entries, 'uncatalog_object')
+ def uncatalog_object(self, uid):
+ if not isinstance(uid, StringType):
+ uid = '/'.join(uid)
+
+ self._update(uid, REMOVED)
+
+ if self._immediate_removal:
+ self._process_queue( self._queues[hash(uid) % self._buckets]
+ , limit=None
+ )
+
+ security.declareProtected(manage_zcatalog_entries, 'process')
+ def process(self, max=None):
+ """ Process pending events and return number of events processed. """
+ if not self.manage_size():
+ return 0
+
+ count = 0
+ for queue in filter(None, self._queues):
+ limit = None
+ if max:
+ # limit the number of events
+ limit = max - count
+
+ count += self._process_queue(queue, limit)
+
+ if max and count >= max:
+ # On reaching the maximum, return immediately
+ # so the caller can commit the transaction,
+ # sleep for a while, or do something else.
+ break
+
+ return count
+
+ def _process_queue(self, queue, limit):
+ """Process a single queue"""
+ catalog = self.getZCatalog()
+
+ if self.getProcessAllIndexes():
+ idxs = None
+ else:
+ cat_indexes = sets.Set(catalog.indexes())
+ immediate_indexes = sets.Set(self._immediate_indexes)
+ if not immediate_indexes or immediate_indexes==cat_indexes:
+ idxs = None # do all of 'em
+ else:
+ idxs = list(cat_indexes - immediate_indexes)
+ events = queue.process(limit)
+ count = 0
+
+ for uid, (t, event) in events.items():
+ if event is REMOVED:
+ try:
+ if cataloged(catalog, uid):
+ catalog.uncatalog_object(uid)
+ except (ConflictError, ClientDisconnected):
+ raise
+ except:
+ logger.error('error uncataloging object', exc_info=True)
+ else:
+ # add or change
+ if event is CHANGED and not cataloged(catalog, uid):
+ continue
+ # Note that the uid may be relative to the catalog.
+ obj = catalog.unrestrictedTraverse(uid, None)
+ if obj is not None:
+ immediate_metadata = self.getImmediateMetadataUpdate()
+ try:
+ catalog.catalog_object(
+ obj, uid, idxs=idxs,
+ update_metadata=not immediate_metadata)
+ except (ConflictError, ClientDisconnected):
+ raise
+ except:
+ logger.error('error cataloging object', exc_info=True)
+
+ count = count + 1
+
+ return count
+
+
+ #
+ # CMF catalog tool methods.
+ #
+ security.declarePrivate('indexObject')
+ def indexObject(self, object):
+ """Add to catalog.
+ """
+ self.catalog_object(object, self.uidForObject(object))
+
+ security.declarePrivate('unindexObject')
+ def unindexObject(self, object):
+ """Remove from catalog.
+ """
+ self.uncatalog_object(self.uidForObject(object))
+
+ security.declarePrivate('reindexObject')
+ def reindexObject(self, object, idxs=None,update_metadata=1,uid=None):
+ """Update catalog after object data has changed.
+
+ The optional idxs argument is a list of specific indexes
+ to update (all of them by default).
+ """
+ self.catalog_object(object, uid or self.uidForObject(object), idxs=idxs,
+ update_metadata=update_metadata)
+
+ security.declarePrivate('uidForObject')
+ def uidForObject(self, obj):
+ """Get a catalog uid for the object. Allows the underlying catalog
+ to determine the uids if it implements this method"""
+ catalog = self.getZCatalog()
+ if hasattr(aq_base(catalog), 'uidForObject'):
+ return catalog.uidForObject(obj)
+ return '/'.join(obj.getPhysicalPath())
+
+ # Provide web pages. It would be nice to use views, but Zope 2.6
+ # just isn't ready for views. :( In particular, we'd have to fake
+ # out the PageTemplateFiles in some brittle way to make them do
+ # the right thing. :(
+
+ security.declareProtected(view_management_screens, 'manage_editForm')
+ manage_editForm = PageTemplateFile('www/edit', globals())
+
+ security.declareProtected(view_management_screens, 'manage_getLocation')
+ def manage_getLocation(self):
+ return self._location or ''
+
+ security.declareProtected(view_management_screens, 'manage_edit')
+ def manage_edit(self, title='', location='', immediate_indexes=(),
+ immediate_removal=0, bucket_count=0, immediate_metadata=0,
+ all_indexes=0, conflict_policy=SAFE_POLICY, RESPONSE=None):
+ """ Edit the instance """
+ self.title = title
+ self.setLocation(location or None)
+ self.setImmediateIndexes(immediate_indexes)
+ self.setImmediateRemoval(immediate_removal)
+ self.setImmediateMetadataUpdate(immediate_metadata)
+ self.setProcessAllIndexes(all_indexes)
+ self.setConflictPolicy(conflict_policy)
+ if bucket_count:
+ bucket_count = int(bucket_count)
+ if bucket_count != self.getBucketCount():
+ self.setBucketCount(bucket_count)
+
+ if RESPONSE is not None:
+ RESPONSE.redirect('%s/manage_editForm?manage_tabs_message='
+ 'Properties+changed' % self.absolute_url())
+
+
+ security.declareProtected(manage_zcatalog_entries,
+ 'list_queue_items')
+ def list_queue_items(self, limit=100):
+ """Return a list of items in the queue."""
+ items = []
+ count = 0
+ for queue in filter(None, self._queues):
+ qitems = queue._data.keys()
+ count += len(qitems)
+ items += qitems
+ if limit is not None:
+ if count > limit:
+ items = items[:limit]
+ return items
+
+
+ security.declareProtected(manage_zcatalog_entries, 'manage_queue')
+ manage_queue = DTMLFile('dtml/queue', globals())
+
+ security.declareProtected(manage_zcatalog_entries, 'manage_size')
+ def manage_size(self):
+ size = 0
+ for q in self._queues:
+ size += len(q)
+
+ return size
+
+ security.declareProtected(manage_zcatalog_entries, 'manage_process')
+ def manage_process(self, count=100, REQUEST=None):
+ "Web UI to manually process queues"
+ count = int(count)
+ processed = self.process(max=count)
+ if REQUEST is not None:
+ msg = '%i Queue item(s) processed' % processed
+ return self.manage_queue(manage_tabs_message=msg)
+ else:
+ return processed
+
+ # Provide Zope 2 offerings
+
+ index_html = None
+
+ meta_type = 'ZCatalog Queue'
+
+ manage_options=(
+ (
+ {'label': 'Configure', 'action': 'manage_editForm',
+ 'help':('QueueCatalog','QueueCatalog-Configure.stx')},
+
+ {'label': 'Queue', 'action': 'manage_queue',
+ 'help':('QueueCatalog','QueueCatalog-Queue.stx')},
+ )
+ +SimpleItem.manage_options
+ )
+
+ security.declareObjectPublic()
+ # Disallow access to subobjects with no security assertions.
+ security.setDefaultAccess('deny')
+
+ security.declarePublic('getTitle', 'title_or_id')
+
+ security.declareProtected(manage_zcatalog_entries,
+ 'catalog_object', 'uncatalog_object')
+
+
+def cataloged(catalog, path):
+ getrid = getattr(catalog, 'getrid', None)
+ if getrid is None:
+
+ # This is an old catalog that doesn't provide an API for
+ # getting an object's rid (and thus determining that the
+ # object is already cataloged).
+
+ # We'll just use our knowledge of the internal structure.
+
+ rid = catalog._catalog.uids.get(path)
+
+ else:
+ rid = catalog.getrid(path)
+
+ return rid is not None
+
+class AttrWrapper(Base):
+ "Special object that allowes us to use acquisition in QueueCatalog "
+ "attribute access"
+
+ def __init__(self, name):
+ self.__name__ = name
+
+ def __of__(self, wrappedQueueCatalog):
+ return wrappedQueueCatalog.getZCatalog(self.__name__)
+
+__doc__ = QueueCatalog.__doc__ + __doc__
+
Copied: Products.QueueCatalog/trunk/Products/QueueCatalog/tests (from rev 86689, Products.QueueCatalog/trunk/tests)
Deleted: Products.QueueCatalog/trunk/QueueCatalog.py
===================================================================
--- Products.QueueCatalog/trunk/QueueCatalog.py 2008-05-13 10:37:50 UTC (rev 86691)
+++ Products.QueueCatalog/trunk/QueueCatalog.py 2008-05-13 10:38:28 UTC (rev 86692)
@@ -1,596 +0,0 @@
-##############################################################################
-#
-# Copyright (c) 2002-2006 Zope Corporation and Contributors.
-# All Rights Reserved.
-#
-# This software is subject to the provisions of the Zope Public License,
-# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
-# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
-# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
-# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
-# FOR A PARTICULAR PURPOSE.
-#
-##############################################################################
-"""
-$Id$
-"""
-# Python std. lib
-import logging
-import sets
-import sys
-from time import time
-from types import StringType
-
-# Other packages
-from ZODB.POSException import ConflictError
-from ZEO.Exceptions import ClientDisconnected
-from zExceptions import Unauthorized
-from ExtensionClass import Base
-from OFS.SimpleItem import SimpleItem
-from AccessControl.SecurityManagement import getSecurityManager
-from AccessControl.SecurityInfo import ClassSecurityInformation
-from AccessControl.Permissions \
- import manage_zcatalog_entries, view_management_screens
-from OFS.SimpleItem import SimpleItem
-from BTrees.OOBTree import OOBTree
-from Products.PageTemplates.PageTemplateFile import PageTemplateFile
-from Globals import DTMLFile
-from Acquisition import Implicit, aq_base, aq_inner, aq_parent
-
-# Local
-from CatalogEventQueue import CatalogEventQueue, EVENT_TYPES, ADDED_EVENTS
-from CatalogEventQueue import ADDED, CHANGED, CHANGED_ADDED, REMOVED
-from CatalogEventQueue import SAFE_POLICY, ALTERNATIVE_POLICY
-
-logger = logging.getLogger('event.QueueCatalog')
-
-_zcatalog_methods = {
- 'catalog_object': 1,
- 'uncatalog_object': 1,
- 'uniqueValuesFor': 1,
- 'getpath': 1,
- 'getrid': 1,
- 'getobject': 1,
- 'schema': 1,
- 'indexes': 1,
- 'index_objects': 1,
- 'searchResults': 1,
- '__call__': 1,
- 'refreshCatalog': 1,
- 'Indexes': 1,
- 'unrestrictedSearchResults': 1,
- 'manage_addIndex': 1,
- 'manage_addColumn': 1,
- 'manage_catalogClear': 1,
- 'getIndexObjects': 1,
- }
-
-_is_zcatalog_method = _zcatalog_methods.has_key
-
-_views = {}
-
-
-class QueueConfigurationError(Exception):
- pass
-
-
-class QueueCatalog(Implicit, SimpleItem):
- """Queued ZCatalog (Proxy)
-
- A QueueCatalog delegates most requests to a ZCatalog that is named
- as part of the QueueCatalog configuration.
-
- Requests to catalog or uncatalog objects are queued. They must be
- processed by a separate process (or thread). The queuing provides
- benefits:
-
- - Content-management operations, performed by humans, complete
- much faster, this making the content-management system more
- effiecient for it's users.
-
- - Catalog updates are batched, which makes indexing much more
- efficient.
-
- - Indexing is performed by a single thread, allowing more
- effecient catalog document generation and avoiding conflict
- errors from occuring during indexing.
-
- - When used with ZEO, indexing might e performed on the same
- machine as the storage server, making updates faster.
-
- """
-
- security = ClassSecurityInformation()
-
- _immediate_indexes = () # The names of indexes to update immediately
- _location = None
- _immediate_removal = 1 # Flag: don't queue removal
- _immediate_metadata_update = 1 # Flag: don't queue metadata creation
- _process_all_indexes = 0 # Flag: queue-process all, not just non-immediate,
- # indexes
- title = ''
-
-
- # When set, _v_catalog_cache is a tuple containing the wrapped ZCatalog
- # and the REQUEST it is bound to.
- _v_catalog_cache = None
-
- # As an alternative to the original queue conflict handling there is now
- # a policy which will reduce conflicts, but at the cost of possibly having
- # situations where items get cataloged unnecessarily. YMMV.
- _conflict_policy = SAFE_POLICY
-
- def __init__(self, buckets=1009, conflict_policy=SAFE_POLICY):
- self._buckets = buckets
- self._conflict_policy = conflict_policy
- self._clearQueues()
-
- def _clearQueues(self):
- self._queues = [ CatalogEventQueue(self.getConflictPolicy())
- for i in range(self._buckets) ]
-
- def getTitle(self):
- return self.title
-
- security.declareProtected(view_management_screens, 'setLocation')
- def setLocation(self, location):
- if self._location is not None:
- try:
- self.process()
- except QueueConfigurationError:
- self._clearQueues()
- self._location = location
-
- security.declareProtected(view_management_screens, 'getIndexInfo')
- def getIndexInfo(self):
- try:
- c = self.getZCatalog()
- except QueueConfigurationError:
- return None
- else:
- items = [(ob.id, ob.meta_type) for ob in c.getIndexObjects()]
- items.sort()
- res = []
- for id, meta_type in items:
- res.append({'id': id, 'meta_type': meta_type})
- return res
-
-
- security.declareProtected(view_management_screens, 'getImmediateIndexes')
- def getImmediateIndexes(self):
- return self._immediate_indexes
-
- security.declareProtected(view_management_screens, 'setImmediateIndexes')
- def setImmediateIndexes(self, indexes):
- self._immediate_indexes = tuple(map(str, indexes))
-
- security.declareProtected(view_management_screens, 'getImmediateRemoval')
- def getImmediateRemoval(self):
- return self._immediate_removal
-
- security.declareProtected(view_management_screens, 'setImmediateRemoval')
- def setImmediateRemoval(self, flag):
- self._immediate_removal = bool(flag)
-
- security.declareProtected(view_management_screens,
- 'getImmediateMetadataUpdate')
- def getImmediateMetadataUpdate(self):
- return self._immediate_metadata_update
-
- security.declareProtected(view_management_screens,
- 'setImmediateMetadataUpdate')
- def setImmediateMetadataUpdate(self, flag):
- self._immediate_metadata_update = bool(flag)
-
- security.declareProtected(view_management_screens, 'getProcessAllIndexes')
- def getProcessAllIndexes(self):
- return self._process_all_indexes
-
- security.declareProtected(view_management_screens, 'setProcessAllIndexes')
- def setProcessAllIndexes(self, flag):
- self._process_all_indexes = bool(flag)
-
- security.declareProtected(view_management_screens, 'getBucketCount')
- def getBucketCount(self):
- return self._buckets
-
- security.declareProtected(view_management_screens, 'setBucketCount')
- def setBucketCount(self, count):
- if self._location:
- self.process()
- self._buckets = int(count)
- self._clearQueues()
-
- security.declareProtected(view_management_screens, 'getConflictPolicy')
- def getConflictPolicy(self):
- """ Return the currently-used conflict policy
- """
- return self._conflict_policy
-
- security.declareProtected(view_management_screens, 'setConflictPolicy')
def setConflictPolicy(self, policy=SAFE_POLICY):
    """ Set the conflict policy to be used

    ``policy`` is coerced with int().  A value that cannot be
    converted, or that is neither SAFE_POLICY nor ALTERNATIVE_POLICY,
    is silently ignored.  When the policy actually changes it is
    stored and ``_clearQueues()`` is called.
    """
    try:
        policy = int(policy)
    except (TypeError, ValueError):
        # int(None) or int([...]) raises TypeError rather than
        # ValueError; treat it like any other unusable value.
        return

    if policy not in (SAFE_POLICY, ALTERNATIVE_POLICY):
        return

    if policy != self.getConflictPolicy():
        self._conflict_policy = policy
        self._clearQueues()
-
- security.declareProtected(manage_zcatalog_entries, 'getZCatalog')
def getZCatalog(self, method=''):
    # Return the ZCatalog found at the configured location, wrapped in
    # the context of this object's parent.  When ``method`` is given,
    # return that attribute of the catalog instead.
    #
    # Raises QueueConfigurationError when no location is configured,
    # nothing is found there, or the object found lacks
    # 'getIndexObjects'; Unauthorized when the security manager refuses
    # access; AttributeError when ``method`` is not a ZCatalog method.
    request = getattr(self, 'REQUEST', None)
    zcat = None

    cached = self._v_catalog_cache
    if cached is not None:
        # The cached catalog may have been wrapped with an earlier
        # request; only reuse it when the request is the same object.
        cached_zcat, cached_request = cached
        if cached_request is request:
            zcat = cached_zcat

    if zcat is None:
        location = self._location
        if location is None:
            raise QueueConfigurationError(
                "This QueueCatalog hasn't been configured "
                "with a ZCatalog location.")

        container = aq_parent(aq_inner(self))
        try:
            zcat = container.unrestrictedTraverse(location)
        except (KeyError, AttributeError):
            raise QueueConfigurationError(
                "ZCatalog not found at %s." % location)

        if not hasattr(zcat, 'getIndexObjects'):  # XXX need a better check
            raise QueueConfigurationError(
                "The object at %s does not implement the "
                "IZCatalog interface." % location)

        if not getSecurityManager().validate(self, self, location, zcat):
            raise Unauthorized(location, zcat)

        # Re-wrap the bare catalog in the parent's context and cache it
        # together with the request it was obtained under.
        zcat = aq_base(zcat).__of__(container)
        self._v_catalog_cache = (zcat, request)

    if not method:
        return zcat

    if not _is_zcatalog_method(method):
        raise AttributeError(method)
    # Note that permission to access the method may be checked
    # later on.  This isn't the right place to check permission.
    return getattr(zcat, method)
-
def __getattr__(self, name):
    # Only names recognised as ZCatalog methods are delegated;
    # everything else is a plain attribute error.
    if not _is_zcatalog_method(name):
        raise AttributeError(name)

    # The original object must be wrapped, but self isn't, so hand
    # back a special object that performs the attribute access on a
    # wrapped object.
    return AttrWrapper(name)
-
def _update(self, uid, etype):
    # Record an event of type ``etype`` for ``uid`` in the queue chosen
    # by hashing the uid over the configured number of buckets.
    # (The former ``t = time()`` local was never used and is dropped.)
    bucket = self._queues[hash(uid) % self._buckets]
    bucket.update(uid, etype)
-
- security.declareProtected(manage_zcatalog_entries, 'catalog_object')
def catalog_object(self, obj, uid=None, idxs=None, update_metadata=1):
    # Queue (and partly perform) the cataloging of ``obj``.
    #
    # update_metadata=0 is ignored if the queued catalog is set to
    # update metadata during queue processing rather than immediately.
    #
    # Likewise, limiting ``idxs`` only limits the immediate indexes.
    # Any work left for queue processing is done in full: partial
    # indexing during queue processing is not implemented.  The only
    # way to avoid any of it is to avoid all of it (update metadata
    # immediately and have no indexes left for the queued side).

    # Make sure the current context is allowed to do this:
    self.getZCatalog('catalog_object')

    if uid is None:
        uid = '/'.join(obj.getPhysicalPath())
    elif not isinstance(uid, StringType):
        uid = '/'.join(uid)

    catalog = self.getZCatalog()
    queued_indexes = sets.Set(catalog.indexes())
    immediate_indexes = sets.Set(self._immediate_indexes)
    queued_indexes -= immediate_indexes

    # The ZCatalog API doesn't distinguish adds from updates, so we
    # have to work it out ourselves.
    #
    # There is a risk of a race: if a previously unprocessed add event
    # exists, this should be a changed event.  If this transaction is
    # undone later we would generate a remove event where an add
    # changed event is needed.  To avoid this we must see consistent
    # values of the event queue, and must avoid resolving (non-undo)
    # conflicts of add events.  This slows things down a bit, but adds
    # should be relatively infrequent.

    # First ask the catalog whether it already has the uid (path).
    already_cataloged = cataloged(catalog, uid)
    if not already_cataloged:
        # Looks like an add, but there may already be a pending add
        # event, so consult the event queue as well.
        bucket = self._queues[hash(uid) % self._buckets]
        already_cataloged = bucket.getEvent(uid) in ADDED_EVENTS

    if idxs and already_cataloged:
        # When not already cataloged, the whole object is indexed.
        requested = sets.Set(idxs)
        immediate_indexes.intersection_update(requested)
        queued_indexes.intersection_update(requested)

    immediate_metadata = self.getImmediateMetadataUpdate()
    if queued_indexes or (update_metadata and not immediate_metadata):
        self._update(uid, already_cataloged and CHANGED or ADDED)

    metadata_now = update_metadata and immediate_metadata
    if immediate_indexes:
        # Update some of the indexes immediately.
        catalog.catalog_object(
            obj, uid, immediate_indexes, update_metadata=metadata_now)
    elif metadata_now:
        # If it is added, there is no point in doing the metadata; it
        # will be done in the queue process anyway.
        catalog._catalog.updateMetadata(obj, uid)
-
- security.declareProtected(manage_zcatalog_entries, 'uncatalog_object')
def uncatalog_object(self, uid):
    # Queue a REMOVED event for ``uid``; a non-string uid is treated as
    # a sequence of path elements and joined with '/'.
    if not isinstance(uid, StringType):
        uid = '/'.join(uid)

    self._update(uid, REMOVED)

    if not self._immediate_removal:
        return

    # Immediate removal: process the uid's whole bucket right away.
    bucket = self._queues[hash(uid) % self._buckets]
    self._process_queue(bucket, limit=None)
-
- security.declareProtected(manage_zcatalog_entries, 'process')
def process(self, max=None):
    """ Process pending events and return number of events processed.

    ``max``, when true, caps the number of events handled in this
    call; a false value means no cap.
    """
    if not self.manage_size():
        return 0

    processed = 0
    for queue in filter(None, self._queues):
        # Hand each queue only the part of the budget that is left.
        if max:
            remaining = max - processed
        else:
            remaining = None

        processed += self._process_queue(queue, remaining)

        if max and processed >= max:
            # On reaching the maximum, return immediately so the
            # caller can commit the transaction, sleep for a while,
            # or do something else.
            break

    return processed
-
def _process_queue(self, queue, limit):
    """Process a single queue and return the number of events counted.

    Events are obtained from ``queue.process(limit)``.  CHANGED events
    for uids the catalog does not know are skipped and not counted.
    """
    catalog = self.getZCatalog()

    # ``idxs`` stays None when every index should be updated.
    idxs = None
    if not self.getProcessAllIndexes():
        all_indexes = sets.Set(catalog.indexes())
        immediate = sets.Set(self._immediate_indexes)
        if immediate and not (immediate == all_indexes):
            # Only the indexes not already updated immediately.
            idxs = list(all_indexes - immediate)

    pending = queue.process(limit)
    handled = 0

    for uid, (t, event) in pending.items():
        if event is REMOVED:
            try:
                if cataloged(catalog, uid):
                    catalog.uncatalog_object(uid)
            except (ConflictError, ClientDisconnected):
                raise
            except:
                logger.error('error uncataloging object', exc_info=True)
        elif event is CHANGED and not cataloged(catalog, uid):
            # A change to something the catalog no longer has.
            continue
        else:
            # add or change
            # Note that the uid may be relative to the catalog.
            obj = catalog.unrestrictedTraverse(uid, None)
            if obj is not None:
                immediate_metadata = self.getImmediateMetadataUpdate()
                try:
                    catalog.catalog_object(
                        obj, uid, idxs=idxs,
                        update_metadata=not immediate_metadata)
                except (ConflictError, ClientDisconnected):
                    raise
                except:
                    logger.error('error cataloging object', exc_info=True)

        handled = handled + 1

    return handled
-
-
- #
- # CMF catalog tool methods.
- #
- security.declarePrivate('indexObject')
def indexObject(self, object):
    """Add to catalog.

    The uid is obtained from uidForObject().
    """
    uid = self.uidForObject(object)
    self.catalog_object(object, uid)
-
- security.declarePrivate('unindexObject')
def unindexObject(self, object):
    """Remove from catalog.

    The uid is obtained from uidForObject().
    """
    uid = self.uidForObject(object)
    self.uncatalog_object(uid)
-
- security.declarePrivate('reindexObject')
def reindexObject(self, object, idxs=None,update_metadata=1,uid=None):
    """Update catalog after object data has changed.

    The optional idxs argument is a list of specific indexes
    to update (all of them by default).  When no (true) uid is
    passed, it is obtained from uidForObject().
    """
    if not uid:
        uid = self.uidForObject(object)
    self.catalog_object(object, uid, idxs=idxs,
                        update_metadata=update_metadata)
-
- security.declarePrivate('uidForObject')
def uidForObject(self, obj):
    """Get a catalog uid for the object.

    The underlying catalog decides the uid when it implements
    uidForObject itself; otherwise the object's physical path,
    joined with '/', is used."""
    catalog = self.getZCatalog()
    if not hasattr(aq_base(catalog), 'uidForObject'):
        return '/'.join(obj.getPhysicalPath())
    return catalog.uidForObject(obj)
-
- # Provide web pages. It would be nice to use views, but Zope 2.6
- # just isn't ready for views. :( In particular, we'd have to fake
- # out the PageTemplateFiles in some brittle way to make them do
- # the right thing. :(
-
- security.declareProtected(view_management_screens, 'manage_editForm')
- manage_editForm = PageTemplateFile('www/edit', globals())
-
- security.declareProtected(view_management_screens, 'manage_getLocation')
def manage_getLocation(self):
    # Give back the configured location, or an empty string when the
    # stored value is false (e.g. None).
    location = self._location
    if location:
        return location
    return ''
-
- security.declareProtected(view_management_screens, 'manage_edit')
def manage_edit(self, title='', location='', immediate_indexes=(),
                immediate_removal=0, bucket_count=0, immediate_metadata=0,
                all_indexes=0, conflict_policy=SAFE_POLICY, RESPONSE=None):
    """ Edit the instance

    Applies every setting through the corresponding setter, then
    redirects back to the edit form when a RESPONSE is supplied.
    """
    self.title = title
    # An empty location is stored as None.
    self.setLocation(location or None)
    self.setImmediateIndexes(immediate_indexes)
    self.setImmediateRemoval(immediate_removal)
    self.setImmediateMetadataUpdate(immediate_metadata)
    self.setProcessAllIndexes(all_indexes)
    self.setConflictPolicy(conflict_policy)

    # A false bucket_count leaves the current bucket count alone;
    # otherwise it is only applied when it differs from the current one.
    if bucket_count:
        wanted = int(bucket_count)
        if wanted != self.getBucketCount():
            self.setBucketCount(wanted)

    if RESPONSE is None:
        return
    target = ('%s/manage_editForm?manage_tabs_message='
              'Properties+changed' % self.absolute_url())
    RESPONSE.redirect(target)
-
-
- security.declareProtected(manage_zcatalog_entries,
- 'list_queue_items')
def list_queue_items(self, limit=100):
    """Return a list of items in the queue.

    Keys of every non-empty queue are gathered in queue order; when
    ``limit`` is not None the result is cut to at most ``limit``
    entries.
    """
    collected = []
    for queue in filter(None, self._queues):
        collected.extend(queue._data.keys())

    if limit is not None and len(collected) > limit:
        collected = collected[:limit]
    return collected
-
-
- security.declareProtected(manage_zcatalog_entries, 'manage_queue')
- manage_queue = DTMLFile('dtml/queue', globals())
-
- security.declareProtected(manage_zcatalog_entries, 'manage_size')
def manage_size(self):
    # Total number of entries across all queues.
    return sum([len(queue) for queue in self._queues])
-
- security.declareProtected(manage_zcatalog_entries, 'manage_process')
def manage_process(self, count=100, REQUEST=None):
    "Web UI to manually process queues"
    processed = self.process(max=int(count))

    # Called from code: just report the number processed.
    if REQUEST is None:
        return processed

    # Called through the web: render the queue page with a status message.
    msg = '%i Queue item(s) processed' % processed
    return self.manage_queue(manage_tabs_message=msg)
-
# Provide Zope 2 offerings

# NOTE(review): presumably disables a default published view for this
# object -- confirm against the Zope publisher.
index_html = None

meta_type = 'ZCatalog Queue'

# Two management tabs of our own ('Configure' and 'Queue'), followed by
# the tabs inherited from SimpleItem.
manage_options=(
    (
    {'label': 'Configure', 'action': 'manage_editForm',
     'help':('QueueCatalog','QueueCatalog-Configure.stx')},

    {'label': 'Queue', 'action': 'manage_queue',
     'help':('QueueCatalog','QueueCatalog-Queue.stx')},
    )
    +SimpleItem.manage_options
    )

security.declareObjectPublic()
# Disallow access to subobjects with no security assertions.
security.setDefaultAccess('deny')

security.declarePublic('getTitle', 'title_or_id')

# Note: catalog_object and uncatalog_object are also declared with the
# same permission right above their definitions.
security.declareProtected(manage_zcatalog_entries,
                          'catalog_object', 'uncatalog_object')
-
-
def cataloged(catalog, path):
    # Tell whether ``catalog`` already holds a record id for ``path``.
    getrid = getattr(catalog, 'getrid', None)
    if getrid is not None:
        rid = getrid(path)
    else:
        # This is an old catalog that doesn't provide an API for
        # getting an object's rid (and thus determining that the
        # object is already cataloged), so fall back on our knowledge
        # of the internal structure.
        rid = catalog._catalog.uids.get(path)

    return rid is not None
-
class AttrWrapper(Base):
    """Special object that allows us to use acquisition in QueueCatalog
    attribute access.

    An instance only remembers an attribute name.  When it is wrapped
    (``__of__``) in a QueueCatalog, it resolves to that catalog's
    ``getZCatalog(name)`` result.
    """
    # The docstring used to be written as two separate string
    # statements, so only the first half ever became __doc__.

    def __init__(self, name):
        self.__name__ = name

    def __of__(self, wrappedQueueCatalog):
        return wrappedQueueCatalog.getZCatalog(self.__name__)
-
# Prepend the QueueCatalog class docstring to the module docstring.
# NOTE(review): this raises TypeError if either docstring is None
# (e.g. presumably under ``python -OO``) -- confirm whether that matters.
__doc__ = QueueCatalog.__doc__ + __doc__
-
More information about the Checkins
mailing list