[Zope-Checkins] CVS: Zope3/lib/python/Zope/Server - Adjustments.py:1.1.2.1 ZLogIntegration.py:1.1.2.1 HTTPServer2.py:1.1.2.4 PublisherServers.py:1.1.2.3 TaskThreads.py:1.1.2.2 __init__.py:1.1.2.3 dual_mode_channel.py:1.1.2.4

Shane Hathaway shane@digicool.com
Mon, 26 Nov 2001 18:11:09 -0500


Update of /cvs-repository/Zope3/lib/python/Zope/Server
In directory cvs.zope.org:/tmp/cvs-serv15972/lib/python/Zope/Server

Modified Files:
      Tag: Zope-3x-branch
	HTTPServer2.py PublisherServers.py TaskThreads.py __init__.py 
	dual_mode_channel.py 
Added Files:
      Tag: Zope-3x-branch
	Adjustments.py ZLogIntegration.py 
Log Message:
- Added logging to HTTPServer2.

- Created and used an "Adjustments" module.

- Kill zombie HTTP connections.


=== Added File Zope3/lib/python/Zope/Server/Adjustments.py ===

from medusa.test import max_sockets


class Adjustments:
    """
    Tunable server settings, kept as plain class attributes.

    The values below are the defaults.  The byte counts, limits and
    timeouts are read by the code that is handed an Adjustments
    object; nothing in this class enforces them itself.
    """

    # backlog is the argument to pass to socket.listen().
    backlog = 1024

    # recv_bytes is the argument to pass to socket.recv().
    recv_bytes = 8192

    # send_bytes is the number of bytes to hand to socket.send()
    # at a time.
    send_bytes = 8192

    # Create a tempfile if the pending output data gets larger
    # than outbuf_overflow (in bytes; the default is a little
    # over 1 MB).  With RAM so cheap, this probably ought to be
    # set to the 16-32 MB range (circa 2001) for good performance
    # with big transfers.  The default is conservative.
    outbuf_overflow = 1050000

    # Create a tempfile if the data received gets larger
    # than inbuf_overflow (in bytes).
    inbuf_overflow = 525000

    # Stop accepting new connections if too many are already active.
    # This expression is evaluated once, when the class body runs
    # (i.e. at import time).
    # NOTE(review): the "- 3" presumably leaves headroom for sockets
    # that are not client connections -- confirm.
    connection_limit = max_sockets.max_select_sockets() - 3  # Safe

    # Minimum seconds between cleaning up inactive channels.
    cleanup_interval = 300

    # Maximum seconds to leave an inactive connection open.
    channel_timeout = 900

    # Boolean (1 or 0): turn off to ignore premature client disconnects
    # instead of reporting them as errors.
    log_socket_errors = 1


default_adj = Adjustments()



=== Added File Zope3/lib/python/Zope/Server/ZLogIntegration.py ===
# Copyright (c) 2001 Zope Corporation and Contributors.  All Rights Reserved.
# 
# This software is subject to the provisions of the Zope Public License,
# Version 1.1 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
"""
Pokes zLOG default logging into asyncore.
"""

from zLOG import LOG, register_subsystem, BLATHER, INFO, WARNING, ERROR
# NOTE(review): the subsystem registered here is 'ZServer', while
# log_info below logs under 'Zope.Server' -- confirm whether these
# names are meant to match.
register_subsystem('ZServer')
# Maps the message type strings accepted by log_info to the
# corresponding zLOG severity constants.
severity={'info':INFO, 'warning':WARNING, 'error': ERROR}

def log_info(self, message, type='info'):
    """
    Forward a log message to zLOG under the name 'Zope.Server'.

    type selects the severity through the module-level severity
    table ('info', 'warning' or 'error'); any other value raises
    KeyError.  self is unused; it is present so the function can
    be installed as a method.
    """
    level = severity[type]
    LOG('Zope.Server', level, message)

# Install the function above as asyncore.dispatcher.log_info, so that
# dispatchers which do not override log_info themselves send their
# messages to zLOG.  This takes effect as a side effect of importing
# this module.
import asyncore
asyncore.dispatcher.log_info=log_info


=== Zope3/lib/python/Zope/Server/HTTPServer2.py 1.1.2.3 => 1.1.2.4 ===
 from urllib import unquote
 
