[Checkins] SVN: zc.async/trunk/src/zc/async/ make agent threads
keep their connections, as a potential optimization for
connection object caches;
switch new event test to use zope.component.eventtesting;
give credit in CHANGES file.
Gary Poster
gary at zope.com
Fri Apr 18 20:42:27 EDT 2008
Log message for revision 85488:
make agent threads keep their connections, as a potential optimization for connection object caches; switch new event test to use zope.component.eventtesting; give credit in CHANGES file.
Changed:
U zc.async/trunk/src/zc/async/CHANGES.txt
U zc.async/trunk/src/zc/async/TODO.txt
U zc.async/trunk/src/zc/async/dispatcher.py
U zc.async/trunk/src/zc/async/subscribers.txt
-=-
Modified: zc.async/trunk/src/zc/async/CHANGES.txt
===================================================================
--- zc.async/trunk/src/zc/async/CHANGES.txt 2008-04-19 00:27:42 UTC (rev 85487)
+++ zc.async/trunk/src/zc/async/CHANGES.txt 2008-04-19 00:42:26 UTC (rev 85488)
@@ -1,10 +1,16 @@
1.1 (unreleased)
================
-Fired events when the IQueues and IQueue objects are installed by the
-QueueInstaller.
+- Fired events when the IQueues and IQueue objects are installed by the
+ QueueInstaller (thanks to Fred Drake).
+- Dispatchers make agent threads keep their connections, so each connection's
+ object cache use is optimized if the agent regularly requests jobs with
+ the same objects.
+- README improved (thanks to Benji York and Sebastian Ware).
+
+
1.0 (2008-04-09)
================
Modified: zc.async/trunk/src/zc/async/TODO.txt
===================================================================
--- zc.async/trunk/src/zc/async/TODO.txt 2008-04-19 00:27:42 UTC (rev 85487)
+++ zc.async/trunk/src/zc/async/TODO.txt 2008-04-19 00:42:26 UTC (rev 85488)
@@ -1,8 +1,12 @@
-- Write the z3monitor tests.
-- See if combined README + README_2 + README_3 makes a comprehensible document
-
For future versions:
+- Write the z3monitor tests.
+- add a job.copy method that can be used to copy a completed task into a new
+ one
+- try to affiliate connections with agents. Actually...maybe all that's
+ required there is to not get a new connection for each job, but keep it open.
+ Threads are already affiliated with agents. Affiliation might be good
+ if an agent is targeting specific jobs that keep touching the same objects.
- queues should be pluggable like agent with filter
- show how to broadcast, maybe add conveniences
- show how to use with collapsing jobs (hint to future self: use external queue
Modified: zc.async/trunk/src/zc/async/dispatcher.py
===================================================================
--- zc.async/trunk/src/zc/async/dispatcher.py 2008-04-19 00:27:42 UTC (rev 85487)
+++ zc.async/trunk/src/zc/async/dispatcher.py 2008-04-19 00:42:26 UTC (rev 85488)
@@ -115,19 +115,23 @@
     def perform_thread(self):
         local.dispatcher = self.dispatcher
+        conn = self.dispatcher.db.open()
         try:
             job = self.queue.get()
             while job is not None:
-                db, identifier, info = job
+                identifier, dbname, info = job
                 info['thread'] = thread.get_ident()
                 info['started'] = datetime.datetime.utcnow()
                 zc.async.utils.tracelog.info(
                     'starting in thread %d: %r',
                     info['thread'], info['call'])
-                conn = db.open()
                 try:
                     transaction.begin()
-                    job = conn.get(identifier)
+                    if dbname is None:
+                        local_conn = conn
+                    else:
+                        local_conn = conn.get_connection(dbname)
+                    job = local_conn.get(identifier)
                     local.job = job
                     try:
                         job() # this does the committing and retrying, largely
@@ -152,7 +156,6 @@
                 finally:
                     local.job = None
                     transaction.abort()
-                    conn.close()
                 if info['failed']:
                     log = zc.async.utils.tracelog.error
                 else:
@@ -164,6 +167,7 @@
                     info['thread'], info['result'])
                 job = self.queue.get()
         finally:
+            conn.close()
             if self.dispatcher.activated:
                 # this may cause some bouncing, but we don't ever want to end
                 # up with fewer than needed.
@@ -404,7 +408,7 @@
self.jobs[jobid] = info
job_info.append(jobid)
pool.queue.put(
- (job._p_jar.db(), job._p_oid, info))
+ (job._p_oid, dbname, info))
job = self._getJob(agent)
queue.dispatchers.ping(self.UUID)
self._commit('trying to commit ping')
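
The dispatcher.py change above moves the connection's open and close out of the per-job loop, so each worker thread holds one connection for its whole lifetime. The following is a minimal sketch of that pattern in Python 3, using only the standard library. FakeDB, FakeConnection, and run_demo are hypothetical stand-ins invented for illustration; they are not ZODB or zc.async APIs, and the sketch leaves out transactions, multi-database lookups, and cache invalidation.

```python
import queue
import threading


class FakeConnection:
    """Hypothetical stand-in for a connection that owns an object cache."""

    def __init__(self, db):
        self.db = db
        self.cache = {}
        self.loads = 0  # number of times an object was fetched from storage

    def get(self, oid):
        # Only go to storage when the object is not cached on this connection.
        if oid not in self.cache:
            self.loads += 1
            self.cache[oid] = self.db.storage[oid]
        return self.cache[oid]

    def close(self):
        self.db.closed += 1


class FakeDB:
    """Hypothetical stand-in for a database that hands out connections."""

    def __init__(self, storage):
        self.storage = storage
        self.opened = 0
        self.closed = 0
        self.connections = []

    def open(self):
        self.opened += 1
        conn = FakeConnection(self)
        self.connections.append(conn)
        return conn


def perform_thread(db, jobs, results):
    conn = db.open()  # opened once per worker thread, not once per job
    try:
        oid = jobs.get()
        while oid is not None:  # None is the shutdown sentinel
            results.append(conn.get(oid))
            oid = jobs.get()
    finally:
        conn.close()  # closed only when the thread finishes


def run_demo():
    db = FakeDB({"a": 1, "b": 2})
    jobs = queue.Queue()
    results = []
    for oid in ["a", "b", "a", "a", "b"]:
        jobs.put(oid)
    jobs.put(None)
    worker = threading.Thread(target=perform_thread, args=(db, jobs, results))
    worker.start()
    worker.join()
    return db, results
```

In this sketch, five jobs touching two distinct objects cause one open, one close, and two storage loads. Opening a fresh connection per job would start with an empty cache each time, which is the cost the log message describes as a potential optimization target.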
Modified: zc.async/trunk/src/zc/async/subscribers.txt
===================================================================
--- zc.async/trunk/src/zc/async/subscribers.txt 2008-04-19 00:27:42 UTC (rev 85487)
+++ zc.async/trunk/src/zc/async/subscribers.txt 2008-04-19 00:42:26 UTC (rev 85488)
@@ -96,20 +96,6 @@
>>> zc.async.subscribers.agent_installer.size
3
-When an IQueues or IQueue is installed, an event is fired that provides the
-object being added, the container it is added to, and the name under which it
-is added. Let's add a subscriber that shows us these events:
-
- >>> def show_object_added(event):
- ... if zc.async.interfaces.IObjectAdded.providedBy(event):
- ... print "----"
- ... print event.object.__class__, "object"
- ... print "added to", event.parent.__class__
- ... print "with name", repr(event.name)
-
- >>> import zope.event
- >>> zope.event.subscribers.append(show_object_added)
-
Now we can install the subscribers and give it a try. As we said above,
normally the database opened event only fires once; this is just for purpose of
demonstration. We unregister the previous handler so nothing gets confused.
@@ -122,14 +108,6 @@
>>> zope.component.provideHandler(
... zc.async.subscribers.agent_installer)
>>> zope.event.notify(zc.async.interfaces.DatabaseOpened(db))
- ----
- <class 'zc.async.queue.Queues'> object
- added to <class 'persistent.mapping.PersistentMapping'>
- with name 'zc.async'
- ----
- <class 'zc.async.queue.Queue'> object
- added to <class 'zc.async.queue.Queues'>
- with name ''
Now if we look in the database, we'll find a queues collection in another
database, with a queue, with a dispatcher, with an agent.
@@ -159,6 +137,27 @@
>>> bool(da.activated)
True
+When an IQueues or IQueue is installed, an event is fired that provides the
+object being added, the container it is added to, and the name under which it
+is added. Therefore, two ObjectAdded events have fired now, one for a queues
+collection and one for a queue.
+
+ >>> from zope.component import eventtesting
+ >>> for event in eventtesting.getEvents(zc.async.interfaces.IObjectAdded):
+ ... print "----"
+ ... print event.object.__class__, "object"
+ ... print "added to", event.parent.__class__
+ ... print "with name", repr(event.name)
+ ...
+ ----
+ <class 'zc.async.queue.Queues'> object
+ added to <class 'persistent.mapping.PersistentMapping'>
+ with name 'zc.async'
+ ----
+ <class 'zc.async.queue.Queue'> object
+ added to <class 'zc.async.queue.Queues'>
+ with name ''
+
Finally, we mentioned at the start that the threaded dispatcher installer also
installed some signal handlers. Let's show a SIGINT (CTRL-C, usually), and
how it deactivates the dispatcher's agents collection in the ZODB.
@@ -188,10 +187,6 @@
>>> bool(da.activated)
False
-Let's clean up the subscriber we added:
-
- >>> zope.event.subscribers.remove(show_object_added)
-
.. ......... ..
.. Footnotes ..
.. ......... ..
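
The subscribers.txt change above replaces a printing subscriber, which had to be appended to and later removed from zope.event.subscribers, with a query against events that were recorded as they fired. The idea can be sketched in Python 3 without any Zope packages, roughly as follows. EventLog, ObjectAdded, and OtherEvent are hypothetical names invented for illustration, and the sketch filters with isinstance, whereas the doctest passes an interface (IObjectAdded) to eventtesting.getEvents.

```python
class ObjectAdded:
    """Hypothetical event carrying the added object, its parent, and its name."""

    def __init__(self, obj, parent, name):
        self.object = obj
        self.parent = parent
        self.name = name


class OtherEvent:
    """Hypothetical unrelated event, used to show that filtering works."""


class EventLog:
    """Records every event so a test can inspect them after the fact."""

    def __init__(self):
        self._events = []

    def notify(self, event):
        # Recording has no side effects on output, so earlier test
        # steps are not cluttered with printed event details.
        self._events.append(event)

    def get_events(self, event_type=None):
        # Return recorded events in firing order, optionally filtered by type.
        return [
            event for event in self._events
            if event_type is None or isinstance(event, event_type)
        ]


def run_demo():
    log = EventLog()
    root = {}
    queues = {}
    log.notify(ObjectAdded(queues, root, "zc.async"))
    log.notify(OtherEvent())
    log.notify(ObjectAdded(object(), queues, ""))
    return log
```

Because the events are stored rather than printed, the test can assert on them at whatever point reads best, and there is no subscriber to clean up afterwards.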