[Checkins] SVN: zc.zkzeo/trunk/ Coordinate ZEO clients and servers using ZooKeeper
Jim Fulton
jim at zope.com
Tue Dec 6 22:59:17 UTC 2011
Log message for revision 123615:
Coordinate ZEO clients and servers using ZooKeeper
Changed:
U zc.zkzeo/trunk/buildout.cfg
U zc.zkzeo/trunk/setup.py
A zc.zkzeo/trunk/src/zc/zkzeo/
A zc.zkzeo/trunk/src/zc/zkzeo/README.txt
A zc.zkzeo/trunk/src/zc/zkzeo/__init__.py
A zc.zkzeo/trunk/src/zc/zkzeo/_client.py
A zc.zkzeo/trunk/src/zc/zkzeo/component.xml
A zc.zkzeo/trunk/src/zc/zkzeo/runzeo.py
A zc.zkzeo/trunk/src/zc/zkzeo/schema.xml
A zc.zkzeo/trunk/src/zc/zkzeo/server-component.xml
A zc.zkzeo/trunk/src/zc/zkzeo/tests.py
-=-
Modified: zc.zkzeo/trunk/buildout.cfg
===================================================================
--- zc.zkzeo/trunk/buildout.cfg 2011-12-06 22:56:59 UTC (rev 123614)
+++ zc.zkzeo/trunk/buildout.cfg 2011-12-06 22:59:17 UTC (rev 123615)
@@ -1,12 +1,12 @@
[buildout]
-develop = .
-parts = test py
+develop = . zk
+parts = test
-[test]
-recipe = zc.recipe.testrunner
-eggs =
-
[py]
recipe = zc.recipe.egg
-eggs = ${test:eggs}
+eggs = zc.zkzeo [test]
interpreter = py
+
+[test]
+recipe = zc.recipe.testrunner
+eggs = ${py:eggs}
Modified: zc.zkzeo/trunk/setup.py
===================================================================
--- zc.zkzeo/trunk/setup.py 2011-12-06 22:56:59 UTC (rev 123614)
+++ zc.zkzeo/trunk/setup.py 2011-12-06 22:59:17 UTC (rev 123615)
@@ -11,12 +11,17 @@
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
-name, version = 'zc.', '0'
+name, version = 'zc.zkzeo', '0'
-install_requires = ['setuptools']
-extras_require = dict(test=['zope.testing'])
+install_requires = [
+ 'setuptools', 'zc.zk [static]', 'ZODB3', 'zc.thread']
+extras_require = dict(
+ test=['zope.testing', 'zc.zk [static,test]', 'manuel'],
+ )
entry_points = """
+[console_scripts]
+zkrunzeo = zc.zkzeo.runzeo:main
"""
from setuptools import setup
Added: zc.zkzeo/trunk/src/zc/zkzeo/README.txt
===================================================================
--- zc.zkzeo/trunk/src/zc/zkzeo/README.txt (rev 0)
+++ zc.zkzeo/trunk/src/zc/zkzeo/README.txt 2011-12-06 22:59:17 UTC (rev 123615)
@@ -0,0 +1,179 @@
+=============
+ZEO ZooKeeper
+=============
+
+Managing addresses, and especially ports is a drag. ZooKeeper can be
+used as a service registry. Servers can register themselves and
+clients can find services there. The ``zc.zkzeo`` package provides
+support for registering ZEO servers and a ZEO client storage that gets
+addresses from ZooKeeper.
+
+.. contents::
+
+Running ZEO servers
+===================
+
+To run a ZEO server, and register it with ZooKeeper, first create a
+ZEO configuration file::
+
+ <zeo>
+ address 127.0.0.1
+ </zeo>
+
+ <zookeeper>
+ connection zookeeper.example.com:2181
+ path /databases/demo
+ </zookeeper>
+
+ <filestorage>
+ path demo.fs
+ </filestorage>
+
+.. -> server_conf
+
+The ZEO configuration file has the same options as usual, plus a
+``zookeeper`` section with two options:
+
+``connection``
+ A ZooKeeper connection string. This is typically a list of
+ *HOST:PORT* pairs separated by commas.
+
+``path``
+ The path at which to register the server. The path must already
+ exist. When the server starts, it will register itself by creating
+ a subnode of the path with a name consisting of its address.
+
+When specifying the ZEO address, you can leave off the port and the
+operating system will assign it for you.
+
+To start the server, use the ``zkrunzeo`` script::
+
+ $ bin/zkrunzeo -C FILENAME
+
+.. test
+
+ >>> import zc.zkzeo.runzeo, zc.zk
+ >>> stop = zc.zkzeo.runzeo.test(
+ ... server_conf, zookeeper='zookeeper.example.com:2181')
+ >>> zk = zc.zk.ZooKeeper('zookeeper.example.com:2181')
+ >>> print zk.export_tree('/databases/demo', ephemeral=True),
+ /demo
+ /127.0.0.1:56824
+ pid = 88841
+
+where ``FILENAME`` is the name of the configuration file you created.
+
+
+Defining ZEO clients
+====================
+
+You can define a client in 2 ways, from Python and using a
+configuration file.
+
+Defining ZEO clients with Python
+--------------------------------
+
+From Python, use ``zc.zkzeo.client``::
+
+ >>> import zc.zkzeo
+ >>> client = zc.zkzeo.client(
+ ... 'zookeeper.example.com:2181', '/databases/demo',
+ ... read_only=True)
+
+You pass a ZooKeeper connection string and a path. The ``client``
+function will create a client storage with addresses found as
+sub-nodes of the given path and it will adjust the client-storage
+addresses as nodes are added and removed as children of the path.
+
+You can pass all other ``ZEO.ClientStorage.ClientStorage`` arguments,
+except the address, as additional positional and keyword arguments.
+
+
+Defining ZEO clients in configuration files
+-------------------------------------------
+
+In configuration files, use a ``zkzeoclient`` storage
+section::
+
+ %import zc.zkzeo
+
+ <zodb>
+ <zkzeoclient>
+ zookeeper zookeeper.example.com:2181
+ server /databases/demo
+ read-only true
+ </zkzeoclient>
+ </zodb>
+
+.. -> conf
+
+The options for ``zkzeoclient`` are the same as for the standard ZODB
+``zeoclient`` section, except:
+
+- There's an extra required ``zookeeper`` option used to provide a
+ ZooKeeper connection string.
+
+- There can be only one ``server`` option and it is used to supply the
+ path in ZooKeeper where addresses may be found.
+
+.. test
+
+ Double check the clients are working by opening a writable
+ connection and making sure we see changes:
+
+ >>> writable_db = zc.zkzeo.DB('zookeeper.example.com:2181',
+ ... '/databases/demo')
+ >>> with writable_db.transaction() as conn:
+ ... conn.root.x = 1
+
+ >>> import ZODB.config
+ >>> db_from_config = ZODB.config.databaseFromString(conf)
+ >>> with db_from_config.transaction() as conn:
+ ... print conn.root()
+ {'x': 1}
+
+ >>> import ZODB
+ >>> db_from_py = ZODB.DB(client)
+ >>> with db_from_py.transaction() as conn:
+ ... print conn.root()
+ {'x': 1}
+
+ Restart the storage server and make sure clients reconnect:
+
+ >>> [old_addr] = zk.get_children('/databases/demo')
+ >>> stop().exception
+
+ >>> wait_until(lambda : not client.is_connected())
+ >>> wait_until(lambda : not writable_db.storage.is_connected())
+ >>> wait_until(lambda : not db_from_config.storage.is_connected())
+
+ >>> stop = zc.zkzeo.runzeo.test(
+ ... server_conf, zookeeper='zookeeper.example.com:2181')
+
+ >>> [addr] = zk.get_children('/databases/demo')
+ >>> addr != old_addr
+ True
+ >>> print zk.export_tree('/databases/demo', ephemeral=True),
+ /demo
+ /127.0.0.1:56837
+ pid = 88841
+
+
+ >>> wait_until(writable_db.storage.is_connected)
+ >>> with writable_db.transaction() as conn:
+ ... conn.root.x = 2
+
+ >>> wait_until(db_from_config.storage.is_connected)
+ >>> with db_from_config.transaction() as conn:
+ ... print conn.root()
+ {'x': 2}
+ >>> wait_until(client.is_connected)
+ >>> with db_from_py.transaction() as conn:
+ ... print conn.root()
+ {'x': 2}
+
+ >>> db_from_py.close()
+ >>> db_from_config.close()
+ >>> writable_db.close()
+ >>> stop().exception
+
Property changes on: zc.zkzeo/trunk/src/zc/zkzeo/README.txt
___________________________________________________________________
Added: svn:eol-style
+ native
Added: zc.zkzeo/trunk/src/zc/zkzeo/__init__.py
===================================================================
--- zc.zkzeo/trunk/src/zc/zkzeo/__init__.py (rev 0)
+++ zc.zkzeo/trunk/src/zc/zkzeo/__init__.py 2011-12-06 22:59:17 UTC (rev 123615)
@@ -0,0 +1,25 @@
+##############################################################################
+#
+# Copyright (c) Zope Foundation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+
+def client(zookeeper_connection_string, path, *args, **kw):
+ import zc.zkzeo._client
+ return zc.zkzeo._client.client(zookeeper_connection_string, path,
+ *args, **kw)
+
+def DB(zookeeper_connection_string, path, *args, **kw):
+ import ZODB
+ return ZODB.DB(client(zookeeper_connection_string, path, *args, **kw))
+
+def connection(zookeeper_connection_string, path, *args, **kw):
+ return DB(zookeeper_connection_string, path, *args, **kw).open_once()
Property changes on: zc.zkzeo/trunk/src/zc/zkzeo/__init__.py
___________________________________________________________________
Added: svn:keywords
+ Id
Added: svn:eol-style
+ native
Added: zc.zkzeo/trunk/src/zc/zkzeo/_client.py
===================================================================
--- zc.zkzeo/trunk/src/zc/zkzeo/_client.py (rev 0)
+++ zc.zkzeo/trunk/src/zc/zkzeo/_client.py 2011-12-06 22:59:17 UTC (rev 123615)
@@ -0,0 +1,90 @@
+##############################################################################
+#
+# Copyright (c) Zope Foundation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+import time
+import zc.zk
+import ZEO.ClientStorage
+import threading
+
+def client(zk, path, *args, **kw):
+ zk = zc.zk.ZooKeeper(zk)
+ addresses = zk.children(path)
+ client = ZEO.ClientStorage.ClientStorage(
+ _wait_addresses(addresses, parse_addr),
+ *args, **kw)
+ return _client(addresses, client)
+
+def DB(*args, **kw):
+ import ZODB
+ return ZODB.DB(client(*args, **kw))
+
+def connection(*args, **kw):
+ return DB(*args, **kw).open_once()
+
+def parse_addr(addr):
+ host, port = addr.split(':')
+ return host, int(port)
+
+def _client(addresses, client):
+
+ new_addr = getattr(client, 'new_addr', None)
+ if new_addr is None:
+ # Pre 3.11 client. We need to make our own new_addr.
+ # This is ugly. Don't look. :(
+ def new_addr(addr):
+ manager = client._rpc_mgr
+ manager.addrlist = manager._parse_addrs(addr)
+ with manager.cond:
+ if manager.thread is not None:
+ manager.thread.addrlist = manager.addrlist
+
+ @addresses
+ def changed(addresses):
+ addrs = map(parse_addr, addresses)
+ if addrs:
+ new_addr(addrs)
+
+ client.zookeeper_addresses = addresses
+
+ return client
+
+def _wait_addresses(addresses, transform):
+ while 1:
+ result = [transform(addr) for addr in addresses]
+ if result:
+ return result
+ time.sleep(1)
+
+class ZConfig:
+
+ def __init__(self, config):
+ self.config = config
+ self.name = config.getSectionName()
+
+ def open(self):
+ import ZConfig.datatypes
+ import ZODB.config
+
+ zk = zc.zk.ZooKeeper(self.config.zookeeper)
+ paths = [server.address for server in self.config.server]
+ if len(paths) > 1:
+ raise TypeError("Only one server option is allowed")
+ path = paths[0]
+ if not isinstance(path, basestring) or not path[0] == '/':
+ raise TypeError("server must be a ZooKeeper path, %r" % path)
+ addresses = zk.children(path)
+ self.config.server = _wait_addresses(
+ addresses, ZConfig.datatypes.SocketAddress)
+
+ client = ZODB.config.ZEOClient(self.config).open()
+ return _client(addresses, client)
Property changes on: zc.zkzeo/trunk/src/zc/zkzeo/_client.py
___________________________________________________________________
Added: svn:keywords
+ Id
Added: svn:eol-style
+ native
Added: zc.zkzeo/trunk/src/zc/zkzeo/component.xml
===================================================================
--- zc.zkzeo/trunk/src/zc/zkzeo/component.xml (rev 0)
+++ zc.zkzeo/trunk/src/zc/zkzeo/component.xml 2011-12-06 22:59:17 UTC (rev 123615)
@@ -0,0 +1,9 @@
+<component>
+ <sectiontype
+ name="zkzeoclient"
+ datatype="zc.zkzeo._client.ZConfig"
+ implements="ZODB.storage"
+ extends="zeoclient">
+ <key name="zookeeper" datatype="string" required="yes" />
+ </sectiontype>
+</component>
Property changes on: zc.zkzeo/trunk/src/zc/zkzeo/component.xml
___________________________________________________________________
Added: svn:eol-style
+ native
Added: zc.zkzeo/trunk/src/zc/zkzeo/runzeo.py
===================================================================
--- zc.zkzeo/trunk/src/zc/zkzeo/runzeo.py (rev 0)
+++ zc.zkzeo/trunk/src/zc/zkzeo/runzeo.py 2011-12-06 22:59:17 UTC (rev 123615)
@@ -0,0 +1,146 @@
+import os
+import sys
+import threading
+import time
+import zc.thread
+import zc.zk
+import ZEO.runzeo
+
+class Options(ZEO.runzeo.ZEOOptions):
+
+ __doc__ = ZEO.runzeo.__doc__ + """
+
+ This command supports registering a server with ZooKeeper.
+ """
+ schemadir = os.path.dirname(__file__)
+
+ def __init__(self):
+ ZEO.runzeo.ZEOOptions.__init__(self)
+
+ self.add('zkconnection', 'zookeeper.connection')
+ self.add('zkpath', 'zookeeper.path')
+
+class ZKServer(ZEO.runzeo.ZEOServer):
+
+ __zk = __testing = __using_dynamic_port = None
+ def create_server(self):
+ ZEO.runzeo.ZEOServer.create_server(self)
+ if not self.options.zkpath:
+ return
+ addr = self.server.dispatcher.socket.getsockname()
+ if self.__using_dynamic_port:
+ self.__zk = zc.zk.ZooKeeper(self.options.zkconnection, timeout=9)
+ if self.__zk.handle is None:
+ raise SystemError("Couldn;'t connect to ZooKeeper at %r"
+ % self.options.zkconnection)
+
+ @zc.thread.Thread
+ def register_w_zk():
+ if self.__zk is None:
+ self.__zk = zc.zk.ZooKeeper(self.options.zkconnection)
+ while self.__zk.handle is None:
+ time.sleep(.1)
+ self.__zk.register_server(self.options.zkpath, addr)
+ if self.__testing is not None:
+ self.__testing()
+
+ def clear_socket(self):
+ if self.__zk is not None:
+ self.__zk.close()
+ ZEO.runzeo.ZEOServer.clear_socket(self)
+
+ def check_socket(self):
+ if self.options.address[1] == None:
+ self.options.address = self.options.address[0], 0
+ self.__using_dynamic_port = True
+ return
+ ZEO.runzeo.ZEOServer.check_socket(self)
+
+ def setup_signals(self):
+ if self.__testing is not None:
+ return
+ ZEO.runzeo.ZEOServer.setup_signals(self)
+
+ def setup_default_logging(self):
+ if self.__testing is not None:
+ return
+ ZEO.runzeo.ZEOServer.setup_default_logging(self)
+
+
+def main(args=None, testing=None):
+ if args is None:
+ args = sys.argv[1:]
+ options = Options()
+ options.realize(args)
+ s = ZKServer(options)
+ s._ZKServer__testing = testing
+ if testing is not None:
+ return s
+ s.main()
+
+def close311(self): # based server close method in 3.11
+
+ # Stop accepting connections
+ self.dispatcher.close()
+ if self.monitor is not None:
+ self.monitor.close()
+
+ # Close open client connections
+ for sid, connections in self.connections.items():
+ for conn in connections[:]:
+ try:
+ conn.connection.close()
+ except:
+ pass
+
+ for name, storage in self.storages.iteritems():
+ storage.close()
+
+def test(config, storage=None, zookeeper='127.0.0.1:2181'):
+ """Run a server in a thread, mainly for testing.
+ """
+ import tempfile
+
+ if '\n' not in config:
+ # It's just a path
+ if storage is None:
+ storage = '<mappingstorage>\n</mappingstorage>'
+ elif storage.endswith('.fs'):
+ storage = '<filestorage>\npath %s\n</filestorage>' % storage
+ config = """
+ <zeo>
+ address 127.0.0.1
+ </zeo>
+ <zookeeper>
+ connection %s
+ path %s
+ </zookeeper>
+ %s
+ """ % (zookeeper, config, storage)
+
+ fd, confpath = tempfile.mkstemp()
+ os.write(fd, config)
+ os.close(fd)
+ event = threading.Event()
+ server = main(['-C', confpath], event.set)
+ os.remove(confpath)
+
+ @zc.thread.Thread
+ def run_zeo_server_for_testing():
+ try:
+ server.main()
+ except:
+ import logging
+ logging.getLogger(__name__).exception('wtf')
+
+ def stop():
+ close = getattr(server.server, 'close', None)
+ if close is None:
+ close311(server.server)
+ else:
+ close()
+ run_zeo_server_for_testing.join(1)
+ return run_zeo_server_for_testing
+
+ event.wait(1)
+ return stop
Property changes on: zc.zkzeo/trunk/src/zc/zkzeo/runzeo.py
___________________________________________________________________
Added: svn:keywords
+ Id
Added: svn:eol-style
+ native
Added: zc.zkzeo/trunk/src/zc/zkzeo/schema.xml
===================================================================
--- zc.zkzeo/trunk/src/zc/zkzeo/schema.xml (rev 0)
+++ zc.zkzeo/trunk/src/zc/zkzeo/schema.xml 2011-12-06 22:59:17 UTC (rev 123615)
@@ -0,0 +1,11 @@
+<schema extends="package:ZEO:schema.xml">
+ <description>
+ This schema describes the configuration of the ZEO storage server
+ process that registers itself with ZooKeeper.
+ </description>
+
+ <import package="zc.zkzeo" file="server-component.xml" />
+
+ <section type="zookeeper" name="*" required="no" attribute="zookeeper" />
+
+</schema>
Property changes on: zc.zkzeo/trunk/src/zc/zkzeo/schema.xml
___________________________________________________________________
Added: svn:eol-style
+ native
Added: zc.zkzeo/trunk/src/zc/zkzeo/server-component.xml
===================================================================
--- zc.zkzeo/trunk/src/zc/zkzeo/server-component.xml (rev 0)
+++ zc.zkzeo/trunk/src/zc/zkzeo/server-component.xml 2011-12-06 22:59:17 UTC (rev 123615)
@@ -0,0 +1,25 @@
+<component>
+
+ <sectiontype name="zookeeper">
+
+ <description>
+ Specify how to register a ZEO server with ZooKeeper
+ </description>
+
+ <key name="connection" datatype="string" required="no"
+ default="127.0.0.1:2181"
+ >
+ <description>
+ ZooKeeper connection string.
+ </description>
+ </key>
+
+ <key name="path" datatype="string" required="yes">
+ <description>
+ The path at which to register the server.
+ </description>
+ </key>
+
+ </sectiontype>
+
+</component>
Property changes on: zc.zkzeo/trunk/src/zc/zkzeo/server-component.xml
___________________________________________________________________
Added: svn:eol-style
+ native
Added: zc.zkzeo/trunk/src/zc/zkzeo/tests.py
===================================================================
--- zc.zkzeo/trunk/src/zc/zkzeo/tests.py (rev 0)
+++ zc.zkzeo/trunk/src/zc/zkzeo/tests.py 2011-12-06 22:59:17 UTC (rev 123615)
@@ -0,0 +1,60 @@
+##############################################################################
+#
+# Copyright (c) Zope Foundation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+import doctest
+import unittest
+import manuel.capture
+import manuel.doctest
+import manuel.testing
+import mock
+import re
+import ZEO.zrpc.connection
+import zc.zk.testing
+import zope.testing.setupstack
+import zope.testing.renormalizing
+
+def setUp(test):
+ zc.zk.testing.setUp(test, tree='/databases\n /demo\n')
+ test.globs['_server_loop'] = ZEO.zrpc.connection.server_loop
+
+ # The original server loop spews thread exceptions during shutdown.
+ # This version doesn't.
+ def server_loop(map):
+ try:
+ test.globs['_server_loop'](map)
+ except Exception:
+ if len(map) > 1:
+ raise
+
+ ZEO.zrpc.connection.server_loop = server_loop
+
+def tearDown(test):
+ zc.zk.testing.tearDown(test)
+ ZEO.zrpc.connection.server_loop = test.globs['_server_loop']
+
+def test_suite():
+ return unittest.TestSuite((
+ # doctest.DocFileSuite('README.test'),
+ # doctest.DocTestSuite(),
+ manuel.testing.TestSuite(
+ manuel.doctest.Manuel(
+ checker = zope.testing.renormalizing.RENormalizing([
+ (re.compile(r'pid = \d+'), 'pid = PID'),
+ (re.compile(r'/127.0.0.1:\d+'), '/127.0.0.1:PORT'),
+ ])
+ ) + manuel.capture.Manuel(),
+ 'README.txt',
+ setUp=setUp, tearDown=zc.zk.testing.tearDown,
+ ),
+ ))
+
Property changes on: zc.zkzeo/trunk/src/zc/zkzeo/tests.py
___________________________________________________________________
Added: svn:keywords
+ Id
Added: svn:eol-style
+ native
More information about the checkins
mailing list