-from medusa.http_date import build_http_date
+from medusa.http_date import build_http_date, monthname
+from medusa import logger
 
 if SIMULT_MODE:
     from dual_mode_channel import simultaneous_mode_channel as channel_type
 else:
     from dual_mode_channel import dual_mode_channel as channel_type
+
 from dual_mode_channel import OverflowableBuffer
+from Adjustments import default_adj
+
 
 try:
     from cStringIO import StringIO
@@ -43,25 +47,24 @@
         del asyncore.dispatcher.__getattr__
 
 
-default_body = "The HTTP server is running!\r\n" * 10
-
-
 class http_task:
 
     # __implements__ = ITask
 
     instream = None
     close_on_finish = 1
-    status_str = '200 Ok'
+    status = '200'
+    reason = 'Ok'
     wrote_header = 0
     accumulated_headers = None
+    bytes_written = 0
+    auth_user_name = ''
 
     def __init__(self, channel, request_data):
         self.channel = channel
         self.request_data = request_data
         self.response_headers = {
-            'Server' : 'Zope.Server.HTTPServer',
-            'Date'   : build_http_date (time.time())
+            'Server': 'Zope.Server.HTTPServer',
             }
         version = request_data.version
         if version not in ('1.0', '1.1'):
@@ -80,25 +83,29 @@
         """
         try:
             try:
+                self.start()
                 self.execute()
                 self.finish()
-            finally:
-                self.channel.end_task(self.close_on_finish)
-        except socket.error:
-            self.channel.handle_comm_error()
+            except socket.error:
+                self.close_on_finish = 1
+                if self.channel.adj.log_socket_errors:
+                    raise
+        finally:
+            self.channel.end_task(self.close_on_finish)
 
     def cancel(self):
         """
         Called when shutting down the server.
         """
-        self.channel.kill_task()
+        self.channel.close_when_done()
 
     # setResponseStatus(), setResponseHeaders(), appendResponseHeaders(),
-    # and wroteResponseHeader() are part of the IHeaderOutput interface
-    # used by Zope.Publisher.HTTP.HTTPResponse.
+    # wroteResponseHeader(), and setAuthUserName() are part of the
+    # IHeaderOutput interface used by Zope.Publisher.HTTP.HTTPResponse.
 
-    def setResponseStatus(self, s):
-        self.status_str = s
+    def setResponseStatus(self, status, reason):
+        self.status = status
+        self.reason = reason
 
     def setResponseHeaders(self, mapping):
         self.response_headers.update(mapping)
@@ -115,6 +122,9 @@
     def wroteResponseHeader(self):
         return self.wrote_header
 
+    def setAuthUserName(self, name):
+        self.auth_user_name = name
+
     def prepareResponseHeaders(self):
         version = self.version
         # Figure out whether the connection should be closed.
@@ -148,7 +158,7 @@
 
     def buildResponseHeader(self):
         self.prepareResponseHeaders()
-        first_line = 'HTTP/%s %s' % (self.version, self.status_str)
+        first_line = 'HTTP/%s %s %s' % (self.version, self.status, self.reason)
         lines = [first_line] + map(
             lambda hv: '%s: %s' % hv, self.response_headers.items())
         accum = self.accumulated_headers
@@ -157,11 +167,16 @@
         res = '%s\r\n\r\n' % '\r\n'.join(lines)
         return res
 
+    def start(self):
+        now = time.time()
+        self.start_time = now
+        self.response_headers['Date'] = build_http_date (now)
+
     def execute(self):
         """
         Override this.
         """
-        body = default_body
+        body = "The HTTP server is running!\r\n" * 10
         self.response_headers['Content-Type'] = 'text/plain'
         self.response_headers['Content-Length'] = str(len(body))
         self.write(body)
@@ -169,15 +184,19 @@
     def finish(self):
         if not self.wrote_header:
             self.write('')
+        channel = self.channel
+        channel.server.hit_log.log(self)
 
     def write(self, data):
         channel = self.channel
         if not self.wrote_header:
             rh = self.buildResponseHeader()
             channel.sync_write(rh)
+            self.bytes_written += len(rh)
             self.wrote_header = 1
         if data:
-            return channel.sync_write(data)
+            channel.sync_write(data)
+            self.bytes_written += len(data)
 
     def flush(self):
         self.channel.sync_flush()
@@ -226,6 +245,12 @@
     # headers is a mapping containing keys translated to uppercase
     # with dashes turned into underscores.
 
+    def __init__(self, adj):
+        """
+        adj is an Adjustments object.
+        """
+        self.adj = adj
+
     def received(self, data):
         """
         Receives the HTTP stream for one request.
