[Checkins] SVN: zc.zkzeo/trunk/ Coordinate ZEO clients and servers using ZooKeeper

Jim Fulton jim at zope.com
Tue Dec 6 22:59:17 UTC 2011


Log message for revision 123615:
  Coordinate ZEO clients and servers using ZooKeeper

Changed:
  U   zc.zkzeo/trunk/buildout.cfg
  U   zc.zkzeo/trunk/setup.py
  A   zc.zkzeo/trunk/src/zc/zkzeo/
  A   zc.zkzeo/trunk/src/zc/zkzeo/README.txt
  A   zc.zkzeo/trunk/src/zc/zkzeo/__init__.py
  A   zc.zkzeo/trunk/src/zc/zkzeo/_client.py
  A   zc.zkzeo/trunk/src/zc/zkzeo/component.xml
  A   zc.zkzeo/trunk/src/zc/zkzeo/runzeo.py
  A   zc.zkzeo/trunk/src/zc/zkzeo/schema.xml
  A   zc.zkzeo/trunk/src/zc/zkzeo/server-component.xml
  A   zc.zkzeo/trunk/src/zc/zkzeo/tests.py

-=-
Modified: zc.zkzeo/trunk/buildout.cfg
===================================================================
--- zc.zkzeo/trunk/buildout.cfg	2011-12-06 22:56:59 UTC (rev 123614)
+++ zc.zkzeo/trunk/buildout.cfg	2011-12-06 22:59:17 UTC (rev 123615)
@@ -1,12 +1,12 @@
 [buildout]
-develop = .
-parts = test py
+develop = . zk
+parts = test
 
-[test]
-recipe = zc.recipe.testrunner
-eggs = 
-
 [py]
 recipe = zc.recipe.egg
-eggs = ${test:eggs}
+eggs = zc.zkzeo [test]
 interpreter = py
+
+[test]
+recipe = zc.recipe.testrunner
+eggs = ${py:eggs}

Modified: zc.zkzeo/trunk/setup.py
===================================================================
--- zc.zkzeo/trunk/setup.py	2011-12-06 22:56:59 UTC (rev 123614)
+++ zc.zkzeo/trunk/setup.py	2011-12-06 22:59:17 UTC (rev 123615)
@@ -11,12 +11,17 @@
 # FOR A PARTICULAR PURPOSE.
 #
 ##############################################################################
-name, version = 'zc.', '0'
+name, version = 'zc.zkzeo', '0'
 
-install_requires = ['setuptools']
-extras_require = dict(test=['zope.testing'])
+install_requires = [
+    'setuptools', 'zc.zk [static]', 'ZODB3', 'zc.thread']
+extras_require = dict(
+    test=['zope.testing', 'zc.zk [static,test]', 'manuel'],
+    )
 
 entry_points = """
+[console_scripts]
+zkrunzeo = zc.zkzeo.runzeo:main
 """
 
 from setuptools import setup

