[Zope-CVS] CVS: Products/Ape/lib/apelib/zodb3 - scanner.py:1.1.2.1 connection.py:1.5.4.1 db.py:1.3.4.1 storage.py:1.6.4.1

Shane Hathaway shane@zope.com
Wed, 23 Jul 2003 00:12:59 -0400


Update of /cvs-repository/Products/Ape/lib/apelib/zodb3
In directory cvs.zope.org:/tmp/cvs-serv21220/lib/apelib/zodb3

Modified Files:
      Tag: ape-scan-branch
	connection.py db.py storage.py 
Added Files:
      Tag: ape-scan-branch
	scanner.py 
Log Message:
Rough implementation of cache freshness scanning.

This will hopefully enable smoother filesystem storage.


=== Added File Products/Ape/lib/apelib/zodb3/scanner.py ===
##############################################################################
# Copyright (c) 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Cache scanner.

Keeps a cache up to date by scanning for changes.

$Id: scanner.py,v 1.1.2.1 2003/07/23 04:12:52 shane Exp $
"""

from thread import allocate_lock
from time import time

from BTrees.OOBTree import OOBTree, OOSet, difference
from BTrees.IOBTree import IOBTree

# FUTURE_TIMEOUT defines how long (in seconds) to keep source information
# regarding OIDs that might be used soon.  Entries older than this are
# dropped by Scanner.pruneFuture().
FUTURE_TIMEOUT = 10 * 60

# CONNECTION_UPDATE_INTERVAL is the minimum number of seconds between two
# positive answers from ConnectionScanControl.ready().
CONNECTION_UPDATE_INTERVAL = 15
# SCAN_INTERVAL is the minimum number of seconds between two full scans
# started by ScanControl.mayScan().
SCAN_INTERVAL = 30


class ScanControl:

    def __init__(self, db=None):
        self.db = db
        self.next_conn_id = 1
        self.conn_oids = IOBTree()   # IOBTree({ conn_id -> OOSet([oid]) } })
        self.oids = OOSet()          # OOSet([oid])
        self.scanner = Scanner()
        self.lock = allocate_lock()
        self.next_scan = time() + SCAN_INTERVAL

    def newConnection(self):
        self.lock.acquire()
        try:
            conn_id = self.next_conn_id
            self.next_conn_id = conn_id + 1
            return ConnectionScanControl(self, conn_id)
        finally:
            self.lock.release()

    def setConnectionOIDs(self, conn_id, oids):
        changed = 0
        new_oids = OOSet()
        self.lock.acquire()
        try:
            if oids:
                self.conn_oids[conn_id] = OOSet(oids)
            else:
                if self.conn_oids.has_key(conn_id):
                    del self.conn_oids[conn_id]
            for set in self.conn_oids.values():
                new_oids.update(set)
            if self.oids != new_oids:
                self.oids = new_oids
                changed = 1
        finally:
            self.lock.release()
        if changed:
            self.scanner.setOIDs(new_oids)
            print 'pre-scanning'
            self.scanner.scan(new_only=1)
            print 'pre-scan done'
        self.mayScan()

    def mayScan(self):
        now = time()
        if now >= self.next_scan:
            self.next_scan = now + SCAN_INTERVAL
            print 'Scanning %d objects' % len(self.oids)
            inv = self.scanner.scan()
            print 'Finished scanning'
            if inv:
                print 'Invalidating', inv
                d = {}
                for oid in inv:
                    d[oid] = 1
                if self.db is not None:
                    self.db.invalidate(d)
                else:
                    print 'No database set!'


class ConnectionScanControl:
    """Per-connection handle onto a shared ScanControl.

    Remembers the connection id and limits how often the connection is
    asked to report its OIDs.
    """

    def __init__(self, ctl, conn_id):
        """Bind to the controller ctl under the identifier conn_id."""
        self.ctl = ctl
        self.conn_id = conn_id
        # Time of the next allowed report; 0 means "right away".
        self.next_update = 0

    def ready(self):
        """Return 1 if a report is due, else 0.

        A positive answer postpones the next one by
        CONNECTION_UPDATE_INTERVAL seconds.
        """
        current_time = time()
        if current_time < self.next_update:
            return 0
        self.next_update = current_time + CONNECTION_UPDATE_INTERVAL
        return 1

    def setOIDs(self, oids):
        """Report this connection's OIDs to the controller."""
        controller = self.ctl
        controller.setConnectionOIDs(self.conn_id, oids)


