[Checkins] SVN: Sandbox/J1m/resumelb/s Added a thread pool.

Jim Fulton jim at zope.com
Sun Dec 18 17:31:07 UTC 2011


Log message for revision 123839:
  Added a thread pool.
  

Changed:
  U   Sandbox/J1m/resumelb/setup.py
  A   Sandbox/J1m/resumelb/src/zc/resumelb/thread.py
  A   Sandbox/J1m/resumelb/src/zc/resumelb/thread.test

-=-
Modified: Sandbox/J1m/resumelb/setup.py
===================================================================
--- Sandbox/J1m/resumelb/setup.py	2011-12-18 00:28:04 UTC (rev 123838)
+++ Sandbox/J1m/resumelb/setup.py	2011-12-18 17:31:06 UTC (rev 123839)
@@ -13,7 +13,7 @@
 ##############################################################################
 name, version = 'zc.resumelb', '0'
 
-install_requires = ['setuptools', 'gevent', 'WebOb']
+install_requires = ['setuptools', 'gevent', 'WebOb', 'zc.thread']
 extras_require = dict(
     test=['zope.testing', 'bobo', 'manuel', 'WebTest'])
 

Added: Sandbox/J1m/resumelb/src/zc/resumelb/thread.py
===================================================================
--- Sandbox/J1m/resumelb/src/zc/resumelb/thread.py	                        (rev 0)
+++ Sandbox/J1m/resumelb/src/zc/resumelb/thread.py	2011-12-18 17:31:06 UTC (rev 123839)
@@ -0,0 +1,92 @@
+##############################################################################
+#
+# Copyright Zope Foundation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+#
+# Thread pool implementation based on: https://bitbucket.org/denis/
+#   gevent-playground/src/49d1cdcdf643/geventutil/threadpool.py
+
+import fcntl
+import gevent.core
+import gevent.event
+import os
+import Queue
+import threading
+import zc.thread
+
+###############################################################################
+# The following code is from the above URL:
+
+# Simple wrapper to os.pipe() - but sets to non-block
+def _pipe():
+    r, w = os.pipe()
+    fcntl.fcntl(r, fcntl.F_SETFL, os.O_NONBLOCK)
+    fcntl.fcntl(w, fcntl.F_SETFL, os.O_NONBLOCK)
+    return r, w
+
+_core_pipe_read, _core_pipe_write = _pipe()
+
+def _core_pipe_read_callback(event, evtype):
+    try:
+        os.read(event.fd, 1)
+    except EnvironmentError:
+        pass
+
+gevent.core.event(gevent.core.EV_READ|gevent.core.EV_PERSIST, \
+    _core_pipe_read, _core_pipe_read_callback).add()
+
+# MTAsyncResult is greatly simplified from version in https://bitbucket.org/
+#   denis/gevent-playground/src/49d1cdcdf643/geventutil/threadpool.py
+class MTAsyncResult(gevent.event.AsyncResult):
+
+    def set_exception(self, exception):
+        gevent.event.AsyncResult.set_exception(self, exception)
+        os.write(_core_pipe_write, '\0')
+
+    def set(self, value=None):
+        gevent.event.AsyncResult.set(self, value)
+        os.write(_core_pipe_write, '\0')
+
+#
+###############################################################################
+
+class Pool:
+
+    def __init__(self, size):
+        self.size = size
+        self.queue = queue = Queue.Queue()
+
+        def run():
+            while 1:
+                result, job, args = queue.get()
+                try:
+                    result.set(job(*args))
+                except Exception, v:
+                    if result is None:
+                        return #closes
+                    result.set_exception(v)
+
+        run.__name__ = __name__
+
+        self.threads = [zc.thread.Thread(run) for i in range(size)]
+
+    def result(self, job, *args):
+        result = MTAsyncResult()
+        self.queue.put((result, job, args))
+        return result
+
+    def close(self, timeout=1):
+        for thread in self.threads:
+            self.queue.put((None, None, None))
+        for thread in self.threads:
+            thread.join(timeout)
+


Property changes on: Sandbox/J1m/resumelb/src/zc/resumelb/thread.py
___________________________________________________________________
Added: svn:keywords
   + Id
Added: svn:eol-style
   + native

Added: Sandbox/J1m/resumelb/src/zc/resumelb/thread.test
===================================================================
--- Sandbox/J1m/resumelb/src/zc/resumelb/thread.test	                        (rev 0)
+++ Sandbox/J1m/resumelb/src/zc/resumelb/thread.test	2011-12-18 17:31:06 UTC (rev 123839)
@@ -0,0 +1,54 @@
+resumelb thread pool
+====================
+
+Applications bloc.
+
+To deal with this, we provide a very basic thread pool.
+
+    >>> import zc.resumelb.thread
+
+    >>> pool = zc.resumelb.thread.Pool(4)
+
+We specified a pool size of 4, so 4 threads are created:
+
+    >>> import threading
+    >>> len([t for t in threading.enumerate()
+    ...      if t.name == 'zc.resumelb.thread'])
+    4
+
+They are all deamonic:
+
+    >>> len([t for t in threading.enumerate()
+    ...      if t.name == 'zc.resumelb.thread'])
+    4
+
+To get something done, call the pool result method with a callable and
+arguments:
+
+    >>> import time
+    >>> def job(t):
+    ...     time.sleep(t)
+    ...     return threading.current_thread().ident, t
+
+    >>> result = pool.result(job, 0)
+
+The result is an async result:
+
+    >>> ident, sleep = result.get()
+    >>> idents = set(t.ident for t in threading.enumerate()
+    ...              if t.name == 'zc.resumelb.thread')
+
+    >>> ident in idents and sleep == 0
+    True
+
+If we actually sleep, so as to block, we can end up using all of the
+threads in the thread pool:
+
+    >>> results = [pool.result(job, 0.01) for i in range(6)]
+    >>> set(r.get()[0] for r in results) == idents
+    True
+
+When we're done with a pool, it's noce to close it.  This allows us to
+wait for pending jobs and close it down in an orderly fashion:
+
+    >>> pool.close()


Property changes on: Sandbox/J1m/resumelb/src/zc/resumelb/thread.test
___________________________________________________________________
Added: svn:eol-style
   + native



More information about the checkins mailing list