Added: zc.zkzeo/trunk/src/zc/zkzeo/README.txt
===================================================================
--- zc.zkzeo/trunk/src/zc/zkzeo/README.txt	                        (rev 0)
+++ zc.zkzeo/trunk/src/zc/zkzeo/README.txt	2011-12-06 22:59:17 UTC (rev 123615)
@@ -0,0 +1,179 @@
+=============
+ZEO ZooKeeper
+=============
+
+Managing addresses, and especially ports is a drag.  ZooKeeper can be
+used as a service registry.  Servers can register themselves and
+clients can find services there.  The ``zc.zk`` package provides
+support for registering ZEO servers and a ZEO client storage that gets
+addresses from ZooKeeper.
+
+.. contents::
+
+Running ZEO servers
+===================
+
+To run a ZEO server, and register it with ZooKeeper, first create a
+ZEO configuration file::
+
+   <zeo>
+      address 127.0.0.1
+   </zeo>
+
+   <zookeeper>
+      connection zookeeper.example.com:2181
+      path /databases/demo
+   </zookeeper>
+
+   <filestorage>
+      path demo.fs
+   </filestorage>
+
+.. -> server_conf
+
+The ZEO configuration file has the same options as usual, plus a
+``zookeeper`` section with two options:
+
+``connection``
+   A ZooKeeper connection string.  This is typically a list of
+   *HOST:PORT* pairs separated by commas.
+
+``path``
+   The path at which to register the server.  The path must already
+   exist.  When the server starts, it will register itself by creating
+   a subnode of the path with a name consisting of it's address.
+
+When specifying the ZEO address, you can leave of the port and the
+operating system will assign it for you.
+
+To start the server, use the ``zkrunzeo`` script::
+
+  $ bin/zkrunzeo -C FILENAME
+
+.. test
+
+    >>> import zc.zkzeo.runzeo, zc.zk
+    >>> stop = zc.zkzeo.runzeo.test(
+    ...     server_conf, zookeeper='zookeeper.example.com:2181')
+    >>> zk = zc.zk.ZooKeeper('zookeeper.example.com:2181')
+    >>> print zk.export_tree('/databases/demo', ephemeral=True),
+    /demo
+      /127.0.0.1:56824
+        pid = 88841
+
+where ``FILENAME`` is the name of the configuration file you created.
+
+
+Defining ZEO clients
+====================
+
+You can define a client in 2 ways, from Python and using a
+configuration file.
+
+Defining ZEO clients with Python
+--------------------------------
+
+From Python, use ``zc.zkzeo.client``::
+
+    >>> import zc.zkzeo
+    >>> client = zc.zkzeo.client(
+    ...     'zookeeper.example.com:2181', '/databases/demo',
+    ...     read_only=True)
+
+You pass a ZooKeeper connection string and a path.  The ``Client``
+constructor will create a client storage with addresses found as
+sub-nodes of the given path and it will adjust the client-storage
+addresses as nodes are added and removed as children of the path.
+
+You can pass all other ``ZEO.ClientStorage.ClientStorage`` arguments,
+except the address, as additional positional and keyword arguments.
+
+
+Defining ZEO clients in configuration files
+-------------------------------------------
+
+In configuration files, use a ``zkzeoclient`` storage
+section::
+
+    %import zc.zkzeo
+
+    <zodb>
+       <zkzeoclient>
+          zookeeper zookeeper.example.com:2181
+          server /databases/demo
+          read-only true
+       </zkzeoclient>
+    </zodb>
+
+.. -> conf
+
+The options for ``zkzeoclient`` are the same as for the standard ZODB
+``zeoclient`` section, except:
+
+- There's an extra required ``zookeeper`` option used to provide a
+  ZooKeeper connection string.
+
+- There can be only one ``server`` option and it is used to supply the
+  path in ZooKeeper where addresses may be found.
+
+.. test
+
+  Double check the clients are working by opening a writable
+  connection and maing sure we see changes:
+
+    >>> writable_db = zc.zkzeo.DB('zookeeper.example.com:2181',
+    ...                           '/databases/demo')
+    >>> with writable_db.transaction() as conn:
+    ...     conn.root.x = 1
+
+    >>> import ZODB.config
+    >>> db_from_config = ZODB.config.databaseFromString(conf)
+    >>> with db_from_config.transaction() as conn:
+    ...     print conn.root()
+    {'x': 1}
+
+    >>> import ZODB
+    >>> db_from_py = ZODB.DB(client)
+    >>> with db_from_py.transaction() as conn:
+    ...     print conn.root()
+    {'x': 1}
+
+  Restart the storage server and make sure clients reconnect:
+
+    >>> [old_addr] = zk.get_children('/databases/demo')
+    >>> stop().exception
+
+    >>> wait_until(lambda : not client.is_connected())
+    >>> wait_until(lambda : not writable_db.storage.is_connected())
+    >>> wait_until(lambda : not db_from_config.storage.is_connected())
+
+    >>> stop = zc.zkzeo.runzeo.test(
+    ...     server_conf, zookeeper='zookeeper.example.com:2181')
+
+    >>> [addr] = zk.get_children('/databases/demo')
+    >>> addr != old_addr
+    True
+    >>> print zk.export_tree('/databases/demo', ephemeral=True),
+    /demo
+      /127.0.0.1:56837
+        pid = 88841
+
+
+    >>> wait_until(writable_db.storage.is_connected)
+    >>> with writable_db.transaction() as conn:
+    ...     conn.root.x = 2
+
+    >>> wait_until(db_from_config.storage.is_connected)
+    >>> with db_from_config.transaction() as conn:
+    ...     print conn.root()
+    {'x': 2}
+    >>> wait_until(client.is_connected)
+    >>> with db_from_py.transaction() as conn:
+    ...     print conn.root()
+    {'x': 2}
+
+    >>> db_from_py.close()
+    >>> db_from_config.close()
+    >>> writable_db.close()
+    >>> stop().exception
+


Property changes on: zc.zkzeo/trunk/src/zc/zkzeo/README.txt
___________________________________________________________________
Added: svn:eol-style
   + native

