[Checkins] SVN: Sandbox/J1m/resumelb/ Updated to work with gevent 1. Woo hoo! No longer need our own thread

Jim Fulton jim at zope.com
Fri Jan 27 11:39:31 UTC 2012


Log message for revision 124212:
  Updated to work with gevent 1.  Woo hoo! No longer need our own thread
  pool implementation and no longer need to build gevent.
  
  In doing so, fixed a threading bug in worker.py and a race condition
  in lb.test.
  

Changed:
  U   Sandbox/J1m/resumelb/buildout.cfg
  U   Sandbox/J1m/resumelb/setup.py
  U   Sandbox/J1m/resumelb/src/zc/resumelb/lb.test
  D   Sandbox/J1m/resumelb/src/zc/resumelb/thread.py
  D   Sandbox/J1m/resumelb/src/zc/resumelb/thread.test
  U   Sandbox/J1m/resumelb/src/zc/resumelb/worker.py

-=-
Modified: Sandbox/J1m/resumelb/buildout.cfg
===================================================================
--- Sandbox/J1m/resumelb/buildout.cfg	2012-01-27 11:26:35 UTC (rev 124211)
+++ Sandbox/J1m/resumelb/buildout.cfg	2012-01-27 11:39:31 UTC (rev 124212)
@@ -1,22 +1,12 @@
 [buildout]
 develop = .
-parts = gevent py ctl
+parts = py ctl
 
 [ctl]
 recipe = zc.recipe.rhrc
 dest = ${buildout:bin-directory}
 parts = lb worker
 
-[libevent]
-recipe = zc.recipe.cmmi
-url = https://github.com/downloads/libevent/libevent/libevent-2.0.14-stable.tar.gz
-
-[gevent]
-recipe = zc.recipe.egg:custom
-include-dirs = ${libevent:location}/include
-library-dirs = ${libevent:location}/lib
-rpath = ${:library-dirs}
-
 [test]
 recipe = zc.recipe.testrunner
 eggs = zc.resumelb [test]

Modified: Sandbox/J1m/resumelb/setup.py
===================================================================
--- Sandbox/J1m/resumelb/setup.py	2012-01-27 11:26:35 UTC (rev 124211)
+++ Sandbox/J1m/resumelb/setup.py	2012-01-27 11:39:31 UTC (rev 124212)
@@ -14,7 +14,7 @@
 name, version = 'zc.resumelb', '0'
 
 install_requires = [
-    'setuptools', 'gevent', 'WebOb', 'zc.thread', 'zc.parse_addr',
+    'setuptools', 'gevent >=1.0b1', 'WebOb', 'zc.thread', 'zc.parse_addr',
     'zc.mappingobject', 'llist']
 extras_require = dict(
     test=['zope.testing', 'bobo', 'manuel', 'WebTest'])

Modified: Sandbox/J1m/resumelb/src/zc/resumelb/lb.test
===================================================================
--- Sandbox/J1m/resumelb/src/zc/resumelb/lb.test	2012-01-27 11:26:35 UTC (rev 124211)
+++ Sandbox/J1m/resumelb/src/zc/resumelb/lb.test	2012-01-27 11:39:31 UTC (rev 124212)
@@ -44,6 +44,7 @@
 
     >>> write_message(worker1, 0, {'h1.com': 10.0})
     >>> write_message(worker2, 0, {'h2.com': 10.0})
+    >>> gevent.sleep(.01) # Give resumes time to arrive
 
 Now, let's make a request and make sure the data gets where it's
 supposed to go.
@@ -53,6 +54,7 @@
     >>> g1 = gevent.spawn(app1.get, '/hi.html', {}, [('Host', 'h1.com')])
 
     >>> rno, env1 = read_message(worker1)
+
     >>> rno
     1
     >>> from pprint import pprint
@@ -305,7 +307,7 @@
 
     >>> lb.connect_sleep = 0.01
     >>> port = workers[0].server.server_port # We'll reuse below
-    >>> workers[0].server.kill()
+    >>> workers[0].server.stop()
     >>> socket = workers[0].socket
     >>> socket.close()
     >>> gevent.sleep(.01)