@@ -301,13 +326,13 @@
             if te == 'chunked':
                 from chunking import ChunkedReceiver
                 self.chunked = 1
-                buf = OverflowableBuffer(525000)  # TODO: make configurable
+                buf = OverflowableBuffer(self.adj.inbuf_overflow)
                 self.body_rcv = ChunkedReceiver(buf)
         if not self.chunked:
             cl = int(headers.get('CONTENT_LENGTH', 0))
             self.content_length = cl
             if cl > 0:
-                buf = OverflowableBuffer(525000)  # TODO: make configurable
+                buf = OverflowableBuffer(self.adj.inbuf_overflow)
                 self.body_rcv = StreamedReceiver(cl, buf)
 
 
@@ -367,14 +392,24 @@
 request_queue_lock = allocate_lock()
 
 
+
 class http_channel (channel_type):
 
     task_class = http_task
 
-    active_channels = {}  # Class-specific channel counter
-    proto_request = None
-    ready_requests = None  # A list
-    wedged = 0
+    active_channels = {}      # Class-specific channel tracker
+    proto_request = None      # An http_request_data instance
+    ready_requests = None     # A list
+    last_activity = 0         # Time of last activity
+    running_task = 0          # boolean
+
+    next_channel_cleanup = 0  # A class variable
+
+
+    def __init__(self, server, conn, addr, adj=None):
+        channel_type.__init__(self, server, conn, addr, adj)
+        self.last_activity = t = self.creation_time
+        self.check_maintenance(t)
 
 
     def add_channel(self, map=None):
@@ -384,7 +419,6 @@
         channel_type.add_channel(self, map)
         self.active_channels[self._fileno] = self
 
