[Zope-Checkins] CVS: Zope3/lib/python/Zope/Server - IHeaderOutput.py:1.1.2.1 ITask.py:1.1.2.1 HTTPServer.py:1.1.2.11 TaskThreads.py:1.1.2.5

Shane Hathaway shane@digicool.com
Wed, 28 Nov 2001 10:29:12 -0500


Update of /cvs-repository/Zope3/lib/python/Zope/Server
In directory cvs.zope.org:/tmp/cvs-serv9851

Modified Files:
      Tag: Zope-3x-branch
	HTTPServer.py TaskThreads.py 
Added Files:
      Tag: Zope-3x-branch
	IHeaderOutput.py ITask.py 
Log Message:
Added some documentation and a test (and fix) for a potential threading bug.


=== Added File Zope3/lib/python/Zope/Server/IHeaderOutput.py ===
# Copyright (c) 2001 Zope Corporation and Contributors.  All Rights Reserved.
# 
# This software is subject to the provisions of the Zope Public License,
# Version 1.1 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.


from Interface import Interface


class IHeaderOutput (Interface):
    """
    Zope.Publisher.HTTP.HTTPResponse is optionally passed an
    object which implements this interface in order to intermingle
    its headers with the HTTP server's response headers,
    and for the purpose of better logging.
    """

    def setResponseStatus(status, reason):
        """
        Sets the status code and the accompanying message.
        """

    def setResponseHeaders(mapping):
        """
        Sets headers.  The headers must be Correctly-Cased.
        """

    def appendResponseHeaders(lst):
        """
        Sets headers that can potentially repeat.
        Takes a list of strings.
        """

    def wroteResponseHeader():
        """
        Returns a flag indicating whether the response
        header has already been sent.
        """

    def setAuthUserName(name):
        """
        Sets the name of the authenticated user so it can
        be logged.
        """


=== Added File Zope3/lib/python/Zope/Server/ITask.py ===
# Copyright (c) 2001 Zope Corporation and Contributors.  All Rights Reserved.
# 
# This software is subject to the provisions of the Zope Public License,
# Version 1.1 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.


from Interface import Interface


class ITask (Interface):
    """
    The interface expected of an object placed in the queue of
    a ThreadedTaskDispatcher.  Provides facilities for executing
    or canceling the task.
    """

    def service():
        """
        Services the task.  Either service() or cancel() is called
        for every task queued.
        """

    def cancel():
        """
        Called instead of service() during shutdown or if an
        exception occurs that prevents the task from being
        serviced.  Must return quickly and should not throw exceptions.
        """

    def defer():
        """
        Called just before the task is queued to be executed in
        a different thread.
        """


=== Zope3/lib/python/Zope/Server/HTTPServer.py 1.1.2.10 => 1.1.2.11 ===
 from dual_mode_channel import OverflowableBuffer
 from Adjustments import default_adj
+from IHeaderOutput import IHeaderOutput
+from ITask import ITask
 
 
 try:
@@ -48,8 +50,13 @@
 
 
 class http_task:
+    """
+    An HTTP task receives a parsed request and an HTTP channel
+    and is expected to write its response to that channel.
+    Subclass this and override the execute() method.
+    """
 
-    # __implements__ = ITask
+    __implements__ = ITask, IHeaderOutput  #, IOutputStream
 
     instream = None
     close_on_finish = 1
@@ -80,6 +87,7 @@
 
     def service(self):
         """
+        Called to execute the task.
         """
         try:
             try:
@@ -234,6 +242,9 @@
 
 
 class http_request_data:
+    """
+    A structure that collects the HTTP request.
+    """
 
     completed = 0  # Set once request is completed.
     empty = 0        # Set if no request was made.
@@ -257,9 +268,11 @@
         """
         Receives the HTTP stream for one request.
         Returns the number of bytes consumed.
+        Sets the completed flag once both the header and the
+        body have been received.
         """
         if self.completed:
-            return 0
+            return 0  # Can't consume any more.
         datalen = len(data)
         br = self.body_rcv
         if br is None:
@@ -294,6 +307,10 @@
 
 
     def parse_header(self, header_plus):
+        """
+        Parses the header_plus block of text (the headers plus the
+        first line of the request).
+        """
         index = header_plus.find('\r\n')
         if index >= 0:
             first_line = header_plus[:index]