Added: zc.zkzeo/trunk/src/zc/zkzeo/__init__.py
===================================================================
--- zc.zkzeo/trunk/src/zc/zkzeo/__init__.py	                        (rev 0)
+++ zc.zkzeo/trunk/src/zc/zkzeo/__init__.py	2011-12-06 22:59:17 UTC (rev 123615)
@@ -0,0 +1,25 @@
+##############################################################################
+#
+# Copyright (c) Zope Foundation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+
+def client(zookeeper_connection_string, path, *args, **kw):
+    import zc.zkzeo._client
+    return zc.zkzeo._client.client(zookeeper_connection_string, path,
+                                   *args, **kw)
+
+def DB(zookeeper_connection_string, path, *args, **kw):
+    import ZODB
+    return ZODB.DB(client(zookeeper_connection_string, path, *args, **kw))
+
+def connection(zookeeper_connection_string, path, *args, **kw):
+    return DB(zookeeper_connection_string, path, *args, **kw).open_once()


Property changes on: zc.zkzeo/trunk/src/zc/zkzeo/__init__.py
___________________________________________________________________
Added: svn:keywords
   + Id
Added: svn:eol-style
   + native

Added: zc.zkzeo/trunk/src/zc/zkzeo/_client.py
===================================================================
--- zc.zkzeo/trunk/src/zc/zkzeo/_client.py	                        (rev 0)
+++ zc.zkzeo/trunk/src/zc/zkzeo/_client.py	2011-12-06 22:59:17 UTC (rev 123615)
@@ -0,0 +1,90 @@
+##############################################################################
+#
+# Copyright (c) Zope Foundation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+import time
+import zc.zk
+import ZEO.ClientStorage
+import threading
+
+def client(zk, path, *args, **kw):
+    zk = zc.zk.ZooKeeper(zk)
+    addresses = zk.children(path)
+    client = ZEO.ClientStorage.ClientStorage(
+        _wait_addresses(addresses, parse_addr),
+        *args, **kw)
+    return _client(addresses, client)
+
+def DB(*args, **kw):
+    import ZODB
+    return ZODB.DB(client(*args, **kw))
+
+def connection(*args, **kw):
+    return DB(*args, **kw).open_once()
+
+def parse_addr(addr):
+    host, port = addr.split(':')
+    return host, int(port)
+
+def _client(addresses, client):
+
+    new_addr = getattr(client, 'new_addr', None)
+    if new_addr is None:
+        # Pre 3.11 client.  We need to make our own new_addr.
+        # This is ugly. Don't look. :(
+        def new_addr(addr):
+            manager = client._rpc_mgr
+            manager.addrlist = manager._parse_addrs(addr)
+            with manager.cond:
+                if manager.thread is not None:
+                    manager.thread.addrlist = manager.addrlist
+
+    @addresses
+    def changed(addresses):
+        addrs = map(parse_addr, addresses)
+        if addrs:
+            new_addr(addrs)
+
+    client.zookeeper_addresses = addresses
+
+    return client
+
+def _wait_addresses(addresses, transform):
+    while 1:
+        result = [transform(addr) for addr in addresses]
+        if result:
+            return result
+        time.sleep(1)
+
+class ZConfig:
+
+    def __init__(self, config):
+        self.config = config
+        self.name = config.getSectionName()
+
+    def open(self):
+        import ZConfig.datatypes
+        import ZODB.config
+
+        zk = zc.zk.ZooKeeper(self.config.zookeeper)
+        paths = [server.address for server in self.config.server]
+        if len(paths) > 1:
+            raise TypeError("Only one server option is allowed")
+        path = paths[0]
+        if not isinstance(path, basestring) or not path[0] == '/':
+            raise TypeError("server must be a ZooKeeper path, %r" % path)
+        addresses = zk.children(path)
+        self.config.server = _wait_addresses(
+            addresses, ZConfig.datatypes.SocketAddress)
+
+        client = ZODB.config.ZEOClient(self.config).open()
+        return _client(addresses, client)


Property changes on: zc.zkzeo/trunk/src/zc/zkzeo/_client.py
___________________________________________________________________
Added: svn:keywords
   + Id
Added: svn:eol-style
   + native

Added: zc.zkzeo/trunk/src/zc/zkzeo/component.xml
===================================================================
--- zc.zkzeo/trunk/src/zc/zkzeo/component.xml	                        (rev 0)
+++ zc.zkzeo/trunk/src/zc/zkzeo/component.xml	2011-12-06 22:59:17 UTC (rev 123615)
@@ -0,0 +1,9 @@
+<component>
+  <sectiontype
+      name="zkzeoclient"
+      datatype="zc.zkzeo._client.ZConfig"
+      implements="ZODB.storage"
+      extends="zeoclient">
+    <key name="zookeeper" datatype="string" required="yes" />
+  </sectiontype>
+</component>