-
     def del_channel(self, map=None):
         """
         Keeps track of opened HTTP channels.
@@ -394,29 +428,37 @@
         fd = self._fileno
         if ac.has_key(fd):
             del ac[fd]
-        # print 'active HTTP channels:', len(ac)
+
+
+    def check_maintenance(self, now):
+        if now < http_channel.next_channel_cleanup:
+            return
+        http_channel.next_channel_cleanup = now + self.adj.cleanup_interval
+        self.maintenance()
+
+    def maintenance(self):
+        # Note that this is an asynchronous call.
+        # Kill off dead connections.
+        self.kill_zombies()
+
+    def kill_zombies(self):
+        now = time.time()
+        cutoff = now - self.adj.channel_timeout
+        for channel in self.active_channels.values():
+            if (channel is not self and not channel.running_task and
+                channel.last_activity < cutoff):
+                channel.close()
 
 
     def received(self, data):
         """
         Receives input asynchronously and launches or queues requests.
         """
-        if self.wedged:
-            # Ignore input after a bad request.
-            return
         preq = self.proto_request
         while data:
             if preq is None:
-                preq = http_request_data()
-            try:
-                n = preq.received(data)
-            except:
-                # Bad header format or request.  Can't accept more requests.
-                # TODO: use logging.
-                import traceback
-                traceback.print_exc()
-                self.wedged = 1
-                return
+                preq = http_request_data(self.adj)
+            n = preq.received(data)
             if preq.completed:
                 # The request is ready to use.
                 if not preq.empty:
@@ -454,13 +496,16 @@
 
     def create_task(self, req):
         task = self.task_class(self, req)
+        self.running_task = 1
         self.server.addTask(task)
 
 
     def end_task(self, close):
+        self.running_task = 0
         if close:
             self.close_when_done()
         else:
+            self.last_activity = time.time()
             new_task = 0
             req = None
             request_queue_lock.acquire()
@@ -477,7 +522,93 @@
             else:
                 # Wait for another request on this connection.
                 self.set_async()
+
+    def handle_error(self):
+        # Program error
+        t, v = sys.exc_info()[:2]
+        if t is SystemExit or t is KeyboardInterrupt:
+            raise t, v
+        asyncore.dispatcher.handle_error(self)
+
+    def handle_comm_error(self, async=1):
+        if self.adj.log_socket_errors:
+            self.handle_error()
+        else:
+            if async:
+                self.close()
+            # Else this was called by synchronous code and it's
+            # not safe to just close().
+
+
+
+class CommonHitLogger:
+
+    def __init__(self, logger_object=None, resolver=None):
+        if logger_object is None:
+            logger_object = logger.file_logger (sys.stdout)
+
+        if resolver is not None:
+            self.output = logger.resolving_logger (resolver, logger_object)
+        else:
+            self.output = logger.unresolving_logger (logger_object)
             
+    def compute_timezone_for_log(self, tz):
+        if tz > 0:
+            neg = 1
+        else:
+            neg = 0
+            tz = -tz
+        h, rem = divmod (tz, 3600)
+        m, rem = divmod (rem, 60)
+        if neg:
+            return '-%02d%02d' % (h, m)
+        else:
+            return '+%02d%02d' % (h, m)
+
+    tz_for_log = None
+    tz_for_log_alt = None
+
+    def log_date_string (self, when):
+        logtime = time.localtime(when)
+        Y, M, D, h, m, s = logtime[:6]
+
+        if not time.daylight:
+            tz = self.tz_for_log
+            if tz is None:
+                tz = self.compute_timezone_for_log(time.timezone)
+                self.tz_for_log = tz
+        else:
+            tz = self.tz_for_log_alt
+            if tz is None:
+                tz = self.compute_timezone_for_log(time.altzone)
+                self.tz_for_log_alt = tz
+
+        return '%d/%s/%02d:%02d:%02d:%02d %s' % (
+            Y, monthname[M], D, h, m, s, tz)
+
+
+    def log(self, task):
+        now = time.time()
+        request_data = task.request_data
+        req_headers = request_data.headers
+
+        user_name = task.auth_user_name or 'anonymous'
+        user_agent = req_headers.get('USER_AGENT', '')
+        referer = req_headers.get('REFERER', '')
+
+        self.output.log(
+            task.channel.addr[0],
+            ' - %s [%s] "%s" %s %d "%s" "%s"\n' % (
+                user_name,
+                self.log_date_string(now),
+                request_data.first_line,
+                task.status,
+                task.bytes_written,
+                referer,
+                user_agent
+                )
+            )
+
 
 
 class http_server (asyncore.dispatcher):
@@ -486,8 +617,12 @@
 
     SERVER_IDENT = 'Zope.Server.HTTPServer.http_server'
 
-    def __init__(self, ip, port, backlog=1024, tasks=None):
+    def __init__(self, ip, port, tasks=None, adj=None, start=1,
+                 hit_log=None):
         # Assumes sock is already bound.
+        if adj is None:
+            adj = default_adj
+        self.adj = adj
         asyncore.dispatcher.__init__(self)
         self.port = port
         self.tasks = tasks
@@ -495,6 +630,10 @@
         self.set_reuse_addr()
         self.bind((ip, port))
 
+        if hit_log is None:
+            hit_log = CommonHitLogger()
+        self.hit_log = hit_log
+
         host, port = self.socket.getsockname()
         if not ip:
             self.log_info('Computing default hostname', 'info')
@@ -505,8 +644,21 @@
             self.log_info('Cannot do reverse lookup', 'info')
             self.server_name = ip       # use the IP address as the "hostname"
         
-        self.listen(backlog)
+        if start:
+            self.accept_connections()
 
+    def accept_connections(self):
+        self.accepting = 1
+        self.socket.listen(self.adj.backlog)  # Circumvent asyncore's NT limit
+        self.log_info('HTTP server started.\n'
+                      '\tHostname: %s\n\tPort: %d' % (
+			self.server_name,
+			self.port
+			))
+
+    def readable(self):
+        return (self.accepting and
+                len(asyncore.socket_map) < self.adj.connection_limit)
 
     def writable (self):
         return 0
@@ -520,12 +672,6 @@
     def handle_connect (self):
         pass
 
-    def handle_error(self):
-        t = sys.exc_info()[0]
-        if t is KeyboardInterrupt:
-            raise t
-        asyncore.dispatcher.handle_error(self)
-
     def handle_accept (self):
         try:
             v = self.accept()
@@ -540,15 +686,7 @@
             self.log_info ('warning: server accept() threw an exception',
                            'warning')
             return
-        except TypeError:
-                # unpack non-sequence.  this can happen when a read event
-                # fires on a listening socket, but when we call accept()
-                # we get EWOULDBLOCK, so dispatcher.accept() returns None.
-                # Seen on FreeBSD3.
-            self.log_info ('warning: server accept() threw EWOULDBLOCK',
-                           'warning')
-            return
-        self.channel_class(self, conn, addr)
+        self.channel_class(self, conn, addr, self.adj)
 
     def addTask(self, task):
         tasks = self.tasks


=== Zope3/lib/python/Zope/Server/PublisherServers.py 1.1.2.2 => 1.1.2.3 ===
     channel_class = PublisherHTTPChannel
     
-    def __init__(self, request_payload, response_payload,
-                 ip, port, backlog=5, tasks=None):
+    def __init__(self, request_payload, response_payload, *args, **kw):
         self.request_payload = request_payload
         self.response_payload = response_payload
-        http_server.__init__(self, ip, port, backlog, tasks)
+        http_server.__init__(self, *args, **kw)
 
         
 if __name__ == '__main__':


=== Zope3/lib/python/Zope/Server/TaskThreads.py 1.1.2.1 => 1.1.2.2 ===
 
+import sys
 from Queue import Queue, Empty
 from thread import allocate_lock, start_new_thread
 
+try:
+    from zLOG import LOG, ERROR
+except ImportError:
+    LOG = None
+
 
 class ITask:  # Interface
 
@@ -40,9 +46,12 @@
             try:
                 task.service()
             except:
-                # Log somewhere?
-                import traceback
-                traceback.print_exc()
+                if LOG is None:
+                    import traceback
+                    traceback.print_exc()
+                else:
+                    LOG('ThreadedTaskDispatcher', ERROR,
+                        'Exception during task', error=sys.exc_info())
 
     def setThreadCount(self, count):
         mlock = self.thread_mgmt_lock


=== Zope3/lib/python/Zope/Server/__init__.py 1.1.2.2 => 1.1.2.3 ===
 # WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
 # FOR A PARTICULAR PURPOSE.
-from medusa.test import max_sockets
 
+"""
+Zope.Server package.
+"""
 
-CONNECTION_LIMIT=max_sockets.max_select_sockets()
 
 
-# Try to poke zLOG default logging into asyncore
-# XXX We should probably should do a better job of this,
-#     however that would mean that ZServer required zLOG.
-try:
-    from zLOG import LOG, register_subsystem, BLATHER, INFO, WARNING, ERROR
-    register_subsystem('ZServer')
-    severity={'info':INFO, 'warning':WARNING, 'error': ERROR}
-
-    def log_info(self, message, type='info'):
-        if message[:14]=='adding channel' or \
-           message[:15]=='closing channel' or \
-           message == 'Computing default hostname':
-            LOG('ZServer', BLATHER, message)
-        else:
-            LOG('ZServer', severity[type], message)     
-
-    import asyncore
-    asyncore.dispatcher.log_info=log_info
-except:
-    pass
-
-# A routine to try to arrange for request sockets to be closed
-# on exec. This makes it easier for folks who spawn long running
-# processes from Zope code. Thanks to Dieter Maurer for this.
-try:
-    import fcntl, FCNTL
-    FCNTL.F_SETFD; FCNTL.FD_CLOEXEC
-    def requestCloseOnExec(sock):
-        try:    fcntl.fcntl(sock.fileno(), FCNTL.F_SETFD, FCNTL.FD_CLOEXEC)
-        except: pass
-
-except (ImportError, AttributeError):
-
-    def requestCloseOnExec(sock):
-        pass
-
-import asyncore
-from medusa import resolver, logger
-from HTTPServer import zhttp_server, zhttp_handler
-from PubCore import setNumberOfThreads
-from medusa.monitor import secure_monitor_server
 
-# override the service name in logger.syslog_logger
-logger.syslog_logger.svc_name='ZServer'
+
+### A routine to try to arrange for request sockets to be closed
+### on exec. This makes it easier for folks who spawn long running
+### processes from Zope code. Thanks to Dieter Maurer for this.
+##try:
+##    import fcntl, FCNTL
+##    FCNTL.F_SETFD; FCNTL.FD_CLOEXEC
+##    def requestCloseOnExec(sock):
+##        try:    fcntl.fcntl(sock.fileno(), FCNTL.F_SETFD, FCNTL.FD_CLOEXEC)
+##        except: pass
+
+##except (ImportError, AttributeError):
+
+##    def requestCloseOnExec(sock):
+##        pass
+
+##import asyncore
+##from medusa import resolver, logger
+##from HTTPServer import zhttp_server, zhttp_handler
+##from PubCore import setNumberOfThreads
+##from medusa.monitor import secure_monitor_server
+
+### override the service name in logger.syslog_logger
+##logger.syslog_logger.svc_name='ZServer'
+


=== Zope3/lib/python/Zope/Server/dual_mode_channel.py 1.1.2.3 => 1.1.2.4 ===
 
 from medusa.thread.select_trigger import trigger
+from Adjustments import default_adj
+
 
 pull_trigger = trigger().pull_trigger
 
@@ -25,6 +27,8 @@
 # copy_bytes controls the size of temp. strings for shuffling data around.
 COPY_BYTES = 1 << 18  # 64K
 
+# The maximum number of bytes to buffer in a simple string.
+STRBUF_LIMIT = 8192
 
 
 class dual_mode_channel (asyncore.dispatcher):
@@ -32,25 +36,19 @@
     The channel switches between asynchronous and synchronous mode.
     """
 