Deleted: Sandbox/J1m/resumelb/src/zc/resumelb/thread.py
===================================================================
--- Sandbox/J1m/resumelb/src/zc/resumelb/thread.py	2012-01-27 11:26:35 UTC (rev 124211)
+++ Sandbox/J1m/resumelb/src/zc/resumelb/thread.py	2012-01-27 11:39:31 UTC (rev 124212)
@@ -1,99 +0,0 @@
-##############################################################################
-#
-# Copyright Zope Foundation and Contributors.
-# All Rights Reserved.
-#
-# This software is subject to the provisions of the Zope Public License,
-# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
-# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
-# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
-# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
-# FOR A PARTICULAR PURPOSE.
-#
-##############################################################################
-#
-# Thread pool implementation based on: https://bitbucket.org/denis/
-#   gevent-playground/src/49d1cdcdf643/geventutil/threadpool.py
-
-import fcntl
-import gevent.core
-import gevent.event
-import os
-import Queue
-import threading
-import zc.thread
-
-###############################################################################
-# The following code is from the above URL:
-
-# Simple wrapper to os.pipe() - but sets to non-block
-def _pipe():
-    r, w = os.pipe()
-    fcntl.fcntl(r, fcntl.F_SETFL, os.O_NONBLOCK)
-    fcntl.fcntl(w, fcntl.F_SETFL, os.O_NONBLOCK)
-    return r, w
-
-_core_pipe_read, _core_pipe_write = _pipe()
-
-def _core_pipe_read_callback(event, evtype):
-    try:
-        os.read(event.fd, 1)
-    except EnvironmentError:
-        pass
-
-gevent.core.event(gevent.core.EV_READ|gevent.core.EV_PERSIST,
-                  _core_pipe_read, _core_pipe_read_callback).add()
-
-def wake_gevent():
-    os.write(_core_pipe_write, '\0')
-
-# MTAsyncResult is greatly simplified from version in https://bitbucket.org/
-#   denis/gevent-playground/src/49d1cdcdf643/geventutil/threadpool.py
-class MTAsyncResult(gevent.event.AsyncResult):
-
-    def set_exception(self, exception):
-        gevent.event.AsyncResult.set_exception(self, exception)
-        wake_gevent()
-
-    def set(self, value=None):
-        gevent.event.AsyncResult.set(self, value)
-        wake_gevent()
-
-#
-###############################################################################
-
-class Pool:
-
-    def __init__(self, size):
-        self.size = size
-        self.queue = queue = Queue.Queue()
-
-        def run():
-            while 1:
-                result, job, args = queue.get()
-                try:
-                    result.set(job(*args))
-                except Exception, v:
-                    if result is None:
-                        return #closes
-                    result.set_exception(v)
-
-        run.__name__ = __name__
-
-        self.threads = [zc.thread.Thread(run) for i in range(size)]
-
-    def result(self, job, *args):
-        result = MTAsyncResult()
-        self.queue.put((result, job, args))
-        return result
-
-    def apply(self, job, *args):
-        result = self.result(job, *args)
-        return result.get()
-
-    def close(self, timeout=1):
-        for thread in self.threads:
-            self.queue.put((None, None, None))
-        for thread in self.threads:
-            thread.join(timeout)
-

Deleted: Sandbox/J1m/resumelb/src/zc/resumelb/thread.test
===================================================================
--- Sandbox/J1m/resumelb/src/zc/resumelb/thread.test	2012-01-27 11:26:35 UTC (rev 124211)
+++ Sandbox/J1m/resumelb/src/zc/resumelb/thread.test	2012-01-27 11:39:31 UTC (rev 124212)
@@ -1,54 +0,0 @@
-resumelb thread pool
-====================
-
-Applications bloc.
-
-To deal with this, we provide a very basic thread pool.
-
-    >>> import zc.resumelb.thread
-
-    >>> pool = zc.resumelb.thread.Pool(4)
-
-We specified a pool size of 4, so 4 threads are created:
-
-    >>> import threading
-    >>> len([t for t in threading.enumerate()
-    ...      if t.name == 'zc.resumelb.thread'])
-    4
-
-They are all deamonic:
-
-    >>> len([t for t in threading.enumerate()
-    ...      if t.name == 'zc.resumelb.thread'])
-    4
-
-To get something done, call the pool result method with a callable and
-arguments:
-
-    >>> import time
-    >>> def job(t):
-    ...     time.sleep(t)
-    ...     return threading.current_thread().ident, t
-
-    >>> result = pool.result(job, 0)
-
-The result is an async result:
-
-    >>> ident, sleep = result.get()
-    >>> idents = set(t.ident for t in threading.enumerate()
-    ...              if t.name == 'zc.resumelb.thread')
-
-    >>> ident in idents and sleep == 0
-    True
-
-If we actually sleep, so as to block, we can end up using all of the
-threads in the thread pool:
-
-    >>> results = [pool.result(job, 0.01) for i in range(6)]
-    >>> set(r.get()[0] for r in results) == idents
-    True
-
-When we're done with a pool, it's noce to close it.  This allows us to
-wait for pending jobs and close it down in an orderly fashion:
-
-    >>> pool.close()

Modified: Sandbox/J1m/resumelb/src/zc/resumelb/worker.py
===================================================================
--- Sandbox/J1m/resumelb/src/zc/resumelb/worker.py	2012-01-27 11:26:35 UTC (rev 124211)
+++ Sandbox/J1m/resumelb/src/zc/resumelb/worker.py	2012-01-27 11:39:31 UTC (rev 124212)
@@ -3,12 +3,12 @@
 import gevent
 import gevent.hub
 import gevent.server
+import gevent.threadpool
 import logging
 import sys
 import time
 import zc.mappingobject
 import zc.resumelb.util
-import zc.resumelb.thread
 
 logger = logging.getLogger(__name__)
 
@@ -32,10 +32,10 @@
         self.connections = set()
 
         if settings.get('threads'):
-            pool = zc.resumelb.thread.Pool(self.settings.threads)
+            pool = gevent.threadpool.ThreadPool(settings['threads'])
             self.apply = pool.apply
         else:
-            self.apply = lambda f, *a: f(*a)
+            self.apply = apply
 
         self.server = gevent.server.StreamServer(addr, self.handle_connection)
         self.server.start()
@@ -84,12 +84,15 @@
                     break
             f.seek(0)
 
+            response = [0]
             def start_response(status, headers, exc_info=None):
                 assert not exc_info # XXX
-                conn.put((rno, (status, headers)))
+                response[0] = (status, headers)
 
             try:
-                for data in self.apply(self.app, env, start_response):
+                body = self.apply(self.app, (env, start_response))
+                conn.put((rno, response[0]))
+                for data in body:
                     conn.put((rno, data))
 
                 conn.put((rno, ''))



More information about the checkins mailing list