[Checkins] SVN: Sandbox/J1m/resumelb/s Added a thread pool.
Jim Fulton
jim at zope.com
Sun Dec 18 17:31:07 UTC 2011
Log message for revision 123839:
Added a thread pool.
Changed:
U Sandbox/J1m/resumelb/setup.py
A Sandbox/J1m/resumelb/src/zc/resumelb/thread.py
A Sandbox/J1m/resumelb/src/zc/resumelb/thread.test
-=-
Modified: Sandbox/J1m/resumelb/setup.py
===================================================================
--- Sandbox/J1m/resumelb/setup.py 2011-12-18 00:28:04 UTC (rev 123838)
+++ Sandbox/J1m/resumelb/setup.py 2011-12-18 17:31:06 UTC (rev 123839)
@@ -13,7 +13,7 @@
##############################################################################
name, version = 'zc.resumelb', '0'
-install_requires = ['setuptools', 'gevent', 'WebOb']
+install_requires = ['setuptools', 'gevent', 'WebOb', 'zc.thread']
extras_require = dict(
test=['zope.testing', 'bobo', 'manuel', 'WebTest'])
Added: Sandbox/J1m/resumelb/src/zc/resumelb/thread.py
===================================================================
--- Sandbox/J1m/resumelb/src/zc/resumelb/thread.py (rev 0)
+++ Sandbox/J1m/resumelb/src/zc/resumelb/thread.py 2011-12-18 17:31:06 UTC (rev 123839)
@@ -0,0 +1,92 @@
+##############################################################################
+#
+# Copyright Zope Foundation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+#
+# Thread pool implementation based on: https://bitbucket.org/denis/
+# gevent-playground/src/49d1cdcdf643/geventutil/threadpool.py
+
+import fcntl
+import gevent.core
+import gevent.event
+import os
+import Queue
+import threading
+import zc.thread
+
+###############################################################################
+# The following code is from the above URL:
+
+# Simple wrapper around os.pipe() that sets both ends to non-blocking
+def _pipe():
+ r, w = os.pipe()
+ fcntl.fcntl(r, fcntl.F_SETFL, os.O_NONBLOCK)
+ fcntl.fcntl(w, fcntl.F_SETFL, os.O_NONBLOCK)
+ return r, w
+
+_core_pipe_read, _core_pipe_write = _pipe()
+
+def _core_pipe_read_callback(event, evtype):
+ try:
+ os.read(event.fd, 1)
+ except EnvironmentError:
+ pass
+
+gevent.core.event(gevent.core.EV_READ|gevent.core.EV_PERSIST, \
+ _core_pipe_read, _core_pipe_read_callback).add()
+
+# MTAsyncResult is greatly simplified from version in https://bitbucket.org/
+# denis/gevent-playground/src/49d1cdcdf643/geventutil/threadpool.py
+class MTAsyncResult(gevent.event.AsyncResult):
+
+ def set_exception(self, exception):
+ gevent.event.AsyncResult.set_exception(self, exception)
+ os.write(_core_pipe_write, '\0')
+
+ def set(self, value=None):
+ gevent.event.AsyncResult.set(self, value)
+ os.write(_core_pipe_write, '\0')
+
+#
+###############################################################################
+
+class Pool:
+
+ def __init__(self, size):
+ self.size = size
+ self.queue = queue = Queue.Queue()
+
+ def run():
+ while 1:
+ result, job, args = queue.get()
+ try:
+ result.set(job(*args))
+ except Exception, v:
+ if result is None:
+ return #closes
+ result.set_exception(v)
+
+ run.__name__ = __name__
+
+ self.threads = [zc.thread.Thread(run) for i in range(size)]
+
+ def result(self, job, *args):
+ result = MTAsyncResult()
+ self.queue.put((result, job, args))
+ return result
+
+ def close(self, timeout=1):
+ for thread in self.threads:
+ self.queue.put((None, None, None))
+ for thread in self.threads:
+ thread.join(timeout)
+
Property changes on: Sandbox/J1m/resumelb/src/zc/resumelb/thread.py
___________________________________________________________________
Added: svn:keywords
+ Id
Added: svn:eol-style
+ native
Added: Sandbox/J1m/resumelb/src/zc/resumelb/thread.test
===================================================================
--- Sandbox/J1m/resumelb/src/zc/resumelb/thread.test (rev 0)
+++ Sandbox/J1m/resumelb/src/zc/resumelb/thread.test 2011-12-18 17:31:06 UTC (rev 123839)
@@ -0,0 +1,54 @@
+resumelb thread pool
+====================
+
+Applications can block.
+
+To deal with this, we provide a very basic thread pool.
+
+ >>> import zc.resumelb.thread
+
+ >>> pool = zc.resumelb.thread.Pool(4)
+
+We specified a pool size of 4, so 4 threads are created:
+
+ >>> import threading
+ >>> len([t for t in threading.enumerate()
+ ... if t.name == 'zc.resumelb.thread'])
+ 4
+
+They are all daemonic:
+
+ >>> len([t for t in threading.enumerate()
+ ... if t.name == 'zc.resumelb.thread'])
+ 4
+
+To get something done, call the pool result method with a callable and
+arguments:
+
+ >>> import time
+ >>> def job(t):
+ ... time.sleep(t)
+ ... return threading.current_thread().ident, t
+
+ >>> result = pool.result(job, 0)
+
+The result is an async result:
+
+ >>> ident, sleep = result.get()
+ >>> idents = set(t.ident for t in threading.enumerate()
+ ... if t.name == 'zc.resumelb.thread')
+
+ >>> ident in idents and sleep == 0
+ True
+
+If we actually sleep, so as to block, we can end up using all of the
+threads in the thread pool:
+
+ >>> results = [pool.result(job, 0.01) for i in range(6)]
+ >>> set(r.get()[0] for r in results) == idents
+ True
+
+When we're done with a pool, it's nice to close it. This allows us to
+wait for pending jobs and close it down in an orderly fashion:
+
+ >>> pool.close()
Property changes on: Sandbox/J1m/resumelb/src/zc/resumelb/thread.test
___________________________________________________________________
Added: svn:eol-style
+ native
More information about the checkins
mailing list