-    # recv_bytes is the argument to pass to socket.recv().
-    recv_bytes = 8192
-    # send_bytes is the number of bytes to send to socket.send().
-    send_bytes = 8192
-
-    # Create a tempfile if the pending output data gets larger
-    # than outbuf_overflow.
-    outbuf_overflow = 1050000  # A little over 1 MB
-
     # will_close is set to 1 to close the socket.
     will_close = 0
 
     # boolean: async or sync mode
     async_mode = 1
 
-    def __init__(self, server, conn, addr):
+    def __init__(self, server, conn, addr, adj=None):
         self.server = server
         self.addr = addr
-        self.outbuf = OverflowableBuffer(self.outbuf_overflow)
+        if adj is None:
+            adj = default_adj
+        self.adj = adj
+        self.outbuf = OverflowableBuffer(adj.outbuf_overflow)
         self.creation_time = time()
         asyncore.dispatcher.__init__(self, conn)
 
@@ -95,7 +93,7 @@
 
     def inner_handle_read(self):
         try:
-            data = self.recv(self.recv_bytes)
+            data = self.recv(self.adj.recv_bytes)
         except socket.error:
             self.handle_comm_error()
             return
@@ -107,17 +105,10 @@
         """
         pass
 
-    def handle_error(self):
-        t = sys.exc_info()[0]
-        if t is KeyboardInterrupt:
-            # Propogate keyboard interrupts (SIGINT).
-            raise t
-        asyncore.dispatcher.handle_error(self)
-
     def handle_comm_error(self):
         """
         Designed for handling communication errors that occur
