[Checkins] SVN: zc.zk/trunk/src/zc/zk/ Refactored to cleanup unused Children and Properties objects.

Jim Fulton jim at zope.com
Mon Dec 5 20:00:21 UTC 2011


Log message for revision 123575:
  Refactored to cleanup unused Children and Properties objects.
  
  Added ln and get_properties methods.
  
  Added docs/tests for readjustment of Children and Properties objects
  that fell out of the refactoring.
  
  Various other cleanups and changes along the way.
  
  Added timeouts to avoid waiting forever when disconnected.
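
  The timeout change follows a common pattern: wait on an event for a
  bounded time, then fail loudly instead of blocking. A minimal sketch,
  assuming a hypothetical helper ``wait_for_connection`` and an
  ``on_failure`` hook that stands in for ``zookeeper.close(handle)``
  (neither name is part of zc.zk):

```python
import threading


class FailedConnect(Exception):
    """Raised when no connection is established within the timeout."""


def wait_for_connection(connected, address, timeout=1.0, on_failure=None):
    """Wait up to `timeout` seconds for `connected` (a threading.Event).

    `on_failure` is a hypothetical cleanup hook; in the diff below the
    equivalent step is closing the ZooKeeper handle.
    """
    connected.wait(timeout)
    if not connected.is_set():
        # Clean up first, then report the address we could not reach.
        if on_failure is not None:
            on_failure()
        raise FailedConnect(address)
```

  If the event is already set, the call returns immediately; otherwise
  the caller gets a ``FailedConnect`` after roughly ``timeout`` seconds.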
  

Changed:
  U   zc.zk/trunk/src/zc/zk/README.txt
  U   zc.zk/trunk/src/zc/zk/__init__.py
  U   zc.zk/trunk/src/zc/zk/tests.py
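
One of the changes in the files listed above is support for node types
in tree text, written as ``/name : type``. A minimal sketch of how such
a line can be parsed, using the same pattern as the revised
``_text_is_node`` but written with raw strings; ``parse_node_line`` is a
hypothetical helper, not part of zc.zk:

```python
import re

# Same pattern as the revised _text_is_node in the diff below,
# written with raw strings throughout.
text_is_node = re.compile(
    r'/(?P<name>\S+)'
    r'(\s*:\s*(?P<type>\S.*))?'
    r'$').match


def parse_node_line(line):
    """Return (name, type) for a tree line; type is None if absent."""
    m = text_is_node(line.strip())
    if m is None:
        raise ValueError("not a node line: %r" % (line,))
    return m.group('name'), m.group('type')
```

With this pattern, ``/cms : z4m cms`` yields the name ``cms`` and the
type ``z4m cms``. A line such as ``/lb:ipvs``, with no whitespace around
the colon, is read as a node named ``lb:ipvs`` with no type, because the
name part matches any run of non-whitespace characters.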

-=-
Modified: zc.zk/trunk/src/zc/zk/README.txt
===================================================================
--- zc.zk/trunk/src/zc/zk/README.txt	2011-12-05 19:59:35 UTC (rev 123574)
+++ zc.zk/trunk/src/zc/zk/README.txt	2011-12-05 20:00:20 UTC (rev 123575)
@@ -4,17 +4,18 @@
 The zc.zk package provides some high-level interfaces to the low-level
 zookeeper extension.  It's not complete, in that it doesn't try, at
 this time, to be a complete high-level interface. Rather, it provides
-facilities we need to use Zookeeeper to services together:
+facilities we need to use ZooKeeper to connect services:
 
 - ZODB database clients and servers
 - HTTP-based clients and services
-- Load balencers
+- Load balancers and HTTP application servers
 
 The current (initial) use cases are:
 
 - Register a server providing a service.
 - Get the addresses of servers providing a service.
-- Get abd set service configuration data.
+- Get and set service configuration data.
+- Model system architecture as a tree.
 
 This package makes no effort to support Windows.  (Patches to support
 Windows might be accepted if they don't add much complexity.)
@@ -30,7 +31,7 @@
 different ways, it isn't listed as a distribution requirement.
 
 An easy way to get the Python zookeeper binding is by installing
-``zc-zookeeper-static``, whch is a self-contained statically building
+``zc-zookeeper-static``, which is a self-contained statically built
 distribution.
 
 Instantiating a ZooKeeper helper
@@ -42,8 +43,8 @@
     >>> zk = zc.zk.ZooKeeper('zookeeper.example.com:2181')
 
 The ZooKeeper constructor takes a ZooKeeper connection string, which is a
-comma-separated list of addresses of the form HOST:PORT.  It defaults
-to '127.0.0.1:2181', which is convenient during development.
+comma-separated list of addresses of the form *HOST:PORT*.  It defaults
+to ``'127.0.0.1:2181'``, which is convenient during development.
 
 Register a server providing a service.
 --------------------------------------
@@ -63,23 +64,24 @@
 
 
 ``register_server`` creates a read-only ephemeral ZooKeeper node as a
-child of the given service path.  The name of the new node is the
-given address. This allows clients to get the list of addresses by
-just getting the list of the names of children of the service path.
+child of the given service path.  The name of the new node is (a
+string representation of) the given address. This allows clients to
+get the list of addresses by just getting the list of the names of
+children of the service path.
 
 Ephemeral nodes have the useful property that they're automatically
 removed when a ZooKeeper session is closed or when the process
-containing it dies.  De-deregistration is automatic.
+containing it dies.  De-registration is automatic.
 
 When registering a server, you can optionally provide server (node)
 data as additional keyword arguments to register_server.  By default,
-the process id is set as the ``pid`` server key.  This is useful to
+the process id is set as the ``pid`` property.  This is useful for
 tracking down the server process.
 
 Get the addresses of service providers.
 ---------------------------------------
 
-Getting the adresses providing a service is accomplished by getting the
+Getting the addresses providing a service is accomplished by getting the
 children of a service node::
 
     >>> addresses = zk.children('/fooservice/providers')
@@ -95,7 +97,7 @@
     ['192.168.0.42:8080', '192.168.0.42:8081']
 
 You can call the iterable with a callback function that is called
-whenenever the list of children changes::
+whenever the list of children changes::
 
     >>> @zk.children('/fooservice/providers')
     ... def addresses_updated(addresses):
@@ -114,8 +116,8 @@
 Get service configuration data.
 -------------------------------
 
-You get service configuration data by getting data associated with a
-ZooKeeper node.  The interface for getting data is similar to the
+You get service configuration data by getting properties associated with a
+ZooKeeper node.  The interface for getting properties is similar to the
 interface for getting children::
 
     >>> data = zk.properties('/fooservice')
@@ -144,10 +146,10 @@
 The callback is called immediately. It'll also be called when data are
 updated.
 
-Updating node data
-------------------
+Updating node properties
+------------------------
 
-You can't set data properties, but you can update data by calling it's
+You can't set properties, but you can update properties by calling the
 ``update`` method::
 
     >>> thread_info = {'threads': 2}
@@ -158,14 +160,14 @@
     secret: u'123'
     threads: 2
 
-or by calling it's ``set`` method, which removes keys not listed::
+or by calling the ``set`` method, which removes keys not listed::
 
     >>> data.set(threads= 3, secret='1234')
     data updated
     secret: u'1234'
     threads: 3
 
-Both update and set can take data from a positional data argument, or
+Both ``update`` and ``set`` can take data from a positional data argument, or
 from keyword parameters.  Keyword parameters take precedence over the
 positional data argument.
 
@@ -191,7 +193,7 @@
 representation. You can then populate the tree by importing the
 representation.  Here's an example::
 
-  /lb
+  /lb : ipvs
     /pools
       /cms
         # The address is fixed because it's
@@ -200,16 +202,16 @@
         providers -> /cms/providers
       /retail
         address = '1.2.3.5:80'
-        providers -> /cms/retail
+        providers -> /cms/providers
 
-  /cms
+  /cms : z4m cms
     threads = 3
     /providers
     /databases
       /main
         /providers
 
-  /retail
+  /retail : z4m retail
     threads = 1
     /providers
     /databases
@@ -220,22 +222,29 @@
 .. -> tree_text
 
 This example defines a tree with 3 top nodes, ``lb``, ``cms``, and
-``retail``.  The ``retail`` node has two subnodes, ``providers`` and
-``databases`` and a property ``threads``.  The ``/retail/databases``
-node has symbolic link, ``main`` and a ``ugc`` subnode.  The symbolic
-link is implemented as a property named ``main ->``.  We'll say more
-about symbolic links in a later section.
+``retail``.  The ``retail`` node has two sub-nodes, ``providers`` and
+``databases`` and a property ``threads``.
 
+The ``/retail/databases`` node has a symbolic link, ``main``, and a
+``ugc`` sub-node.  The symbolic link is implemented as a property named
+``main ->``.  We'll say more about symbolic links in a later section.
+
+The ``lb``, ``cms`` and ``retail`` nodes have *types*.  A type is
+indicated by following a node name with a colon and a string value.
+The string value is used to populate a ``type`` property.  Types are
+useful to document the kinds of services provided at a node and can be
+used by deployment tools to deploy service providers.
+
 You can import a tree definition with the ``import_tree`` method:
 
     >>> zk.import_tree(tree_text)
 
-This imports the tree at the top pf the ZooKeeper tree.
+This imports the tree at the top of the ZooKeeper tree.
 
 We can also export a ZooKeeper tree:
 
     >>> print zk.export_tree(),
-    /cms
+    /cms : z4m cms
       threads = 3
       /databases
         /main
@@ -245,15 +254,15 @@
       secret = u'1234'
       threads = 3
       /providers
-    /lb
+    /lb : ipvs
       /pools
         /cms
           address = u'1.2.3.4:80'
           providers -> /cms/providers
         /retail
           address = u'1.2.3.5:80'
-          providers -> /cms/retail
-    /retail
+          providers -> /cms/providers
+    /retail : z4m retail
       threads = 1
       /databases
         main -> /cms/databases/main
@@ -263,10 +272,10 @@
 
 Note that when we export a tree:
 
-- The special reserverd top-level zookeeper node is omitted.
-- Ephemeral nodes are ommitted.
+- The special reserved top-level zookeeper node is omitted.
+- Ephemeral nodes are omitted.
 - Each node's information is sorted by type (properties, then links,
-- then subnodes) and then by name,
+  then sub-nodes) and then by name.
 
 You can export just a portion of a tree:
 
@@ -293,7 +302,7 @@
 We can import a tree over an existing tree and changes will be
 applied.  Let's update our textual description::
 
-  /lb
+  /lb : ipvs
     /pools
       /cms
         # The address is fixed because it's
@@ -301,7 +310,7 @@
         address = '1.2.3.4:80'
         providers -> /cms/providers
 
-  /cms
+  /cms : z4m cms
     threads = 4
     /providers
     /databases
@@ -310,7 +319,7 @@
 
 .. -> tree_text
 
-and reimport::
+and re-import::
 
     >>> zk.import_tree(tree_text)
     extra path not trimmed: /lb/pools/retail
@@ -319,7 +328,7 @@
 this if we export the tree:
 
     >>> print zk.export_tree(),
-    /cms
+    /cms : z4m cms
       threads = 4
       /databases
         /main
@@ -329,15 +338,15 @@
       secret = u'1234'
       threads = 3
       /providers
-    /lb
+    /lb : ipvs
       /pools
         /cms
           address = u'1.2.3.4:80'
           providers -> /cms/providers
         /retail
           address = u'1.2.3.5:80'
-          providers -> /cms/retail
-    /retail
+          providers -> /cms/providers
+    /retail : z4m retail
       threads = 1
       /databases
         main -> /cms/databases/main
@@ -356,7 +365,7 @@
 
     >>> zk.import_tree(tree_text, trim=True)
     >>> print zk.export_tree(),
-    /cms
+    /cms : z4m cms
       threads = 4
       /databases
         /main
@@ -366,12 +375,12 @@
       secret = u'1234'
       threads = 3
       /providers
-    /lb
+    /lb : ipvs
       /pools
         /cms
           address = u'1.2.3.4:80'
           providers -> /cms/providers
-    /retail
+    /retail : z4m retail
       threads = 1
       /databases
         main -> /cms/databases/main
@@ -384,7 +393,7 @@
 automatically trimmed.  So we weren't warned about the unreferenced
 top-level nodes in the import.
 
-Recursice Deletion
+Recursive Deletion
 ------------------
 
 ZooKeeper only allows deletion of nodes without children.
@@ -396,7 +405,7 @@
 
     >>> zk.delete_recursive('/retail')
     >>> print zk.export_tree(),
-    /cms
+    /cms : z4m cms
       threads = 4
       /databases
         /main
@@ -406,7 +415,7 @@
       secret = u'1234'
       threads = 3
       /providers
-    /lb
+    /lb : ipvs
       /pools
         /cms
           address = u'1.2.3.4:80'
@@ -426,7 +435,7 @@
 
 ZooKeeper doesn't have a concept of symbolic links, but ``zc.zk``
 provides a convention for dealing with symbolic links.  When trying to
-resolve a path, if a node lacks a child, but hase a property with a
+resolve a path, if a node lacks a child, but has a property with a
 name ending in ``' ->'``, the child will be found by following the
 path in the property value.
 
@@ -443,11 +452,108 @@
     u'/cms/providers/1.2.3.4:5'
 
 Note a limitation of symbolic links is that they can be hidden by
-children.  For example, if we added a real node, at ``/lb/pools``
+children.  For example, if we added a real node, at
+``/lb/pools/cms/providers``, it would shadow the link.
 
 ``children``, ``properties``, and ``register_server`` will
 automatically use ``resolve`` to resolve paths.
 
+When the ``children`` and ``properties`` are used for a node, the
+paths they use will be adjusted dynamically when paths are removed.
+To illustrate this, let's get children of ``/cms/databases/main``::
+
+    >>> main_children = zk.children('/cms/databases/main')
+    >>> main_children.path
+    '/cms/databases/main'
+    >>> main_children.real_path
+    '/cms/databases/main'
+
+.. test
+
+    >>> main_properties = zk.properties('/cms/databases/main')
+    >>> main_properties.path
+    '/cms/databases/main'
+    >>> main_properties.real_path
+    '/cms/databases/main'
+
+``Children`` and ``Properties`` objects have a ``path`` attribute that
+has the value passed to the ``children`` or ``properties``
+methods. They have a ``real_path`` attribute that contains the path
+after resolving symbolic links.  Let's suppose we want to move the
+database node to '/databases/cms'.  First we'll export it::
+
+    >>> export = zk.export_tree('/cms/databases/main', name='cms')
+    >>> print export,
+    /cms
+      /providers
+
+Note that we used the export ``name`` option to specify a new name for
+the exported tree.
+
+Now, we'll create a databases node::
+
+    >>> zk.create('/databases', '', zc.zk.OPEN_ACL_UNSAFE)
+    '/databases'
+
+And import the export:
+
+    >>> zk.import_tree(export, '/databases')
+    >>> print zk.export_tree('/databases'),
+    /databases
+      /cms
+        /providers
+
+Next, we'll create a symbolic link at the old location. We can use the
+``ln`` convenience method::
+
+    >>> zk.ln('/databases/cms', '/cms/databases/main')
+    >>> zk.get_properties('/cms/databases')
+    {u'main ->': u'/databases/cms'}
+
+Now, we can remove ``/cms/databases/main`` and ``main_children`` will
+be updated::
+
+    >>> zk.delete_recursive('/cms/databases/main')
+    >>> main_children.path
+    '/cms/databases/main'
+    >>> main_children.real_path
+    u'/databases/cms'
+
+.. test
+
+    >>> main_properties.path
+    '/cms/databases/main'
+    >>> main_properties.real_path
+    u'/databases/cms'
+
+If we update ``/databases/cms``, ``main_children`` will see the
+updates:
+
+    >>> sorted(main_children)
+    ['providers']
+    >>> zk.delete('/databases/cms/providers')
+    >>> sorted(main_children)
+    []
+
+.. test
+
+    >>> dict(main_properties)
+    {}
+    >>> zk.properties('/databases/cms').set(a=1)
+    >>> dict(main_properties)
+    {u'a': 1}
+
+Node deletion
+-------------
+
+If a node is deleted and ``Children`` or ``Properties`` instances have
+been created for it, and the paths they were created with can't be
+resolved using symbolic links, then the instances' data will be
+cleared.  Attempts to update properties will fail.  If callbacks have
+been registered, they will be called without arguments, if possible.
+It would be bad, in practice, to remove a node that processes are
+watching.
+
 ``zc.zk.ZooKeeper``
 -------------------
 
@@ -457,9 +563,19 @@
 ``children(path)``
    Return a `zc.zk.Children`_ for the path.
 
+   Note that there is a fair bit of machinery in `zc.zk.Children`_
+   objects to support keeping them up to date, callbacks, and cleaning
+   them up when they are no longer used.  If you only want to get the
+   list of children once, use ``get_children``.
+
 ``properties(path)``
    Return a `zc.zk.Properties`_ for the path.
 
+   Note that there is a fair bit of machinery in `zc.zk.Properties`_
+   objects to support keeping them up to date, callbacks, and cleaning
+   them up when they are no longer used.  If you only want to get the
+   properties once, use ``get_properties``.
+
 ``handle``
     The ZooKeeper session handle
 
@@ -481,31 +597,61 @@
 
     acl
        An access control-list to use for imported nodes.  If not
-       specifuied, then full access is allowed to everyone.
+       specified, then full access is allowed to everyone.
 
     dry_run
        Boolean, defaulting to false, indicating whether to do a dry
        run of the import, without applying any changes.
 
-``export_tree(path[, include_ephemeral])``
+``export_tree(path[, ephemeral[, name]])``
     Export a tree to a text representation.
 
     path
       The path to export.
 
-    include_ephemeral
+    ephemeral
        Boolean, defaulting to false, indicating whether to include
        ephemeral nodes in the export.  Including ephemeral nodes is
        mainly useful for visualizing the tree state.
 
+    name
+       The name to use for the top-level node.
+
+       This is useful when using export and import to copy a tree to
+       a different location and name in the hierarchy.
+
+       Normally, when exporting the root node, ``/``, the root isn't
+       included, but it is included if a name is given.
+
 ``delete_recursive(path[, dry_run])``
-   Delete a node and all of it's subnodes.
+   Delete a node and all of its sub-nodes.
 
    Ephemeral nodes or nodes containing them are not deleted.
 
    The dry_run option causes a summary of what would be deleted to be
    printed without actually deleting anything.
 
+``get_children(path)``
+   Get a list of the names of the children of the node at the given path.
+
+   This is more efficient than ``children`` when all you need is to
+   read the list once, as it doesn't create a `zc.zk.Children`_
+   object.
+
+``get_properties(path)``
+   Get the properties for the node at the given path as a dictionary.
+
+   This is more efficient than ``properties`` when all you need is to
+   read the properties once, as it doesn't create a
+   `zc.zk.Properties`_ object.
+
+``ln(source, destination)``
+   Create a symbolic link at the destination path pointing to the
+   source path.
+
+   If the source path ends with ``'/'``, then the destination name is
+   appended to the source.
+
 ``resolve(path)``
    Find the real path for the given path.
 
@@ -529,12 +675,11 @@
 ZooKeeper functions as methods: ``acreate``, ``add_auth``,
 ``adelete``, ``aexists``, ``aget``, ``aget_acl``, ``aget_children``,
 ``aset``, ``aset_acl``, ``async``, ``client_id``, ``create``,
-``delete``, ``exists``, ``get``, ``get_acl``, ``get_children``,
-``is_unrecoverable``, ``recv_timeout``, ``set``, ``set2``,
-``set_acl``, ``set_debug_level``, ``set_log_stream``, ``set_watcher``,
-and ``zerror``. When calling these as methods on ``ZooKeeper``
-instances, it isn't necessary to pass a handle, as that is provided
-automatically.
+``delete``, ``exists``, ``get``, ``get_acl``, ``is_unrecoverable``,
+``recv_timeout``, ``set``, ``set2``, ``set_acl``, ``set_debug_level``,
+``set_log_stream``, ``set_watcher``, and ``zerror``. When calling
+these as methods on ``ZooKeeper`` instances, it isn't necessary to
+pass a handle, as that is provided automatically.
 
 zc.zk.Children
 --------------
@@ -549,6 +694,8 @@
     The callback is passed the children instance when a child node is
     added or removed.
 
+    The ``Children`` instance is returned.
+
 zc.zk.Properties
 ----------------
 
@@ -566,7 +713,7 @@
    Update the properties for the node.
 
    The data argument, if given, must be a dictionary or something that
-   can be passed to a dictiomnary's ``update`` method.  Items supplied
+   can be passed to a dictionary's ``update`` method.  Items supplied
    as keywords take precedence over items supplied in the data
    argument.
 
@@ -576,30 +723,27 @@
     The callback is passed the properties instance when properties are
     changed.
 
-Node deletion
--------------
+    The ``Properties`` instance is returned.
 
-If a node is deleted and ``Children`` or ``Properties`` instances have
-been created for it, the instances' data will be cleared.  Attempts to
-update properties will fail.  If callbacks have been registered, they
-will be called without arguments, if possible.  It would be bad, in
-practice, to remove a node that processes are watching.
-
 Changes
 -------
 
-0.2.0 (2011-12-)
+0.2.0 (2011-12-05)
 ~~~~~~~~~~~~~~~~~~
 
 - Added tree import and export.
-- Added recursive node-deletion API.
 - Added symbolic-links.
 - properties set and update methods now accept positional
   mapping objects (or iterables of items) as well as keyword arguments.
+- Added recursive node-deletion API.
+- Added ``get_properties`` to get properties without creating watches.
 - Added convenience access to low-level ZooKeeper APIs.
 - Added ``OPEN_ACL_UNSAFE`` and ``READ_ACL_UNSAFE`` (in ``zc.zk``),
-  which are mentioned by the ZooKeeper docs. but not included in the
+  which are mentioned by the ZooKeeper documentation, but not included in the
   ``zookeeper`` module.
+- ``Children`` and ``Properties`` objects are now cleaned up when
+  no longer used.  Previously, they remained in memory for the life of
+  the session.
 
 0.1.0 (2011-11-27)
 ~~~~~~~~~~~~~~~~~~
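
The cleanup of unused ``Children`` and ``Properties`` objects described
in the change log rests on weak references: the session keeps only weak
references to watches, so a watch disappears from the registry once
nothing else holds it. A simplified sketch of that idea, modeled loosely
on the ``WatchManager`` class in the diff below; ``WeakWatchRegistry``
is a hypothetical name and the class is written for Python 3:

```python
import threading
import weakref


class WeakWatchRegistry:
    """Map keys to sets of weakly referenced watch objects."""

    def __init__(self):
        self.data = {}
        self.lock = threading.Lock()
        # Hold the registry weakly so the callback does not keep it alive.
        selfref = weakref.ref(self)

        def _remove(ref):
            registry = selfref()
            if registry is None:
                return
            with registry.lock:
                refs = registry.data.get(ref.key)
                if refs is not None:
                    refs.discard(ref)
                    if not refs:
                        # Last watch for this key is gone; drop the key.
                        del registry.data[ref.key]

        self._remove = _remove

    def add(self, key, watch):
        """Register a watch; return True if the key was not yet present."""
        ref = weakref.KeyedRef(watch, self._remove, key)
        with self.lock:
            refs = self.data.setdefault(key, set())
            newkey = not refs
            refs.add(ref)
        return newkey

    def watches(self, key):
        """Return the watches for a key that are still alive."""
        with self.lock:
            live = [ref() for ref in self.data.get(key, ())]
        return [w for w in live if w is not None]
```

As in the diff, ``add`` reports whether the key is new, which tells the
caller whether a ZooKeeper watch still has to be set for that key. The
sketch assumes watch objects are hashable and leaves out the ``pop`` and
``clear`` methods.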

Modified: zc.zk/trunk/src/zc/zk/__init__.py
===================================================================
--- zc.zk/trunk/src/zc/zk/__init__.py	2011-12-05 19:59:35 UTC (rev 123574)
+++ zc.zk/trunk/src/zc/zk/__init__.py	2011-12-05 20:00:20 UTC (rev 123575)
@@ -18,6 +18,7 @@
 import re
 import sys
 import threading
+import weakref
 import zc.thread
 import zookeeper
 
@@ -88,7 +89,10 @@
 READ_ACL_UNSAFE = [world_permission()]
 
 
-_text_is_node = re.compile(r'/(?P<name>\S+)$').match
+_text_is_node = re.compile(
+    r'/(?P<name>\S+)'
+    '(\s*:\s*(?P<type>\S.*))?'
+    '$').match
 _text_is_property = re.compile(
     r'(?P<name>\S+)'
     '\s*=\s*'
@@ -108,16 +112,23 @@
 class LinkLoop(Exception):
     pass
 
+class FailedConnect(Exception):
+    pass
+
 class ZooKeeper:
 
-    def __init__(self, zkaddr=2181):
+    def __init__(self, zkaddr=2181, timeout=1):
         if isinstance(zkaddr, int):
             zkaddr = "127.0.0.1:%s" % zkaddr
+        self.timeout = timeout
         self.zkaddr = zkaddr
-        self.watches = set()
+        self.watches = WatchManager()
         self.connected = threading.Event()
-        zookeeper.init(zkaddr, self._watch_session)
-        self.connected.wait()
+        handle = zookeeper.init(zkaddr, self._watch_session)
+        self.connected.wait(timeout)
+        if not self.connected.is_set():
+            zookeeper.close(handle)
+            raise FailedConnect(zkaddr)
 
     handle = None
     def _watch_session(self, handle, event_type, state, path):
@@ -126,12 +137,8 @@
         if state == zookeeper.CONNECTED_STATE:
             if self.handle is None:
                 self.handle = handle
-                if self.watches:
-                    # reestablish after session reestablished
-                    watches = self.watches
-                    self.watches = set()
-                    for watch in watches:
-                        self._watch(watch, False)
+                for watch in self.watches.clear():
+                    self._watch(watch, False)
             else:
                 assert handle == self.handle
             self.connected.set()
@@ -150,35 +157,96 @@
         kw['pid'] = os.getpid()
         if not isinstance(addr, str):
             addr = '%s:%s' % addr
-        self.connected.wait()
+        self.connected.wait(self.timeout)
         path = self.resolve(path)
         zookeeper.create(self.handle, path + '/' + addr, encode(kw),
                          [world_permission()], zookeeper.EPHEMERAL)
 
     def _watch(self, watch, wait=True):
+        event_type = watch.event_type
         if wait:
-            self.connected.wait()
-        self.watches.add(watch)
+            self.connected.wait(self.timeout)
+        watch.real_path = real_path = self.resolve(watch.path)
+        key = event_type, real_path
+        if self.watches.add(key, watch):
+            try:
+                self._watchkey(key)
+            except zookeeper.ConnectionLossException:
+                # We lost a race here. We got disconnected between
+                # when we resolved the watch path and the time we set
+                # the watch. This is very unlikely.
+                watches = set(self.watches.pop(key))
+                for w in watches:
+                    w._deleted()
+                if watch in watches:
+                    watches.remove(watch)
+                if watches:
+                    # OMG, how unlucky can we be?
+                    # someone added a watch between the time we added
+                    # the key and failed to add the watch in zookeeper.
+                    logger.critical('lost watches %r', watches)
+                raise
+        else:
+            # We already had a watch for the key.  We need to pass this one
+            # its data.
+            zkfunc = getattr(zookeeper, self.__zkfuncs[event_type])
+            watch._notify(zkfunc(self.handle, real_path))
 
-        def handler(h, t, state, p):
-            if watch not in self.watches:
-                return
-            assert h == self.handle
-            assert state == zookeeper.CONNECTED_STATE
-            assert p == watch.path
-            if t == zookeeper.DELETED_EVENT:
+    __zkfuncs = {
+        zookeeper.CHANGED_EVENT: 'get',
+        zookeeper.CHILD_EVENT: 'get_children',
+        }
+    def _watchkey(self, key):
+        event_type, real_path = key
+        zkfunc = getattr(zookeeper, self.__zkfuncs[event_type])
+
+        def handler(h, t, state, p, reraise=False):
+            try:
+                assert h == self.handle
+                assert state == zookeeper.CONNECTED_STATE
+                assert p == real_path
+                if key not in self.watches:
+                    return
+
+                if t == zookeeper.DELETED_EVENT:
+                    self._rewatch(key)
+                else:
+                    assert t == event_type
+                    try:
+                        v = zkfunc(self.handle, real_path, handler)
+                    except zookeeper.NoNodeException:
+                        self._rewatch(key)
+                    else:
+                        for watch in self.watches.watches(key):
+                            watch._notify(v)
+            except:
+                logger.exception("%s(%s) handler failed",
+                                 self.__zkfuncs[event_type], real_path)
+                if reraise:
+                    raise
+
+        handler(self.handle, event_type, self.state, real_path, True)
+
+    def _rewatch(self, key):
+        event_type = key[0]
+        for watch in self.watches.pop(key):
+            try:
+                real_path = self.resolve(watch.path)
+            except (zookeeper.NoNodeException, LinkLoop):
+                logger.exception("%s path went away", watch)
                 watch._deleted()
-                self.watches.remove(watch)
             else:
-                assert t == watch.event_type
-                zkfunc = getattr(zookeeper, watch.zkfunc)
-                watch._notify(zkfunc(self.handle, watch.path, handler))
+                self._watch(watch, False)
 
-        handler(self.handle, watch.event_type, self.state, watch.path)
-
     def children(self, path):
         return Children(self, path)
 
+    def get_properties(self, path):
+        return decode(self.get(path)[0])
+
+    def properties(self, path):
+        return Properties(self, path)
+
     def import_tree(self, text, path='/', trim=False, acl=OPEN_ACL_UNSAFE,
                     dry_run=False):
         # Step 1, build up internal tree representation:
@@ -211,6 +279,8 @@
                     m = _text_is_node(data)
                     if m:
                         data = _Tree(m.group('name'))
+                        if m.group('type'):
+                            data.properties['type'] = m.group('type')
                     else:
                         if '->' in data:
                             raise ValueError(lineno, data, "Bad link format")
@@ -251,7 +321,7 @@
         self._import_tree(path, root, acl, trim, dry_run, True)
 
     def _import_tree(self, path, node, acl, trim, dry_run, top=False):
-        self.connected.wait()
+        self.connected.wait(self.timeout)
         if not top:
             new_children = set(node.children)
             for name in self.get_children(path):
@@ -269,6 +339,7 @@
             if self.exists(cpath):
                 if dry_run:
                     new = child.properties
+                    old = self.get_properties(cpath)
                     old = decode(self.get(cpath)[0])
                     for n, v in sorted(old.items()):
                         if n not in new:
@@ -325,24 +396,34 @@
                 logger.info('deleting %s', path)
                 self.delete(path)
 
-    def export_tree(self, path='/', ephemeral=False):
+    def export_tree(self, path='/', ephemeral=False, name=None):
         output = []
         out = output.append
+        self.connected.wait(self.timeout)
 
-        def export_tree(path, indent):
+        def export_tree(path, indent, name=None):
             children = self.get_children(path)
             if path == '/':
                 path = ''
                 if 'zookeeper' in children:
                     children.remove('zookeeper')
+                if name is not None:
+                    out(indent + '/' + name)
+                    indent += '  '
             else:
                 data, meta = self.get(path)
                 if meta['ephemeralOwner'] and not ephemeral:
                     return
-                out(indent+'/'+path.rsplit('/', 1)[1])
+                if name is None:
+                    name = path.rsplit('/', 1)[1]
+                properties = decode(data)
+                type_ = properties.pop('type', None)
+                if type_:
+                    name += ' : '+type_
+                out(indent + '/' + name)
                 indent += '  '
                 links = []
-                for i in sorted(decode(data).iteritems()):
+                for i in sorted(properties.iteritems()):
                     if i[0].endswith(' ->'):
                         links.append(i)
                     else:
@@ -353,12 +434,9 @@
             for name in children:
                 export_tree(path+'/'+name, indent)
 
-        export_tree(path, '')
+        export_tree(path, '', name)
         return '\n'.join(output)+'\n'
 
-    def properties(self, path):
-        return Properties(self, path)
-
     def resolve(self, path, seen=()):
         if self.exists(path):
             return path
@@ -372,7 +450,7 @@
             newpath = base + '/' + name
             if self.exists(newpath):
                 return newpath
-            props = decode(self.get(base)[0])
+            props = self.get_properties(base)
             newpath = props.get(name+' ->')
             if not newpath:
                 raise zookeeper.NoNodeException()
@@ -383,9 +461,18 @@
             raise zookeeper.NoNodeException(path)
 
     def _set(self, path, data):
-        self.connected.wait()
+        self.connected.wait(self.timeout)
         return zookeeper.set(self.handle, path, data)
 
+
+    def ln(self, target, source):
+        base, name = source.rsplit('/', 1)
+        if target[-1] == '/':
+            target += name
+        properties = self.get_properties(base)
+        properties[name+' ->'] = target
+        self._set(base, encode(properties))
+
     def close(self):
         zookeeper.close(self.handle)
         del self.handle
@@ -413,13 +500,87 @@
 
 del _make_method
 
+class WatchManager:
+    # Manage {key -> w{watches}} in a thread-safe manner.
+    # (And also provide a hard set to allow nodeinfos w callbacks
+    #  to keep themselves around.)
 
+    def __init__(self):
+        self.data = {}
+        self.lock = threading.Lock()
+
+        def _remove(ref, selfref=weakref.ref(self)):
+            self = selfref()
+            if self is None:
+                return
+            key = ref.key
+            with self.lock:
+                refs = self.data.get(key, ())
+                if ref in refs:
+                    refs.remove(ref)
+                    if not refs:
+                        del self.data[key]
+
+        self._remove = _remove
+
+    def __len__(self):
+        with self.lock:
+            return sum(
+                len([r for r in refs if r() is not None])
+                for refs in self.data.itervalues()
+                )
+
+    def __contains__(self, key):
+        with self.lock:
+            return key in self.data
+
+    def add(self, key, value):
+        ref = weakref.KeyedRef(value, self._remove, key)
+        newkey = False
+        with self.lock:
+            try:
+                refs = self.data[key]
+            except KeyError:
+                self.data[key] = refs = set()
+                newkey = True
+            refs.add(ref)
+        return newkey
+
+    def pop(self, key):
+        with self.lock:
+            watches = [ref() for ref in self.data.pop(key, ())]
+
+        for watch in watches:
+            if watch is not None:
+                yield watch
+
+    def watches(self, key):
+        with self.lock:
+            watches = [ref() for ref in self.data.get(key, ())]
+
+        for watch in watches:
+            if watch is not None:
+                yield watch
+
+    def clear(self):
+        # Clear data and return an iterator on the old values
+        with self.lock:
+            old = self.data
+            self.data = {}
+
+        for refs in old.itervalues():
+            for ref in refs:
+                v = ref()
+                if v is not None:
+                    yield v
+
+
 class NodeInfo:
 
     def __init__(self, session, path):
         self.session = session
-        self.path = session.resolve(path)
-        self.callbacks = set()
+        self.path = path
+        self.callbacks = []
         session._watch(self)
 
     def setData(self, data):
@@ -438,7 +599,8 @@
                 logger.exception('Error %r calling %r', self, callback)
 
     def __repr__(self):
-        return "%s.%s(%s, %s)" % (
+        return "%s%s.%s(%s, %s)" % (
+            self.deleted and 'DELETED: ' or '',
             self.__class__.__module__, self.__class__.__name__,
             self.session.handle, self.path)
 
@@ -456,8 +618,8 @@
 
     def __call__(self, func):
         func(self)
-        self.callbacks.add(func)
-        return func
+        self.callbacks.append(func)
+        return self
 
     def __iter__(self):
         return iter(self.data)
@@ -465,12 +627,10 @@
 class Children(NodeInfo):
 
     event_type = zookeeper.CHILD_EVENT
-    zkfunc = 'get_children'
 
 class Properties(NodeInfo, collections.Mapping):
 
     event_type = zookeeper.CHANGED_EVENT
-    zkfunc = 'get'
 
     def setData(self, data):
         sdata, self.meta_data = data
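
The weak-reference bookkeeping that ``WatchManager`` introduces can be tried in isolation. What follows is a reduced sketch for Python 3, not the committed class: ``WeakWatchRegistry`` and ``Watch`` are hypothetical names, and only ``add`` and ``__len__`` are reproduced. It relies on CPython's reference counting to drop an object as soon as its last reference is deleted.

```python
import threading
import weakref


class WeakWatchRegistry:
    """Map key -> set of weak references to watch objects (illustrative)."""

    def __init__(self):
        self.data = {}
        self.lock = threading.Lock()

        # The callback holds only a weak reference to the registry, so the
        # callback does not keep the registry alive.
        def _remove(ref, selfref=weakref.ref(self)):
            registry = selfref()
            if registry is None:
                return
            with registry.lock:
                refs = registry.data.get(ref.key, ())
                if ref in refs:
                    refs.remove(ref)
                    if not refs:
                        del registry.data[ref.key]

        self._remove = _remove

    def __len__(self):
        # Count only references whose target is still alive.
        with self.lock:
            return sum(
                1
                for refs in self.data.values()
                for ref in refs
                if ref() is not None
            )

    def add(self, key, value):
        # KeyedRef remembers the key, so the callback knows which set to prune.
        ref = weakref.KeyedRef(value, self._remove, key)
        with self.lock:
            newkey = key not in self.data
            self.data.setdefault(key, set()).add(ref)
        return newkey


class Watch:
    """Hypothetical stand-in for a Children or Properties object."""


registry = WeakWatchRegistry()
watch = Watch()
registry.add('/test', watch)
print(len(registry))
del watch  # last strong reference gone; the registry entry is pruned
print(len(registry))
```

This is the behaviour the ``test_handler_cleanup`` doctest checks through ``len(zk.watches)``: an entry lasts only as long as something else still references the watch object.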

Modified: zc.zk/trunk/src/zc/zk/tests.py
===================================================================
--- zc.zk/trunk/src/zc/zk/tests.py	2011-12-05 19:59:35 UTC (rev 123574)
+++ zc.zk/trunk/src/zc/zk/tests.py	2011-12-05 20:00:20 UTC (rev 123575)
@@ -22,6 +22,7 @@
 import pprint
 import re
 import StringIO
+import sys
 import time
 import zc.zk
 import zc.thread
@@ -60,6 +61,21 @@
 def side_effect(mock):
     return lambda func: setattr(mock, 'side_effect', func)
 
+class zklogger(object):
+
+    def __init__(self):
+        logger = logging.getLogger('zc.zk')
+        h = logging.StreamHandler(sys.stdout)
+        h.setFormatter(logging.Formatter("%(levelname)s %(message)s"))
+        logger.addHandler(h)
+        self.h = h
+        logger.setLevel(logging.DEBUG)
+
+    def uninstall(self):
+        logger = logging.getLogger('zc.zk')
+        logger.removeHandler(self.h)
+        logger.setLevel(logging.NOTSET)
+
 class Tests(unittest.TestCase):
 
     @mock.patch('zookeeper.init')
@@ -118,8 +134,9 @@
 
         path = '/test'
         @side_effect(get_children)
-        def _(handle, path_, handler):
-            self.__handler = handler
+        def _(handle, path_, handler=None):
+            if handler is not None:
+                self.__handler = handler
             self.assertEqual((handle, path_), (0, path))
             return data
 
@@ -135,7 +152,7 @@
         self.assertEqual(list(children), data)
 
         # callbacks are called too:
-        cb = children(mock.Mock())
+        cb = mock.Mock(); children(cb)
         cb.assert_called_with(children)
         cb.reset_mock()
         self.assertEqual(len(children.callbacks), 1)
@@ -161,7 +178,7 @@
 
         # if a callback raises zc.zk.CancelWatch, the cancel is logged
         # and callback is discarded
-        cb = children(mock.Mock())
+        cb = mock.Mock(); children(cb)
         self.assertEqual(len(children.callbacks), 1)
         cb.side_effect = zc.zk.CancelWatch
         data = []
@@ -176,7 +193,7 @@
         h.uninstall()
 
         # If a session expires, it will be reestablished with watches intact.
-        cb = children(mock.Mock())
+        cb = mock.Mock(); children(cb)
         self.__session_watcher(
             0, zookeeper.SESSION_EVENT, zookeeper.EXPIRED_SESSION_STATE, "")
         close.assert_called_with(0)
@@ -197,8 +214,9 @@
 
         path = '/test'
         @side_effect(get)
-        def _(handle, path_, handler):
-            self.__handler = handler
+        def _(handle, path_, handler=None):
+            if handler is not None:
+                self.__handler = handler
             self.assertEqual((handle, path_), (0, path))
             return json.dumps(data), {}
 
@@ -214,11 +232,10 @@
         self.assertEqual(dict(properties), data)
 
         # callbacks are called too:
-        cb = properties(mock.Mock())
+        cb = mock.Mock(); properties(cb)
         cb.assert_called_with(properties)
         cb.reset_mock()
         self.assertEqual(len(properties.callbacks), 1)
-        data = dict(a=1, b=2)
         self.__handler(0, zookeeper.CHANGED_EVENT, zookeeper.CONNECTED_STATE,
                      path)
         self.assertEqual(dict(properties), data)
@@ -240,7 +257,7 @@
 
         # if a callback raises zc.zk.CancelWatch, the cancel is logged
         # and callback is discarded
-        cb = properties(mock.Mock())
+        cb = mock.Mock(); properties(cb)
         self.assertEqual(len(properties.callbacks), 1)
         cb.side_effect = zc.zk.CancelWatch
         data = {}
@@ -255,7 +272,7 @@
         h.uninstall()
 
         # If a session expires, it will be reestablished with watches intact.
-        cb = properties(mock.Mock())
+        cb = mock.Mock(); properties(cb)
         self.__session_watcher(
             0, zookeeper.SESSION_EVENT, zookeeper.EXPIRED_SESSION_STATE, "")
         close.assert_called_with(0)
@@ -274,8 +291,9 @@
 
         path = '/test'
         @side_effect(get)
-        def _(handle, path_, handler):
-            self.__handler = handler
+        def _(handle, path_, handler=None):
+            if handler is not None:
+                self.__handler = handler
             self.assertEqual((handle, path_), (0, path))
             return json.dumps(data), {}
 
@@ -304,8 +322,9 @@
 
         path = '/test'
         @side_effect(get)
-        def _(handle, path_, handler):
-            self.__handler = handler
+        def _(handle, path_, handler=None):
+            if handler is not None:
+                self.__handler = handler
             self.assertEqual((handle, path_), (0, path))
             return data, {}
 
@@ -337,45 +356,236 @@
         properties.set(string_value='xxx')
         self.assertEqual(self.__set_data, 'xxx')
 
-    @mock.patch('zookeeper.state')
-    @mock.patch('zookeeper.get')
-    @mock.patch('zookeeper.get_children')
-    def test_deleted_node_with_watchers(self, get_children, get, state):
-        state.side_effect = self.state_side_effect
-        path = '/test'
-        @side_effect(get)
-        def _(handle, path_, handler):
-            self.__get_handler = handler
-            return '{"a": 1}', {}
-        @side_effect(get_children)
-        def _(handle, path_, handler):
-            self.__child_handler = handler
-            return ['x']
+def test_children():
+    """
+    >>> zk = zc.zk.ZooKeeper('zookeeper.example.com:2181')
+    >>> _ = zk.create('/test', '', zc.zk.OPEN_ACL_UNSAFE)
+    >>> children = zk.children('/test')
+    >>> sorted(children)
+    []
 
-        children = self.__zk.children(path)
-        self.assertEqual(list(children), ['x'])
-        cb = children(mock.Mock())
-        cb.side_effect = lambda x: None
-        ccb = children(mock.Mock())
-        ccb.assert_called_with(children)
+    >>> def create(path):
+    ...     zk.create(path, '', zc.zk.OPEN_ACL_UNSAFE)
+    >>> create('/test/a')
+    >>> sorted(children)
+    ['a']
 
-        properties = self.__zk.properties(path)
-        self.assertEqual(dict(properties), dict(a=1))
-        cb = properties(mock.Mock())
-        cb.side_effect = lambda x: None
-        pcb = properties(mock.Mock())
-        pcb.assert_called_with(properties)
+We can register callbacks:
 
-        self.__get_handler(
-            0, zookeeper.DELETED_EVENT, zookeeper.CONNECTED_STATE, path)
-        self.assertEqual(dict(properties), {})
-        pcb.assert_called_with()
+    >>> @children
+    ... def cb(c):
+    ...     print 'good', sorted(c)
+    good ['a']
 
-        self.__child_handler(
-            0, zookeeper.DELETED_EVENT, zookeeper.CONNECTED_STATE, path)
-        self.assertEqual(list(children), [])
-        ccb.assert_called_with()
+When we register a callback, it gets called immediately with a children object.
 
+    >>> create('/test/b')
+    good ['a', 'b']
+    >>> sorted(children)
+    ['a', 'b']
+
+If a callback raises an error immediately, it isn't saved:
+
+    >>> @children
+    ... def bad(c):
+    ...     raise ValueError
+    Traceback (most recent call last):
+    ...
+    ValueError
+
+    >>> create('/test/c')
+    good ['a', 'b', 'c']
+
+If a callback raises an error later, it is logged and the callback is
+cancelled:
+
+    >>> logger = zklogger()
+
+    >>> badnow = False
+    >>> @children
+    ... def bad(c):
+    ...     assert c is children
+    ...     print 'bad later', sorted(c)
+    ...     if badnow:
+    ...         raise ValueError
+    bad later ['a', 'b', 'c']
+
+    >>> zk.delete('/test/c')
+    good ['a', 'b']
+    bad later ['a', 'b']
+
+    >>> badnow = True
+    >>> zk.delete('/test/b') # doctest: +ELLIPSIS
+    good ['a']
+    bad later ['a']
+    ERROR watch(zc.zk.Children(0, /test), <function bad at ...>)
+    Traceback (most recent call last):
+    ...
+    ValueError
+
+    >>> zk.delete('/test/a')
+    good []
+
+A callback can also cancel itself by raising CancelWatch:
+
+    >>> cancelnow = False
+    >>> @children
+    ... def cancel(c):
+    ...     assert c is children
+    ...     print 'cancel later', sorted(c)
+    ...     if cancelnow:
+    ...         raise zc.zk.CancelWatch
+    cancel later []
+
+    >>> create('/test/a')
+    good ['a']
+    cancel later ['a']
+
+    >>> cancelnow = True
+    >>> create('/test/b') # doctest: +ELLIPSIS
+    good ['a', 'b']
+    cancel later ['a', 'b']
+    DEBUG cancelled watch(zc.zk.Children(0, /test), <function cancel at ...>)
+
+    >>> logger.uninstall()
+    """
+
+def test_handler_cleanup():
+    """
+    >>> zk = zc.zk.ZooKeeper('zookeeper.example.com:2181')
+    >>> _ = zk.create('/test', '', zc.zk.OPEN_ACL_UNSAFE)
+
+Children:
+
+    >>> children = zk.children('/test')
+    >>> len(zk.watches)
+    1
+    >>> del children
+    >>> len(zk.watches)
+    0
+
+    >>> children = zk.children('/test')
+    >>> @children
+    ... def kids(c):
+    ...     print c
+    zc.zk.Children(0, /test)
+    >>> len(zk.watches)
+    1
+    >>> del children
+    >>> len(zk.watches)
+    1
+    >>> del kids
+    >>> len(zk.watches)
+    0
+
+    >>> @zk.children('/test')
+    ... def kids(c):
+    ...     print c
+    zc.zk.Children(0, /test)
+
+    >>> len(zk.watches)
+    1
+    >>> del kids
+    >>> len(zk.watches)
+    0
+
+Properties:
+
+    >>> properties = zk.properties('/test')
+    >>> len(zk.watches)
+    1
+    >>> del properties
+    >>> len(zk.watches)
+    0
+
+    >>> properties = zk.properties('/test')
+    >>> @properties
+    ... def props(c):
+    ...     print c
+    zc.zk.Properties(0, /test)
+    >>> len(zk.watches)
+    1
+    >>> del properties
+    >>> len(zk.watches)
+    1
+    >>> del props
+    >>> len(zk.watches)
+    0
+
+    >>> @zk.properties('/test')
+    ... def props(c):
+    ...     print c
+    zc.zk.Properties(0, /test)
+
+    >>> len(zk.watches)
+    1
+    >>> del props
+    >>> len(zk.watches)
+    0
+
+    """
+
+def test_deleted_node_with_watchers():
+    """
+
+Set up some handlers.
+
+    >>> zk = zc.zk.ZooKeeper('zookeeper.example.com:2181')
+    >>> _ = zk.create('/test', '{"a": 1}', zc.zk.OPEN_ACL_UNSAFE)
+
+    >>> children = zk.children('/test')
+    >>> @children
+    ... def _(arg):
+    ...     print 1, list(arg)
+    1 []
+
+    >>> @children
+    ... def _(arg=None):
+    ...     print 2, arg
+    2 zc.zk.Children(0, /test)
+
+    >>> _ = zk.create('/test/a', '', zc.zk.OPEN_ACL_UNSAFE)
+    1 ['a']
+    2 zc.zk.Children(0, /test)
+
+    >>> zk.delete('/test/a')
+    1 []
+    2 zc.zk.Children(0, /test)
+
+    >>> properties = zk.properties('/test')
+    >>> @properties
+    ... def _(arg):
+    ...     print 3, dict(arg)
+    3 {u'a': 1}
+
+    >>> @properties
+    ... def _(arg=None):
+    ...     print 4, arg
+    4 zc.zk.Properties(0, /test)
+
+    >>> zk.set('/test', '{"b": 2}')
+    3 {u'b': 2}
+    4 zc.zk.Properties(0, /test)
+
+Hack data into the child watcher to verify it's cleared:
+
+    >>> children.data = 'data'
+
+Now delete the node.  The handlers that accept no arguments will be called:
+
+    >>> zk.delete('/test')
+    4 None
+    2 None
+
+Note that the handlers that accept 0 arguments were called.
+
+And the data are cleared:
+
+    >>> list(children), list(properties)
+    ([], [])
+    """
+
+
 def resilient_import():
     """
 We can use various spacing in properties and links:
@@ -537,12 +747,7 @@
 
     >>> zk.resolve('/top/a/top/a/b/top/x')
     Traceback (most recent call last):
-      File "/usr/local/python/2.6/lib/python2.6/doctest.py", line 1253, in __run
-        compileflags, 1) in test.globs
-      File "<doctest zc.zk.tests.test_resolve[4]>", line 1, in <module>
-        zk.resolve('/top/a/top/a/b/top/x')
-      File "/Users/jim/p/zc/zk/trunk/src/zc/zk/__init__.py", line 382, in resolve
-        raise zookeeper.NoNodeException(path)
+    ...
     NoNodeException: /top/a/top/a/b/top/x
 
     >>> zk.resolve('/top/a/b/c/d/loop')
@@ -556,6 +761,29 @@
     LinkLoop: ('/top/a/loop', u'/top/a/b/loop', u'/top/a/loop')
     """
 
+def test_ln_target_w_trailing_slash():
+    """
+    >>> zk = zc.zk.ZooKeeper('zookeeper.example.com:2181')
+    >>> zk.ln('/databases/main', '/fooservice/')
+    >>> pprint.pprint(zk.get_properties('/fooservice'))
+    {u' ->': u'/databases/main',
+     u'database': u'/databases/foomain',
+     u'favorite_color': u'red',
+     u'threads': 1}
+    """
+
+def test_export_top_w_name():
+    """
+    >>> zk = zc.zk.ZooKeeper('zookeeper.example.com:2181')
+    >>> print zk.export_tree('/', name='top'),
+    /top
+      /fooservice
+        database = u'/databases/foomain'
+        favorite_color = u'red'
+        threads = 1
+        /providers
+    """
+
 def assert_(cond, mess=''):
     if not cond:
         print 'assertion failed: ', mess
@@ -628,11 +856,12 @@
 
     def delete(self, handle, path):
         self.check_handle(handle)
-        self.traverse(path) # seeif it's there
+        node = self.traverse(path)
         base, name = path.rsplit('/', 1)
-        node = self.traverse(base)
-        del node.children[name]
-        node.children_changed(self.handle, zookeeper.CONNECTED_STATE, base)
+        bnode = self.traverse(base)
+        del bnode.children[name]
+        node.deleted(self.handle, zookeeper.CONNECTED_STATE, path)
+        bnode.children_changed(self.handle, zookeeper.CONNECTED_STATE, base)
 
     def exists(self, handle, path):
         self.check_handle(handle)
@@ -699,6 +928,16 @@
         for w in watchers:
             w(handle, zookeeper.CHANGED_EVENT, state, path)
 
+    def deleted(self, handle, state, path):
+        watchers = self.watchers
+        self.watchers = ()
+        for w in watchers:
+            w(handle, zookeeper.DELETED_EVENT, state, path)
+        watchers = self.child_watchers
+        self.child_watchers = ()
+        for w in watchers:
+            w(handle, zookeeper.DELETED_EVENT, state, path)
+
 def test_suite():
     return unittest.TestSuite((
         unittest.makeSuite(Tests),
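
The callback rules exercised by the doctests above (call on registration, drop a callback that later fails, let a callback cancel itself) can be summarised in a small stand-alone sketch. It is written for Python 3 and does not use zc.zk: ``Watched`` and ``set_data`` are hypothetical names, and ``CancelWatch`` here is a local stand-in for ``zc.zk.CancelWatch``.

```python
import logging

logger = logging.getLogger(__name__)


class CancelWatch(Exception):
    """Raised by a callback to unregister itself (local stand-in)."""


class Watched:
    """Hypothetical stand-in for a Children or Properties object."""

    def __init__(self, data):
        self.data = data
        self.callbacks = []

    def __call__(self, func):
        # Call first: if the callback fails immediately, it is never saved.
        func(self)
        self.callbacks.append(func)
        # Returning self means that, used as a decorator, the decorated name
        # is bound to the watched object rather than to the function.
        return self

    def set_data(self, data):
        self.data = data
        # Iterate over a copy because failing callbacks are removed.
        for callback in list(self.callbacks):
            try:
                callback(self)
            except CancelWatch:
                self.callbacks.remove(callback)
            except Exception:
                logger.exception('Error %r calling %r', self, callback)
                self.callbacks.remove(callback)


seen = []
children = Watched(['a'])
children(lambda c: seen.append(sorted(c.data)))
children.set_data(['a', 'b'])
print(seen)
```

Because ``__call__`` returns the watched object, the tests in this revision create the mock first and register it in a separate step instead of using the return value.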