Property changes on: zc.zkzeo/trunk/src/zc/zkzeo/component.xml
___________________________________________________________________
Added: svn:eol-style
   + native

Added: zc.zkzeo/trunk/src/zc/zkzeo/runzeo.py
===================================================================
--- zc.zkzeo/trunk/src/zc/zkzeo/runzeo.py	                        (rev 0)
+++ zc.zkzeo/trunk/src/zc/zkzeo/runzeo.py	2011-12-06 22:59:17 UTC (rev 123615)
@@ -0,0 +1,146 @@
+import os
+import sys
+import threading
+import time
+import zc.thread
+import zc.zk
+import ZEO.runzeo
+
+class Options(ZEO.runzeo.ZEOOptions):
+
+    __doc__ = ZEO.runzeo.__doc__ + """
+
+    This command supports registering a server with ZooKeeper.
+    """
+    schemadir = os.path.dirname(__file__)
+
+    def __init__(self):
+        ZEO.runzeo.ZEOOptions.__init__(self)
+
+        self.add('zkconnection', 'zookeeper.connection')
+        self.add('zkpath', 'zookeeper.path')
+
+class ZKServer(ZEO.runzeo.ZEOServer):
+
+    __zk = __testing = __using_dynamic_port = None
+    def create_server(self):
+        ZEO.runzeo.ZEOServer.create_server(self)
+        if not self.options.zkpath:
+            return
+        addr = self.server.dispatcher.socket.getsockname()
+        if self.__using_dynamic_port:
+            self.__zk = zc.zk.ZooKeeper(self.options.zkconnection, timeout=9)
+            if self.__zk.handle is None:
+                raise SystemError("Couldn;'t connect to ZooKeeper at %r"
+                                  % self.options.zkconnection)
+
+        @zc.thread.Thread
+        def register_w_zk():
+            if self.__zk is None:
+                self.__zk = zc.zk.ZooKeeper(self.options.zkconnection)
+            while self.__zk.handle is None:
+                time.sleep(.1)
+            self.__zk.register_server(self.options.zkpath, addr)
+            if self.__testing is not None:
+                self.__testing()
+
+    def clear_socket(self):
+        if self.__zk is not None:
+            self.__zk.close()
+        ZEO.runzeo.ZEOServer.clear_socket(self)
+
+    def check_socket(self):
+        if self.options.address[1] == None:
+            self.options.address = self.options.address[0], 0
+            self.__using_dynamic_port = True
+            return
+        ZEO.runzeo.ZEOServer.check_socket(self)
+
+    def setup_signals(self):
+        if self.__testing is not None:
+            return
+        ZEO.runzeo.ZEOServer.setup_signals(self)
+
+    def setup_default_logging(self):
+        if self.__testing is not None:
+            return
+        ZEO.runzeo.ZEOServer.setup_default_logging(self)
+
+
+def main(args=None, testing=None):
+    if args is None:
+        args = sys.argv[1:]
+    options = Options()
+    options.realize(args)
+    s = ZKServer(options)
+    s._ZKServer__testing = testing
+    if testing is not None:
+        return s
+    s.main()
+
+def close311(self): # based server close method in 3.11
+
+    # Stop accepting connections
+    self.dispatcher.close()
+    if self.monitor is not None:
+        self.monitor.close()
+
+    # Close open client connections
+    for sid, connections in self.connections.items():
+        for conn in connections[:]:
+            try:
+                conn.connection.close()
+            except:
+                pass
+
+    for name, storage in self.storages.iteritems():
+        storage.close()
+
+def test(config, storage=None, zookeeper='127.0.0.1:2181'):
+    """Run a server in a thread, mainly for testing.
+    """
+    import tempfile
+
+    if '\n' not in config:
+        # It's just a path
+        if storage is None:
+            storage = '<mappingstorage>\n</mappingstorage>'
+        elif storage.endswith('.fs'):
+            storage = '<filestorage>\npath %s\n</filestorage>' % storage
+        config = """
+        <zeo>
+          address 127.0.0.1
+        </zeo>
+        <zookeeper>
+          connection %s
+          path %s
+        </zookeeper>
+        %s
+        """ % (zookeeper, config, storage)
+
+    fd, confpath = tempfile.mkstemp()
+    os.write(fd, config)
+    os.close(fd)
+    event = threading.Event()
+    server = main(['-C', confpath], event.set)
+    os.remove(confpath)
+
+    @zc.thread.Thread
+    def run_zeo_server_for_testing():
+        try:
+            server.main()
+        except:
+            import logging
+            logging.getLogger(__name__).exception('wtf')
+
+    def stop():
+        close = getattr(server.server, 'close', None)
+        if close is None:
+            close311(server.server)
+        else:
+            close()
+        run_zeo_server_for_testing.join(1)
+        return run_zeo_server_for_testing
+
+    event.wait(1)
+    return stop