-        during asynchronous transfers *only*.  Probably should log
+        during asynchronous operations *only*.  Probably should log
         this, but in a different place.
         """
         self.handle_error()
@@ -132,13 +123,12 @@
     def sync_write(self, data):
         if data:
             self.outbuf.append(data)
-        while len(self.outbuf) >= self.send_bytes:
+        while len(self.outbuf) >= self.adj.send_bytes:
             # Send what we can without blocking.
             # We propogate errors to the application on purpose
             # (to prevent unnecessary work).
             if not self._flush_some():
                 break
-        return len(data)
 
     def sync_flush(self):
         """
@@ -168,7 +158,7 @@
     def _flush_some(self):
         outbuf = self.outbuf
         if outbuf:
-            chunk = outbuf.get(self.send_bytes)
+            chunk = outbuf.get(self.adj.send_bytes)
             num_sent = self.send(chunk)
             if num_sent:
                 outbuf.skip(num_sent, 1)
@@ -192,11 +182,6 @@
                 self.async_mode = 1
                 pull_trigger()
 
-    def kill(self):
-        self.will_close = 1
-        self.async_mode = 1  # Needed to actually close.
-        pull_trigger()
-
 
 allocate_lock = None
 
@@ -208,7 +193,7 @@
     and fill the input buffer.
     """
 
-    def __init__(self, server, conn, addr):
+    def __init__(self, server, conn, addr, adj=None):
         global allocate_lock
         if allocate_lock is None:
             from thread import allocate_lock
@@ -217,7 +202,7 @@
         self._writelock_acquire = writelock.acquire
         self._writelock_release = writelock.release
         self._writelock_locked = writelock.locked
-        dual_mode_channel.__init__(self, server, conn, addr)
+        dual_mode_channel.__init__(self, server, conn, addr, adj)
 
     #
     # ASYNCHRONOUS METHODS
@@ -252,7 +237,7 @@
     def sync_write(self, data):
         self._writelock_acquire()
         try:
-            return dual_mode_channel.sync_write(self, data)
+            dual_mode_channel.sync_write(self, data)
         finally:
             self._writelock_release()
 
@@ -274,98 +259,6 @@
         self.will_close = 1
         pull_trigger()
 
-    def kill(self):
-        # Best we can do safely...
-        self.close_when_done()
-
-
-
-##class SimpleStringBuffer:
-##    """
-##    A RAM-based, non-preserving buffer.  May save
-##    memory when used for outgoing data because it keeps
-##    references to the original strings rather than copies.
-##    """
-
-##    def __init__(self, from_buffer=None):
-##        self.data = data = []
-##        sz = 0
-##        if from_buffer is not None:
-##            while 1:
-##                s = from_buffer.get(COPY_BYTES, 1)
-##                if not s:
-##                    break
-##                sz = sz + len(s)
-##                data.append(s)
-##        self.len = sz
-
-##    def __len__(self):
-##        """
-##        Returns the number of bytes that remain to get.
-##        """
-##        return self.len
-
-##    def append(self, s):
-##        """
-##        Adds bytes to the end of the buffer.
-##        """
-##        self.data.append(s)
-##        self.len = self.len + len(s)
-
-##    def get(self, minbytes=-1, skip=0):
-##        """
-##        Returns a string from the start of the buffer, preferring
-##        at least (minsize) bytes, optionally deleting that part.
-##        """
-##        data = self.data
-##        if not data:
-##            return ''
-##        gotbytes = 0
-##        for index in range(len(data)):
-##            gotbytes = gotbytes + len(data[index])
-##            if minbytes >= 0 and gotbytes >= minbytes:
-##                break
-##        res = ''.join(data[:index + 1])
-##        if skip:
-##            del data[:index + 1]
-##            self.len = self.len - gotbytes
-##        return res
-
-##    def skip(self, bytes, allow_prune=1):
-##        """
-##        Since this buffer type is non-preserving, this method
-##        deletes the given number of bytes from the start of the buffer.
-##        """
-##        if not allow_prune:
-##            raise ValueError, "SimpleStringBuffers always prune."
-##        gotbytes = 0
-##        data = self.data
-##        for index in range(len(data)):
-##            s = data[index]
-##            slen = len(s)
-##            gotbytes = gotbytes + slen
-##            if gotbytes > bytes:
-##                position = slen - (gotbytes - bytes)
-##                del data[:index]
-##                data[0] = s[position:]
-##                self.len = self.len - bytes
-##                return
-##            elif gotbytes == bytes:
-##                del data[:index + 1]
-##                self.len = self.len - bytes
-##                return
-##        # Hmm, too many!
-##        raise ValueError, (
-##            "Can't skip %d bytes in buffer of %d bytes" %
-##            (bytes, gotbytes))
-
-##    def prune(self):
-##        # Non-preserving so there's nothing to prune.
-##        pass
-
-##    def getfile(self):
-##        raise 'NotImplemented'
-
 
 
 class FileBasedBuffer:
@@ -483,7 +376,6 @@
     overflowed = 0
     buf = None
     strbuf = ''  # String-based buffer.
-    strbuf_limit = 8192
 
     def __init__(self, overflow):
         # overflow is the maximum to be stored in a StringIO buffer.
@@ -521,7 +413,7 @@
         buf = self.buf
         if buf is None:
             strbuf = self.strbuf
-            if len(strbuf) + len(s) < self.strbuf_limit:
+            if len(strbuf) + len(s) < STRBUF_LIMIT:
                 self.strbuf = strbuf + s
                 return
             buf = self._create_buffer()