class Scanner:
    """Keeps the status of every source behind the OIDs in use.

    A source is a (repo, location) pair.  scan() asks each repo, through
    repo.freshen(), which of its sources changed and returns the OIDs
    whose previously known sources changed.
    """

    def __init__(self):
        self.current = OOBTree()  # OOBTree({ oid -> { source -> status } })
        self.future = {}          # { oid -> ([source], atime) }
        self.lock = allocate_lock()  # guards current and future


    def setOIDs(self, oids):
        """Replace the set of OIDs in use.

        OIDs missing from oids are forgotten.  New OIDs start with the
        sources recorded earlier through setSources(), if any, each
        with a status of None.
        """
        self.lock.acquire()
        try:
            # Forget the OIDs that are no longer in use.
            for oid in difference(self.current, oids).keys():
                del self.current[oid]
            # Start tracking the OIDs that just came into use.
            for oid in difference(oids, self.current).keys():
                statuses = {}
                pending = self.future.get(oid)
                if pending:
                    # Source info for this OID was provided earlier.
                    del self.future[oid]
                    early_sources = pending[0]
                    for source in early_sources:
                        statuses[source] = None
                self.current[oid] = statuses
        finally:
            self.lock.release()


    def setSources(self, oid, sources):
        """Record the sources an OID was loaded from or stored to.

        For an OID in use, the stored sources are brought in line with
        the given ones; newly added sources get a status of None.  For
        any other OID the sources are kept aside, with a timestamp,
        in case the OID comes into use soon.
        """
        self.lock.acquire()
        try:
            if not self.current.has_key(oid):
                # This OID might be useful soon.
                self.future[oid] = (sources, time())
            else:
                # This OID is known to be in use.
                statuses = self.current[oid]
                known = statuses.keys()
                known.sort()
                if known != sources:
                    # Drop the sources that went away...
                    for source in known:
                        if source not in sources:
                            del statuses[source]
                    # ...and add the new ones, with no status yet.
                    for source in sources:
                        if not statuses.has_key(source):
                            statuses[source] = None
        finally:
            self.lock.release()


    def scan(self, new_only=0):
        """Check sources for changes; return the list of OIDs to invalidate.

        new_only -- if true, only sources with no status yet are checked.

        An OID is reported only when a source that already had a status
        changed.  The lock is released while the repos are consulted.
        """
        per_repo = {}   # { repo -> { source -> status } }
        stale = {}      # { oid -> 1 }
        self.lock.acquire()
        try:
            for oid, statdict in self.current.items():
                for source, status in statdict.items():
                    if status is None or not new_only:
                        repo, location = source
                        per_repo.setdefault(repo, {})[source] = status
        finally:
            self.lock.release()
        changes = {}
        for repo, repo_sources in per_repo.items():
            fresh = repo.freshen(repo_sources)
            if fresh:
                changes.update(fresh)
        if changes:
            # Something changed.  Map the changes back to oids and
            # update self.current.
            self.lock.acquire()
            try:
                for oid, statdict in self.current.items():
                    for source, status in statdict.items():
                        if not changes.has_key(source):
                            continue
                        if status is not None:
                            # A known source changed behind our back.
                            stale[oid] = 1
                        statdict[source] = changes[source]
            finally:
                self.lock.release()
        self.pruneFuture()
        return stale.keys()


    def pruneFuture(self):
        """Discard early source info older than FUTURE_TIMEOUT seconds."""
        if not self.future:
            return
        self.lock.acquire()
        try:
            # OIDs older than some timeout will probably never be loaded.
            oldest_allowed = time() - FUTURE_TIMEOUT
            for oid, (sources, atime) in self.future.items():
                if atime < oldest_allowed:
                    del self.future[oid]
        finally:
            self.lock.release()



