[Checkins] SVN: Sandbox/J1m/resumelb/src/zc/resumelb/ Added the ability to load/save worker resumes.
Jim Fulton
jim at zope.com
Sun Jan 29 21:04:40 UTC 2012
Log message for revision 124248:
Added the ability to load/save worker resumes.
Changed:
U Sandbox/J1m/resumelb/src/zc/resumelb/worker.py
U Sandbox/J1m/resumelb/src/zc/resumelb/worker.test
U Sandbox/J1m/resumelb/src/zc/resumelb/zk.py
U Sandbox/J1m/resumelb/src/zc/resumelb/zk.test
-=-
Modified: Sandbox/J1m/resumelb/src/zc/resumelb/worker.py
===================================================================
--- Sandbox/J1m/resumelb/src/zc/resumelb/worker.py 2012-01-29 19:22:53 UTC (rev 124247)
+++ Sandbox/J1m/resumelb/src/zc/resumelb/worker.py 2012-01-29 21:04:39 UTC (rev 124248)
@@ -5,6 +5,8 @@
import gevent.server
import gevent.threadpool
import logging
+import marshal
+import os
import sys
import time
import zc.mappingobject
@@ -22,11 +24,18 @@
class Worker:
- def __init__(self, app, addr, settings):
+ def __init__(self, app, addr, settings, resume_file=None):
self.app = app
self.settings = zc.mappingobject.mappingobject(settings)
self.worker_request_number = 0
+ self.resume_file = resume_file
self.resume = {}
+ if self.resume_file and os.path.exists(self.resume_file):
+ try:
+ with open(self.resume_file) as f:
+ self.resume = marshal.load(f)
+ except Exception:
+ logger.exception('reading resume file')
self.time_ring = []
self.time_ring_pos = 0
self.connections = set()
@@ -135,6 +144,14 @@
def new_resume(self, resume):
self.resume = resume
+
+ if self.resume_file:
+ try:
+ with open(self.resume_file, 'w') as f:
+ marshal.dump(resume, f)
+ except Exception:
+ logger.exception('reading resume file')
+
for conn in self.connections:
if conn.is_connected:
try:
Modified: Sandbox/J1m/resumelb/src/zc/resumelb/worker.test
===================================================================
--- Sandbox/J1m/resumelb/src/zc/resumelb/worker.test 2012-01-29 19:22:53 UTC (rev 124247)
+++ Sandbox/J1m/resumelb/src/zc/resumelb/worker.test 2012-01-29 21:04:39 UTC (rev 124248)
@@ -361,3 +361,42 @@
Cleanup:
>>> worker.stop()
+
+Saving and loading resumes
+--------------------------
+
+Workers can store their resumes. This is useful if a worker has
+state, such as cache data that can be retained accross restarts. A
+worker will load data from and save data to a file if the
+``resume_file`` option is used to specify a resume file name:
+
+ >>> import marshal
+ >>> with open('resume.mar', 'w') as f:
+ ... marshal.dump(dict(a=1.0, b=2.0), f)
+
+ >>> worker = zc.resumelb.worker.Worker(
+ ... zc.resumelb.tests.app(), ('127.0.0.1', 0), dict(history=2),
+ ... resume_file='resume.mar')
+
+ >>> from pprint import pprint
+ >>> env = newenv('test', '/hi.html')
+ >>> worker_socket = gevent.socket.create_connection(worker.addr)
+ >>> pprint(read_message(worker_socket))
+ (0, {'a': 1.0, 'b': 2.0})
+
+ >>> write_message(worker_socket, 1, env, '')
+ >>> print_response(worker_socket, 1) # doctest: +ELLIPSIS
+ 1 200 OK...
+ >>> write_message(worker_socket, 2, env, '')
+ >>> print_response(worker_socket, 2) # doctest: +ELLIPSIS
+ 2 200 OK...
+
+At this point, the worker should have output a new resume.
+
+ >>> rno, resume = read_message(worker_socket)
+ >>> rno, list(resume)
+ (0, ['test'])
+
+ >>> with open('resume.mar') as f:
+ ... list(marshal.load(f))
+ ['test']
Modified: Sandbox/J1m/resumelb/src/zc/resumelb/zk.py
===================================================================
--- Sandbox/J1m/resumelb/src/zc/resumelb/zk.py 2012-01-29 19:22:53 UTC (rev 124247)
+++ Sandbox/J1m/resumelb/src/zc/resumelb/zk.py 2012-01-29 21:04:39 UTC (rev 124248)
@@ -22,6 +22,7 @@
import zc.zk
def worker(app, global_conf, zookeeper, path, loggers=None, address=':0',
+ resume_file=None,
run=True):
"""Paste deploy server runner
"""
@@ -32,7 +33,7 @@
zk = zc.zk.ZooKeeper(zookeeper)
address = zc.parse_addr.parse_addr(address)
from zc.resumelb.worker import Worker
- worker = Worker(app, address, zk.properties(path))
+ worker = Worker(app, address, zk.properties(path), resume_file)
zk.register_server(path+'/providers', worker.addr)
worker.zk = zk
if run:
Modified: Sandbox/J1m/resumelb/src/zc/resumelb/zk.test
===================================================================
--- Sandbox/J1m/resumelb/src/zc/resumelb/zk.test 2012-01-29 19:22:53 UTC (rev 124247)
+++ Sandbox/J1m/resumelb/src/zc/resumelb/zk.test 2012-01-29 21:04:39 UTC (rev 124248)
@@ -54,12 +54,16 @@
Let's create a worker, making sure that ZConfig.configureLoggers was called.
>>> app = zc.resumelb.tests.app()
- >>> import mock
+ >>> import mock, marshal
+ >>> with open('resume.mar', 'w') as f:
+ ... marshal.dump(dict(a=1.0, b=2.0), f)
>>> with mock.patch('ZConfig.configureLoggers') as configureLoggers:
... worker = zc.resumelb.zk.worker(
... app, None,
... zookeeper='zookeeper.example.com:2181', path='/test/lb/workers',
- ... address='127.0.0.1:0', run=False, loggers='loggers')
+ ... address='127.0.0.1:0', run=False, loggers='loggers',
+ ... resume_file='resume.mar',
+ ... )
... configureLoggers.assert_called_with('loggers')
Normally, when used with paste, the worker function runs forever. We
@@ -71,6 +75,12 @@
>>> worker.settings.history
999
+It loaded it's resume from resume.mar:
+
+ >>> from pprint import pprint
+ >>> pprint(worker.resume)
+ {'a': 1.0, 'b': 2.0}
+
It register's it's address as an emphemeral subnode of the provider's
subnode of the given path:
@@ -243,7 +253,7 @@
the test request classifier was used.
>>> list(lb.pool.skilled)
- ["yup, it's a test"]
+ ['a', 'b', "yup, it's a test"]
OK. now let's shut down the server and lb.
More information about the checkins
mailing list