[Zope-Checkins] CVS: Zope3/lib/python/Zope/Server/Thread - SelectTrigger.py:1.2 __init__.py:1.2
Jim Fulton
jim@zope.com
Mon, 10 Jun 2002 19:30:07 -0400
Update of /cvs-repository/Zope3/lib/python/Zope/Server/Thread
In directory cvs.zope.org:/tmp/cvs-serv20468/lib/python/Zope/Server/Thread
Added Files:
SelectTrigger.py __init__.py
Log Message:
Merged Zope-3x-branch into newly forked Zope3 CVS Tree.
=== Zope3/lib/python/Zope/Server/Thread/SelectTrigger.py 1.1 => 1.2 ===
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+# -*- Mode: Python; tab-width: 4 -*-
+
+VERSION_STRING = "$Id$"
+
+import asyncore
+import asynchat
+
+import os
+import socket
+import string
+import thread
+
+if os.name == 'posix':
+
+ class Trigger(asyncore.file_dispatcher):
+
+ "Wake up a call to select() running in the main thread"
+
+ # This is useful in a context where you are using Medusa's I/O
+ # subsystem to deliver data, but the data is generated by another
+ # thread. Normally, if Medusa is in the middle of a call to
+ # select(), new output data generated by another thread will have
+ # to sit until the call to select() either times out or returns.
+ # If the trigger is 'pulled' by another thread, it should immediately
+ # generate a READ event on the trigger object, which will force the
+ # select() invocation to return.
+
+ # A common use for this facility: letting Medusa manage I/O for a
+ # large number of connections; but routing each request through a
+ # thread chosen from a fixed-size thread pool. When a thread is
+ # acquired, a transaction is performed, but output data is
+ # accumulated into buffers that will be emptied more efficiently
+ # by Medusa. [picture a server that can process database queries
+ # rapidly, but doesn't want to tie up threads waiting to send data
+ # to low-bandwidth connections]
+
+ # The other major feature provided by this class is the ability to
+ # move work back into the main thread: if you call pull_trigger()
+ # with a thunk argument, when select() wakes up and receives the
+ # event it will call your thunk from within that thread. The main
+ # purpose of this is to remove the need to wrap thread locks around
+ # Medusa's data structures, which normally do not need them. [To see
+ # why this is true, imagine this scenario: A thread tries to push some
+ # new data onto a channel's outgoing data queue at the same time that
+ # the main thread is trying to remove some]
+
+ def __init__ (self):
+ r, w = os.pipe()
+ self.trigger = w
+ asyncore.file_dispatcher.__init__ (self, r)
+ self.lock = thread.allocate_lock()
+ self.thunks = []
+
+ def __repr__ (self):
+ return '<select-trigger (pipe) at %x>' % id(self)
+
+ def readable (self):
+ return 1
+
+ def writable (self):
+ return 0
+
+ def handle_connect (self):
+ pass
+
+ def pull_trigger (self, thunk=None):
+ # print 'PULL_TRIGGER: ', len(self.thunks)
+ if thunk:
+ try:
+ self.lock.acquire()
+ self.thunks.append (thunk)
+ finally:
+ self.lock.release()
+ os.write (self.trigger, 'x')
+
+ def handle_read (self):
+ self.recv (8192)
+ try:
+ self.lock.acquire()
+ for thunk in self.thunks:
+ try:
+ thunk()
+ except:
+ (file, fun, line), t, v, tbinfo = \
+ asyncore.compact_traceback()
+ print 'exception in trigger thunk: (%s:%s %s)' % (
+ t, v, tbinfo)
+ self.thunks = []
+ finally:
+ self.lock.release()
+
+else:
+
+
+ # win32-safe version
+
+ class Trigger (asyncore.dispatcher):
+
+ address = ('127.9.9.9', 19999)
+
+ def __init__ (self):
+ a = socket.socket (socket.AF_INET, socket.SOCK_STREAM)
+ w = socket.socket (socket.AF_INET, socket.SOCK_STREAM)
+
+ # set TCP_NODELAY to true to avoid buffering
+ w.setsockopt(socket.IPPROTO_TCP, 1, 1)
+
+ # tricky: get a pair of connected sockets
+ host='127.0.0.1'
+ port=19999
+ while 1:
+ try:
+ self.address=(host, port)
+ a.bind(self.address)
+ break
+ except:
+ if port <= 19950:
+ raise 'Bind Error', 'Cannot bind trigger!'
+ port=port - 1
+
+ a.listen (1)
+ w.setblocking (0)
+ try:
+ w.connect (self.address)
+ except:
+ pass
+ r, addr = a.accept()
+ a.close()
+ w.setblocking (1)
+ self.trigger = w
+
+ asyncore.dispatcher.__init__ (self, r)
+ self.lock = thread.allocate_lock()
+ self.thunks = []
+ self._trigger_connected = 0
+
+ def __repr__ (self):
+ return '<select-trigger (loopback) at %x>' % id(self)
+
+ def readable (self):
+ return 1
+
+ def writable (self):
+ return 0
+
+ def handle_connect (self):
+ pass
+
+ def pull_trigger (self, thunk=None):
+ if thunk:
+ try:
+ self.lock.acquire()
+ self.thunks.append (thunk)
+ finally:
+ self.lock.release()
+ self.trigger.send ('x')
+
+ def handle_read (self):
+ self.recv (8192)
+ try:
+ self.lock.acquire()
+ for thunk in self.thunks:
+ try:
+ thunk()
+ except:
+ (file, fun, line), t, v, tbinfo = asyncore.compact_traceback()
+ print 'exception in trigger thunk: (%s:%s %s)' % (t, v, tbinfo)
+ self.thunks = []
+ finally:
+ self.lock.release()
+
+
+the_trigger = None
+
+class TriggerFile:
+ "A 'triggered' file object"
+
+ buffer_size = 4096
+
+ def __init__ (self, parent):
+ global the_trigger
+ if the_trigger is None:
+ the_trigger = trigger()
+ self.parent = parent
+ self.buffer = ''
+
+ def write (self, data):
+ self.buffer = self.buffer + data
+ if len(self.buffer) > self.buffer_size:
+ d, self.buffer = self.buffer, ''
+ the_trigger.pull_trigger (
+ lambda d=d,p=self.parent: p.push (d)
+ )
+
+ def writeline (self, line):
+ self.write (line+'\r\n')
+
+ def writelines (self, lines):
+ self.write (
+ string.joinfields (
+ lines,
+ '\r\n'
+ ) + '\r\n'
+ )
+
+ def flush (self):
+ if self.buffer:
+ d, self.buffer = self.buffer, ''
+ the_trigger.pull_trigger (
+ lambda p=self.parent,d=d: p.push (d)
+ )
+
+ def softspace (self, *args):
+ pass
+
+ def close (self):
+ # in a derived class, you may want to call trigger_close() instead.
+ self.flush()
+ self.parent = None
+
+ def trigger_close (self):
+ d, self.buffer = self.buffer, ''
+ p, self.parent = self.parent, None
+ the_trigger.pull_trigger (
+ lambda p=p,d=d: (p.push(d), p.close_when_done())
+ )
+
+if __name__ == '__main__':
+
+ import time
+
+ def thread_function (output_file, i, n):
+ print 'entering thread_function'
+ while n:
+ time.sleep (5)
+ output_file.write ('%2d.%2d %s\r\n' % (i, n, output_file))
+ output_file.flush()
+ n = n - 1
+ output_file.close()
+ print 'exiting thread_function'
+
+ class thread_parent (asynchat.async_chat):
+
+ def __init__ (self, conn, addr):
+ self.addr = addr
+ asynchat.async_chat.__init__ (self, conn)
+ self.set_terminator ('\r\n')
+ self.buffer = ''
+ self.count = 0
+
+ def collect_incoming_data (self, data):
+ self.buffer = self.buffer + data
+
+ def found_terminator (self):
+ data, self.buffer = self.buffer, ''
+ if not data:
+ asyncore.close_all()
+ print "done"
+ return
+ n = string.atoi (string.split (data)[0])
+ tf = TriggerFile (self)
+ self.count = self.count + 1
+ thread.start_new_thread (thread_function, (tf, self.count, n))
+
+ class ThreadServer(asyncore.dispatcher):
+
+ def __init__ (self, family=socket.AF_INET, address=('', 9003)):
+ asyncore.dispatcher.__init__ (self)
+ self.create_socket (family, socket.SOCK_STREAM)
+ self.set_reuse_addr()
+ self.bind (address)
+ self.listen (5)
+
+ def handle_accept (self):
+ conn, addr = self.accept()
+ tp = thread_parent (conn, addr)
+
+ ThreadServer()
+ #asyncore.loop(1.0, use_poll=1)
+ try:
+ asyncore.loop ()
+ except:
+ asyncore.close_all()
=== Zope3/lib/python/Zope/Server/Thread/__init__.py 1.1 => 1.2 ===
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""
+
+$Id$
+"""