Property changes on: zc.zkzeo/trunk/src/zc/zkzeo/runzeo.py
___________________________________________________________________
Added: svn:keywords
   + Id
Added: svn:eol-style
   + native

Added: zc.zkzeo/trunk/src/zc/zkzeo/schema.xml
===================================================================
--- zc.zkzeo/trunk/src/zc/zkzeo/schema.xml	                        (rev 0)
+++ zc.zkzeo/trunk/src/zc/zkzeo/schema.xml	2011-12-06 22:59:17 UTC (rev 123615)
@@ -0,0 +1,11 @@
+<schema extends="package:ZEO:schema.xml">
+  <description>
+    This schema describes the configuration of the ZEO storage server
+    process that registers itself with ZooKeeper.
+  </description>
+
+  <import package="zc.zkzeo" file="server-component.xml" />
+
+  <section type="zookeeper" name="*" required="no" attribute="zookeeper" />
+
+</schema>


Property changes on: zc.zkzeo/trunk/src/zc/zkzeo/schema.xml
___________________________________________________________________
Added: svn:eol-style
   + native

Added: zc.zkzeo/trunk/src/zc/zkzeo/server-component.xml
===================================================================
--- zc.zkzeo/trunk/src/zc/zkzeo/server-component.xml	                        (rev 0)
+++ zc.zkzeo/trunk/src/zc/zkzeo/server-component.xml	2011-12-06 22:59:17 UTC (rev 123615)
@@ -0,0 +1,25 @@
+<component>
+
+  <sectiontype name="zookeeper">
+
+    <description>
+      Specify how to register a ZEO server with ZooKeeper
+    </description>
+
+    <key name="connection" datatype="string" required="no"
+         default="127.0.0.1:2181"
+         >
+      <description>
+        ZooKeeper connection string.
+      </description>
+    </key>
+
+    <key name="path" datatype="string" required="yes">
+      <description>
+        The path to register the 
+      </description>
+    </key>
+
+  </sectiontype>
+
+</component>


Property changes on: zc.zkzeo/trunk/src/zc/zkzeo/server-component.xml
___________________________________________________________________
Added: svn:eol-style
   + native

Added: zc.zkzeo/trunk/src/zc/zkzeo/tests.py
===================================================================
--- zc.zkzeo/trunk/src/zc/zkzeo/tests.py	                        (rev 0)
+++ zc.zkzeo/trunk/src/zc/zkzeo/tests.py	2011-12-06 22:59:17 UTC (rev 123615)
@@ -0,0 +1,60 @@
+##############################################################################
+#
+# Copyright (c) Zope Foundation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+import doctest
+import unittest
+import manuel.capture
+import manuel.doctest
+import manuel.testing
+import mock
+import re
+import ZEO.zrpc.connection
+import zc.zk.testing
+import zope.testing.setupstack
+import zope.testing.renormalizing
+
+def setUp(test):
+    zc.zk.testing.setUp(test, tree='/databases\n  /demo\n')
+    test.globs['_server_loop'] = ZEO.zrpc.connection.server_loop
+
+    # The original server loop spews thread exceptions during shutdowm.
+    # This version doesn't.
+    def server_loop(map):
+        try:
+            test.globs['_server_loop'](map)
+        except Exception:
+            if len(map) > 1:
+                raise
+
+    ZEO.zrpc.connection.server_loop = server_loop
+
+def tearDown(test):
+    zc.zk.testing.tearDown(test)
+    ZEO.zrpc.connection.server_loop = test.globs['_server_loop']
+
+def test_suite():
+    return unittest.TestSuite((
+        # doctest.DocFileSuite('README.test'),
+        # doctest.DocTestSuite(),
+        manuel.testing.TestSuite(
+            manuel.doctest.Manuel(
+                checker = zope.testing.renormalizing.RENormalizing([
+                    (re.compile(r'pid = \d+'), 'pid = PID'),
+                    (re.compile(r'/127.0.0.1:\d+'), '/127.0.0.1:PORT'),
+                    ])
+                ) + manuel.capture.Manuel(),
+            'README.txt',
+            setUp=setUp, tearDown=zc.zk.testing.tearDown,
+            ),
+        ))
+


Property changes on: zc.zkzeo/trunk/src/zc/zkzeo/tests.py
___________________________________________________________________
Added: svn:keywords
   + Id
Added: svn:eol-style
   + native



More information about the checkins mailing list