=== Products/Ape/lib/apelib/zodb3/connection.py 1.5 => 1.5.4.1 ===
--- Products/Ape/lib/apelib/zodb3/connection.py:1.5	Mon May 26 16:20:09 2003
+++ Products/Ape/lib/apelib/zodb3/connection.py	Wed Jul 23 00:12:52 2003
@@ -47,9 +47,24 @@
     tabular records.
     """
     _osio = None
+    _scan_ctl = None
 
     __implements__ = (IKeyedObjectSystem,
                       getattr(Connection, '__implements__', ()))
+
+
+    def _setDB(self, odb):
+        Connection._setDB(self, odb)
+        if odb._scan_ctl is not None:
+            ctl = self._scan_ctl
+            if ctl is None:
+                self._scan_ctl = ctl = odb._scan_ctl.newConnection()
+            if ctl.ready():
+                ctl.setOIDs(self._cache.cache_data.keys())
+                # If there were any invalidations, process them now.
+                if self._invalidated:
+                    self._flush_invalidations()
+
 
     def getObjectSystemIO(self):
         osio = self._osio


=== Products/Ape/lib/apelib/zodb3/db.py 1.3 => 1.3.4.1 ===
--- Products/Ape/lib/apelib/zodb3/db.py:1.3	Wed Jun  4 11:44:45 2003
+++ Products/Ape/lib/apelib/zodb3/db.py	Wed Jul 23 00:12:52 2003
@@ -20,6 +20,7 @@
 
 from apelib.core.interfaces import IMapper
 from apelib.core.exceptions import ConfigurationError
+
 from connection import ApeConnection
 from storage import ApeStorage
 from oidencoder import OIDEncoder
@@ -47,11 +48,12 @@
 
     klass = ApeConnection
 
-    # SDH: two extra args.
+    # SDH: some extra args.
     def __init__(self, storage,
                  mapper_resource=None,
                  factory=None,
                  oid_encoder=None,
+                 scan=1,
                  pool_size=7,
                  cache_size=400,
                  cache_deactivate_after=60,
@@ -111,6 +113,13 @@
             assert IOIDEncoder.isImplementedBy(oid_encoder)
         self._oid_encoder = oid_encoder
         self._mapper_resource = mapper_resource
+        if scan:
+            from scanner import ScanControl
+            ctl = ScanControl(self)
+            self._scan_ctl = ctl
+            storage.setScanner(ctl.scanner)
+        else:
+            self._scan_ctl = None
 
         # Pass through methods:
         for m in ('history',


=== Products/Ape/lib/apelib/zodb3/storage.py 1.6 => 1.6.4.1 ===
--- Products/Ape/lib/apelib/zodb3/storage.py:1.6	Wed Jun  4 11:45:21 2003
+++ Products/Ape/lib/apelib/zodb3/storage.py	Wed Jul 23 00:12:52 2003
@@ -60,8 +60,12 @@
         if not name:
             name = 'ApeStorage: ' + ', '.join(names)
         self._ltid = None
+        self._scanner = None
         BaseStorage.BaseStorage.__init__(self, name)
 
+    def setScanner(self, s):
+        self._scanner = s
+
     def __len__(self):
         return 1
 
@@ -96,7 +100,7 @@
         try:
             self._mapper_resource.access(self)  # Update mapper
             keychain = self._oid_encoder.decode(oid)
-            classified_state, hash_value = self._gwio.load(keychain)
+            event, classified_state, hash_value = self._gwio.load(keychain)
             file = StringIO()
             p = Pickler(file)
             p.dump(classified_state)
@@ -104,6 +108,9 @@
             h = self.hash64(hash_value)
             if DEBUG:
                 print 'loaded', `oid`, `h`
+            if self._scanner is not None:
+                sources = event.getSources()
+                self._scanner.setSources(oid, sources)
             return data, h
         finally:
             self._lock_release()
@@ -128,7 +135,7 @@
             if h64 != HASH0:
                 # Overwriting an old object.  Use the hash to verify
                 # that the new data was derived from the old data.
-                old_cs, old_hash = self._gwio.load(keychain, 1)
+                event, old_cs, old_hash = self._gwio.load(keychain, 1)
                 old_h64 = self.hash64(old_hash)
                 if h64 != old_h64:
                     raise POSException.ConflictError(
@@ -140,7 +147,7 @@
                 # NoStateFoundError or a hash of None, otherwise
                 # there's a conflict.
                 try:
-                    cs, old_hash = self._gwio.load(keychain, 1)
+                    event, cs, old_hash = self._gwio.load(keychain, 1)
                 except NoStateFoundError:
                     pass
                 else:
@@ -152,8 +159,11 @@
             file = StringIO(data)
             u = Unpickler(file)
             classified_state = u.load()
-            new_hash = self._gwio.store(keychain, classified_state)
+            event, new_hash = self._gwio.store(keychain, classified_state)
             new_h64 = self.hash64(new_hash)
+            if self._scanner is not None:
+                sources = event.getSources()
+                self._scanner.setSources(oid, sources)
         finally:
             self._lock_release()