@@ -419,14 +436,14 @@
 
     def add_channel(self, map=None):
         """
-        Keeps track of opened HTTP channels.
+        This hook keeps track of opened HTTP channels.
         """
         channel_type.add_channel(self, map)
         self.active_channels[self._fileno] = self
 
     def del_channel(self, map=None):
         """
-        Keeps track of opened HTTP channels.
+        This hook keeps track of closed HTTP channels.
         """
         channel_type.del_channel(self, map)
         ac = self.active_channels
@@ -447,6 +464,10 @@
         self.kill_zombies()
 
     def kill_zombies(self):
+        """
+        Closes connections that have not had any activity in
+        (channel_timeout) seconds.
+        """
         now = time.time()
         cutoff = now - self.adj.channel_timeout
         for channel in self.active_channels.values():
@@ -478,6 +499,9 @@
 
 
     def queue_request(self, req):
+        """
+        Queues requests to be processed in sequence by tasks.
+        """
         rr = self.ready_requests
         do_now = 1
         if rr is None:
@@ -545,6 +569,9 @@
 
 
 class CommonHitLogger:
+    """
+    Outputs hits in common HTTP log format.
+    """
 
     def __init__(self, logger_object=None, resolver=None):
         if logger_object is None:
@@ -591,6 +618,10 @@
 
 
     def log(self, task):
+        """
+        Receives a completed task and logs it in the
+        common log format.
+        """
         now = time.time()
         request_data = task.request_data
         req_headers = request_data.headers


=== Zope3/lib/python/Zope/Server/TaskThreads.py 1.1.2.4 => 1.1.2.5 ===
 
 
-class ITask:  # Interface
-
-    def service():
-        """
-        Services the task.  Either service() or cancel() is called
-        for every task queued.
-        """
-
-    def cancel():
-        """
-        Called instead of service() during shutdown or if an
-        exception occurs that prevents the task from being
-        serviced.  Must return quickly and should not throw exceptions.
-        """
-
-    def defer():
-        """
-        Called just before the task is queued to be executed in
-        a different thread.
-        """
-
-
 class ThreadedTaskDispatcher:
 
+    stop_count = 0  # Number of threads that will stop soon.
+
     def __init__(self):
         self.threads = {}  # { thread number -> 1 }
         self.queue = Queue()
@@ -62,8 +42,14 @@
                 except:
                     self.error('Exception during task', sys.exc_info())
         finally:
-            try: del threads[thread_no]
-            except KeyError: pass
+            mlock = self.thread_mgmt_lock
+            mlock.acquire()
+            try:
+                self.stop_count -= 1
+                try: del threads[thread_no]
+                except KeyError: pass
+            finally:
+                mlock.release()
 
     def setThreadCount(self, count):
         mlock = self.thread_mgmt_lock
@@ -71,22 +57,29 @@
         try:
             threads = self.threads
             thread_no = 0
-            while (len(threads) < count):
+            running = len(threads) - self.stop_count
+            while running < count:
+                # Start threads.
                 while threads.has_key(thread_no):
                     thread_no = thread_no + 1
                 threads[thread_no] = 1
+                running += 1
                 start_new_thread(self.handlerThread, (thread_no,))
                 thread_no = thread_no + 1
-            if len(threads) > count:
-                to_kill = len(threads) - count
-                for n in range(to_kill):
+            if running > count:
+                # Stop threads.
+                to_stop = running - count
+                self.stop_count += to_stop
+                for n in range(to_stop):
                     self.queue.put(None)
+                    running -= 1
         finally:
             mlock.release()
 
     def addTask(self, task):
         if task is None:
             raise ValueError, "No task passed to addTask()."
+        # assert ITask.isImplementedBy(task)
         try:
             task.defer()
             self.queue.put(task)
@@ -105,6 +98,7 @@
 
     def shutdown(self, cancel_pending=1):
         self.setThreadCount(0)
+        # Ensure the threads shut down.
         threads = self.threads
         timeout = time() + 5  # Up to 5 seconds.
         while threads:
@@ -112,10 +106,13 @@
                 self.error("%d zombie threads still exist" % len(threads))
             sleep(0.1)
         if cancel_pending:
+            # Cancel remaining tasks.
             try:
-                while 1:
-                    task = self.queue.get_nowait()
-                    task.cancel()
+                queue = self.queue
+                while not queue.empty():
+                    task = queue.get()
+                    if task is not None:
+                        task.cancel()
             except Empty:
                 pass