[Checkins] SVN: zc.zk/trunk/src/zc/zk/ Fixed bug: Ephemeral nodes weren't recreated when sessions were

Jim Fulton jim at zope.com
Thu Dec 8 18:39:42 UTC 2011


Log message for revision 123635:
  Fixed bug: Ephemeral nodes weren't recreated when sessions were
  reestablished.
  
  This required:
  
  - Implementing node-modification methods so we could track changes
    to ephemeral nodes.
  
  - Updating the testing framework to implement a lot more methods, to
    have more realistic node meta data, to check versions, and to
    provide better session emulation.
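
  The tracking described above can be sketched roughly as follows.  This
  is a simplified, self-contained illustration and not the zc.zk code
  itself: ``FakeClient``, ``TrackingSession``, ``expire``, ``reestablish``
  and the ``EPHEMERAL`` value are hypothetical stand-ins for the low-level
  extension and the session watcher.

```python
EPHEMERAL = 1  # assumed stand-in for the zookeeper.EPHEMERAL flag


class FakeClient:
    """Hypothetical in-memory stand-in for the low-level client."""

    def __init__(self):
        self.nodes = {}  # path -> (data, acl, flags)

    def create(self, path, data, acl, flags=0):
        self.nodes[path] = (data, acl, flags)
        return path

    def delete(self, path):
        del self.nodes[path]

    def set(self, path, data):
        _, acl, flags = self.nodes[path]
        self.nodes[path] = (data, acl, flags)

    def expire(self):
        # On session expiry the server drops the session's ephemeral nodes.
        self.nodes = dict(
            (path, node) for path, node in self.nodes.items()
            if not node[2] & EPHEMERAL)


class TrackingSession:
    """Wrap node-modification calls so ephemeral nodes can be recreated."""

    def __init__(self, client):
        self.client = client
        self.ephemeral = {}  # path -> dict(data=..., acl=..., flags=...)

    def create(self, path, data, acl, flags=0):
        result = self.client.create(path, data, acl, flags)
        if flags & EPHEMERAL:
            # Remember everything needed to recreate the node later.
            self.ephemeral[path] = dict(data=data, acl=acl, flags=flags)
        return result

    def delete(self, path):
        self.client.delete(path)
        # A deleted node must not come back after a new session starts.
        self.ephemeral.pop(path, None)

    def set(self, path, data):
        self.client.set(path, data)
        if path in self.ephemeral:
            self.ephemeral[path]['data'] = data

    def reestablish(self):
        # Called once a new session is connected: recreate tracked nodes.
        for path, info in self.ephemeral.items():
            self.client.create(
                path, info['data'], info['acl'], info['flags'])
```

  Because the bookkeeping happens in the wrapper, changes made directly
  through the low-level client would not be seen, which is why the
  modification calls have to go through the wrapping object.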
  

Changed:
  U   zc.zk/trunk/src/zc/zk/README.txt
  U   zc.zk/trunk/src/zc/zk/__init__.py
  U   zc.zk/trunk/src/zc/zk/testing.py
  U   zc.zk/trunk/src/zc/zk/tests.py

-=-
Modified: zc.zk/trunk/src/zc/zk/README.txt
===================================================================
--- zc.zk/trunk/src/zc/zk/README.txt	2011-12-08 17:07:14 UTC (rev 123634)
+++ zc.zk/trunk/src/zc/zk/README.txt	2011-12-08 18:39:41 UTC (rev 123635)
@@ -175,21 +175,6 @@
 from keyword parameters.  Keyword parameters take precedence over the
 positional data argument.
 
-ZooKeeper Session Management
-----------------------------
-
-``zc.zk`` takes care of ZooKeeper session management for you. It
-establishes and, if necessary, reestablishes sessions for you.  In
-particular, it takes care of reestablishing ZooKeeper watches when a
-session is reestablished.
-
-ZooKeeper logging
------------------
-
-``zc.zk`` bridges the low-level ZooKeeper logging API and the Python
-logging API.  ZooKeeper log messages are forwarded to the Python
-``'ZooKeeper'`` logger.
-
 Tree-definition format, import, and export
 ------------------------------------------
 
@@ -329,7 +314,7 @@
     extra path not trimmed: /lb/pools/retail
 
 We got a warning about nodes left over from the old tree.  We can see
-this if we export the tree::
+this if we look at the tree::
 
     >>> print zk.export_tree(),
     /cms : z4m cms
@@ -558,6 +543,27 @@
 It would be bad, in practice, to remove a node that processes are
 watching.
 
+ZooKeeper Session Management
+----------------------------
+
+``zc.zk`` takes care of ZooKeeper session management for you. It
+establishes and, if necessary, reestablishes sessions for you.  In
+particular, it takes care of reestablishing ZooKeeper watches and
+ephemeral nodes when a session is reestablished.
+
+Note
+  To reestablish ephemeral nodes, it's necessary for ``zc.zk`` to
+  track node-modification operations, so you have to access the
+  ZooKeeper APIs through the `zc.zk.ZooKeeper`_ object, rather than
+  using the low-level extension directly.
+
+ZooKeeper logging
+-----------------
+
+``zc.zk`` bridges the low-level ZooKeeper logging API and the Python
+logging API.  ZooKeeper log messages are forwarded to the Python
+``'ZooKeeper'`` logger.
+
 zc.zk.ZooKeeper
 ---------------
 
@@ -572,41 +578,20 @@
    them up when they are no-longer used.  If you only want to get the
    list of children once, use ``get_children``.
 
-``properties(path)``
-   Return a `zc.zk.Properties`_ for the path.
+``close()``
+    Close the ZooKeeper session.
 
-   Note that there is a fair bit of machinery in `zc.zk.Properties`_
-   objects to support keeping them up to date, callbacks, and cleaning
-   them up when they are no-longer used.  If you only want to get the
-   properties once, use ``get_properties``.
+    This should be called when cleanly shutting down servers to more
+    quickly remove ephemeral nodes.
 
-``handle``
-    The ZooKeeper session handle
+``delete_recursive(path[, dry_run])``
+   Delete a node and all of its sub-nodes.
 
-    This attribute can be used to call the lower-level API provided by
-    the ``zookeeper`` extension.
+   Ephemeral nodes or nodes containing them are not deleted.
 
-``import_tree(text[, path='/'[, trim[, acl[, dry_run]]]])``
-    Create tree nodes by importing a textual tree representation.
+   The dry_run option causes a summary of what would be deleted to be
+   printed without actually deleting anything.
 
-    text
-       A textual representation of the tree.
-
-    path
-       The path at which to create the top-level nodes.
-
-    trim
-       Boolean, defaulting to false, indicating whether nodes not in
-       the textual representation should be removed.
-
-    acl
-       An access control-list to use for imported nodes.  If not
-       specified, then full access is allowed to everyone.
-
-    dry_run
-       Boolean, defaulting to false, indicating whether to do a dry
-       run of the import, without applying any changes.
-
 ``export_tree(path[, ephemeral[, name]])``
     Export a tree to a text representation.
 
@@ -627,14 +612,6 @@
        Normally, when exporting the root node, ``/``, the root isn't
        included, but it is included if a name is given.
 
-``delete_recursive(path[, dry_run])``
-   Delete a node and all of its sub-nodes.
-
-   Ephemeral nodes or nodes containing them are not deleted.
-
-   The dry_run option causes a summary of what would be deleted to be
-   printed without actually deleting anything.
-
 ``get_children(path)``
    Get a list of the names of the children of the node at the given path.
 
@@ -649,6 +626,27 @@
    read the properties once, as it doesn't create a
    `zc.zk.Properties`_ object.
 
+``import_tree(text[, path='/'[, trim[, acl[, dry_run]]]])``
+    Create tree nodes by importing a textual tree representation.
+
+    text
+       A textual representation of the tree.
+
+    path
+       The path at which to create the top-level nodes.
+
+    trim
+       Boolean, defaulting to false, indicating whether nodes not in
+       the textual representation should be removed.
+
+    acl
+       An access control-list to use for imported nodes.  If not
+       specified, then full access is allowed to everyone.
+
+    dry_run
+       Boolean, defaulting to false, indicating whether to do a dry
+       run of the import, without applying any changes.
+
 ``ln(source, destination)``
    Create a symbolic link at the destination path pointing to the
    source path.
@@ -656,10 +654,22 @@
    If the destination path ends with ``'/'``, then the source name is
    appended to the destination.
 
-``resolve(path)``
-   Find the real path for the given path.
+``print_tree(path='/')``
+   Print the tree at the given path.
 
-``register_server(path, address, **data)``
+   This is just a short-hand for::
+
+     print zk.export_tree(path, ephemeral=True),
+
+``properties(path)``
+   Return a `zc.zk.Properties`_ for the path.
+
+   Note that there is a fair bit of machinery in `zc.zk.Properties`_
+   objects to support keeping them up to date, callbacks, and cleaning
+   them up when they are no-longer used.  If you only want to get the
+   properties once, use ``get_properties``.
+
+``register_server(path, address, acl=zc.zk.READ_ACL_UNSAFE, **data)``
     Register a server at a path with the address.
 
     An ephemeral child node of ``path`` will be created with name equal
@@ -667,23 +677,21 @@
 
     ``address`` must be a host and port tuple.
 
+    ``acl`` is a ZooKeeper access control list.
+
     Optional node properties can be provided as keyword arguments.
 
-``close()``
-    Close the ZooKeeper session.
+``resolve(path)``
+   Find the real path for the given path.
 
-    This should be called when cleanly shutting down servers to more
-    quickly remove ephemeral nodes.
-
 In addition, ``ZooKeeper`` instances provide access to the following
 ZooKeeper functions as methods: ``acreate``, ``add_auth``,
 ``adelete``, ``aexists``, ``aget``, ``aget_acl``, ``aget_children``,
-``aset``, ``aset_acl``, ``async``, ``client_id``, ``create``,
-``delete``, ``exists``, ``get``, ``get_acl``, ``is_unrecoverable``,
-``recv_timeout``, ``set``, ``set2``, ``set_acl``, ``set_debug_level``,
-``set_log_stream``, ``set_watcher``, and ``zerror``. When calling
-these as methods on ``ZooKeeper`` instances, it isn't necessary to
-pass a handle, as that is provided automatically.
+``aset``, ``aset_acl``, ``async``, ``create``, ``delete``, ``exists``,
+``get``, ``get_acl``, ``is_unrecoverable``, ``recv_timeout``, ``set``,
+``set2``, ``set_acl``, and ``set_watcher``.  When calling these as
+methods on ``ZooKeeper`` instances, it isn't necessary to pass a
+handle, as that is provided automatically.
 
 zc.zk.Children
 --------------
@@ -750,6 +758,9 @@
 0.3.0 (2011-12-??)
 ------------------
 
+- Fixed bug: Ephemeral nodes weren't recreated when sessions were
+  reestablished.
+
 - Added a testing module that provides ZooKeeper emulation for
   testing complex interactions with zc.zk without needing a running
   ZooKeeper server.

Modified: zc.zk/trunk/src/zc/zk/__init__.py
===================================================================
--- zc.zk/trunk/src/zc/zk/__init__.py	2011-12-08 17:07:14 UTC (rev 123634)
+++ zc.zk/trunk/src/zc/zk/__init__.py	2011-12-08 18:39:41 UTC (rev 123635)
@@ -117,14 +117,18 @@
 
 class ZooKeeper:
 
-    def __init__(self, zkaddr=2181, timeout=1):
+    def __init__(self, zkaddr=2181, zktimeout=None, timeout=1):
         if isinstance(zkaddr, int):
             zkaddr = "127.0.0.1:%s" % zkaddr
         self.timeout = timeout
         self.zkaddr = zkaddr
         self.watches = WatchManager()
+        self.ephemeral = {}
         self.connected = threading.Event()
-        handle = zookeeper.init(zkaddr, self._watch_session)
+        if zktimeout:
+            handle = zookeeper.init(zkaddr, self._watch_session, zktimeout)
+        else:
+            handle = zookeeper.init(zkaddr, self._watch_session)
         self.connected.wait(timeout)
         if not self.connected.is_set():
             zookeeper.close(handle)
@@ -139,6 +143,9 @@
                 self.handle = handle
                 for watch in self.watches.clear():
                     self._watch(watch, False)
+                for path, data in self.ephemeral.items():
+                    zookeeper.create(self.handle, path, data['data'],
+                                     data['acl'], data['flags'])
             else:
                 assert handle == self.handle
             self.connected.set()
@@ -153,15 +160,61 @@
         else:
             logger.critical('unexpected session event %s %s', handle, state)
 
-    def register_server(self, path, addr, **kw):
+    def register_server(self, path, addr, acl=READ_ACL_UNSAFE, **kw):
         kw['pid'] = os.getpid()
         if not isinstance(addr, str):
             addr = '%s:%s' % addr
         self.connected.wait(self.timeout)
         path = self.resolve(path)
-        zookeeper.create(self.handle, path + '/' + addr, encode(kw),
-                         [world_permission()], zookeeper.EPHEMERAL)
+        self.create(path + '/' + addr, encode(kw), acl, zookeeper.EPHEMERAL)
 
+
+    def _async(self, completion, meth, *args):
+        post = getattr(self, '_post_'+meth)
+        if completion is None:
+            result = getattr(zookeeper, meth)(self.handle, *args)
+            post(*args)
+            return result
+
+        def asynccb(handle, status, *cargs):
+            assert handle == self.handle
+            if status == 0:
+                post(*args)
+            completion(handle, status, *cargs)
+
+        return getattr(zookeeper, 'a'+meth)(self.handle, *(args+(asynccb,)))
+
+    def create(self, path, data, acl, flags=0, completion=None):
+        return self._async(completion, 'create', path, data, acl, flags)
+    acreate = create
+
+    def _post_create(self, path, data, acl, flags):
+        if flags & zookeeper.EPHEMERAL:
+            self.ephemeral[path] = dict(data=data, acl=acl, flags=flags)
+
+    def delete(self, path, version=-1, completion=None):
+        return self._async(completion, 'delete', path, version)
+    adelete = delete
+
+    def _post_delete(self, path, version):
+        self.ephemeral.pop(path, None)
+
+    def set(self, path, data, version=-1, completion=None):
+        return self._async(completion, 'set', path, data, version)
+    aset = set2 = set
+
+    def _post_set(self, path, data, version):
+        if path in self.ephemeral:
+            self.ephemeral[path]['data'] = data
+
+    def set_acl(self, path, version, acl, completion=None):
+        return self._async(completion, 'set_acl', path, version, acl)
+    aset_acl = set_acl
+
+    def _post_set_acl(self, path, version, acl):
+        if path in self.ephemeral:
+            self.ephemeral[path]['acl'] = acl
+
     def _watch(self, watch, wait=True):
         event_type = watch.event_type
         if wait:
@@ -437,6 +490,9 @@
         export_tree(path, '', name)
         return '\n'.join(output)+'\n'
 
+    def print_tree(self, path='/'):
+        print self.export_tree(path, True),
+
     def resolve(self, path, seen=()):
         if self.exists(path):
             return path
@@ -462,7 +518,7 @@
 
     def _set(self, path, data):
         self.connected.wait(self.timeout)
-        return zookeeper.set(self.handle, path, data)
+        return self.set(path, data)
 
 
     def ln(self, target, source):
@@ -483,18 +539,15 @@
             return zookeeper.CONNECTING_STATE
         return zookeeper.state(self.handle)
 
-
 def _make_method(name):
     return (lambda self, *a, **kw:
             getattr(zookeeper, name)(self.handle, *a, **kw))
 
 for name in (
-    'acreate', 'add_auth', 'adelete', 'aexists', 'aget', 'aget_acl',
-    'aget_children', 'aset', 'aset_acl', 'async', 'client_id',
-    'create', 'delete', 'exists', 'get', 'get_acl',
-    'get_children', 'is_unrecoverable', 'recv_timeout', 'set',
-    'set2', 'set_acl', 'set_debug_level', 'set_log_stream',
-    'set_watcher', 'zerror',
+    'add_auth', 'aexists', 'aget', 'aget_acl',
+    'aget_children', 'async', 'client_id',
+    'exists', 'get', 'get_acl',
+    'get_children', 'is_unrecoverable', 'recv_timeout',
     ):
     setattr(ZooKeeper, name, _make_method(name))
 
@@ -653,7 +706,7 @@
 
     def _set(self, data):
         self.data = data
-        zookeeper.set(self.session.handle, self.path, encode(data))
+        self.session._set(self.path, encode(data))
 
     def set(self, data=None, **properties):
         data = data and dict(data) or {}

Modified: zc.zk/trunk/src/zc/zk/testing.py
===================================================================
--- zc.zk/trunk/src/zc/zk/testing.py	2011-12-08 17:07:14 UTC (rev 123634)
+++ zc.zk/trunk/src/zc/zk/testing.py	2011-12-08 18:39:41 UTC (rev 123635)
@@ -21,18 +21,27 @@
 """
 import json
 import mock
+import sys
 import threading
 import time
+import traceback
 import zc.zk
+import zc.thread
 import zookeeper
 
 __all__ = ['assert_', 'setUp', 'tearDown']
 
-def assert_(cond, mess=''):
-    """A simple assertion function for use in doctests.
+def assert_(cond, mess='', error=True):
+    """A simple assertion function.
+
+    If ``error``, raise an AssertionError if the assertion fails,
+    otherwise, print a message.
     """
     if not cond:
-        print 'assertion failed: ', mess
+        if error:
+            raise AssertionError(mess)
+        else:
+            print 'assertion failed: ', mess
 
 def wait_until(func=None, timeout=9):
     """Wait until a function returns true.
@@ -97,6 +106,8 @@
     globs = getattr(test, 'globs', test.__dict__)
     globs['wait_until'] = wait_until
     globs['zc.zk.testing'] = teardowns
+    globs['ZooKeeper'] = zk
+    globs.setdefault('assert_', assert_)
 
 def tearDown(test):
     """The matching tearDown for setUp.
@@ -107,6 +118,67 @@
     for cm in globs['zc.zk.testing']:
         cm()
 
+class Session:
+
+    def __init__(self, zk, handle, watch=None):
+        self.zk = zk
+        self.handle = handle
+        self.nodes = set()
+        self.add = self.nodes.add
+        self.remove = self.nodes.remove
+        self.watch = watch
+        self.state = zookeeper.CONNECTING_STATE
+
+    def connect(self):
+        self.newstate(zookeeper.CONNECTED_STATE)
+
+    def disconnect(self):
+        self.newstate(zookeeper.CONNECTING_STATE)
+
+    def expire(self):
+        self.zk._clear_session(self)
+        self.newstate(zookeeper.EXPIRED_SESSION_STATE)
+
+    def newstate(self, state):
+        self.state = state
+        if self.watch is not None:
+            self.watch(self.handle, zookeeper.SESSION_EVENT, state, '')
+
+    def check(self):
+        if self.state == zookeeper.CONNECTING_STATE:
+            raise zookeeper.ConnectionLossException()
+        elif self.state == zookeeper.EXPIRED_SESSION_STATE:
+            raise zookeeper.SessionExpiredException()
+        elif self.state != zookeeper.CONNECTED_STATE:
+            raise AssertionError('Invalid state')
+
+exception_codes = {
+    zookeeper.ApiErrorException: zookeeper.APIERROR,
+    zookeeper.AuthFailedException: zookeeper.AUTHFAILED,
+    zookeeper.BadArgumentsException: zookeeper.BADARGUMENTS,
+    zookeeper.BadVersionException: zookeeper.BADVERSION,
+    zookeeper.ClosingException: zookeeper.CLOSING,
+    zookeeper.ConnectionLossException: zookeeper.CONNECTIONLOSS,
+    zookeeper.DataInconsistencyException: zookeeper.DATAINCONSISTENCY,
+    zookeeper.InvalidACLException: zookeeper.INVALIDACL,
+    zookeeper.InvalidCallbackException: zookeeper.INVALIDCALLBACK,
+    zookeeper.InvalidStateException: zookeeper.INVALIDSTATE,
+    zookeeper.MarshallingErrorException: zookeeper.MARSHALLINGERROR,
+    zookeeper.NoAuthException: zookeeper.NOAUTH,
+    zookeeper.NoChildrenForEphemeralsException:
+    zookeeper.NOCHILDRENFOREPHEMERALS,
+    zookeeper.NoNodeException: zookeeper.NONODE,
+    zookeeper.NodeExistsException: zookeeper.NODEEXISTS,
+    zookeeper.NotEmptyException: zookeeper.NOTEMPTY,
+    zookeeper.NothingException: zookeeper.NOTHING,
+    zookeeper.OperationTimeoutException: zookeeper.OPERATIONTIMEOUT,
+    zookeeper.RuntimeInconsistencyException: zookeeper.RUNTIMEINCONSISTENCY,
+    zookeeper.SessionExpiredException: zookeeper.SESSIONEXPIRED,
+    zookeeper.SessionMovedException: zookeeper.SESSIONMOVED,
+    zookeeper.SystemErrorException: zookeeper.SYSTEMERROR,
+    zookeeper.UnimplementedException: zookeeper.UNIMPLEMENTED,
+}
+
 class ZooKeeper:
 
     def __init__(self, connection_string, tree):
@@ -114,6 +186,7 @@
         self.root = tree
         self.sessions = {}
         self.lock = threading.RLock()
+        self.connect_immediately = True
 
     def init(self, addr, watch=None):
         with self.lock:
@@ -121,41 +194,76 @@
             handle = 0
             while handle in self.sessions:
                 handle += 1
-            self.sessions[handle] = set()
-            if watch:
-                watch(handle,
-                      zookeeper.SESSION_EVENT, zookeeper.CONNECTED_STATE, '')
+            self.sessions[handle] = Session(self, handle, watch)
+            if self.connect_immediately:
+                self.sessions[handle].connect()
 
-    def _check_handle(self, handle):
-        with self.lock:
-            if handle not in self.sessions:
-                raise zookeeper.ZooKeeperException('handle out of range')
+    def _check_handle(self, handle, checkstate=True):
+        try:
+            session = self.sessions[handle]
+        except KeyError:
+            raise zookeeper.ZooKeeperException('handle out of range')
+        if checkstate:
+            session.check()
+        return session
 
     def _traverse(self, path):
+        node = self.root
+        for name in path.split('/')[1:]:
+            if not name:
+                continue
+            try:
+                node = node.children[name]
+            except KeyError:
+                raise zookeeper.NoNodeException('no node')
+
+        return node
+
+    def _clear_session(self, session):
         with self.lock:
-            node = self.root
-            for name in path.split('/')[1:]:
-                if not name:
-                    continue
-                try:
-                    node = node.children[name]
-                except KeyError:
-                    raise zookeeper.NoNodeException('no node')
+            for path in list(session.nodes):
+                self._delete(session.handle, path)
+            self.root.clear_watchers(session.handle)
 
-            return node
+    def _doasync(self, completion, handle, nreturn, func, *args):
+        if completion is None:
+            return func(*args)
 
+        if isinstance(nreturn, int):
+            nerror = nreturn
+        else:
+            nreturn, nerror = nreturn
+
+        @zc.thread.Thread
+        def doasync():
+            try:
+                # print 'doasync', func, args
+                with self.lock:
+                    status = 0
+                    try:
+                        r = func(*args)
+                    except Exception, v:
+                        status = exception_codes.get(v.__class__, -1)
+                        r = (None, ) * nerror
+                    if not isinstance(r, tuple):
+                        if nreturn == 1:
+                            r = (r, )
+                        else:
+                            r = ()
+                    completion(*((handle, status) + r))
+            except:
+                traceback.print_exc(file=sys.stdout)
+
+        return 0
+
     def close(self, handle):
         with self.lock:
-            self._check_handle(handle)
-            for path in list(self.sessions[handle]):
-                self.delete(handle, path)
+            self._clear_session(self._check_handle(handle, False))
             del self.sessions[handle]
-            self.root.clear_watchers(handle)
 
     def state(self, handle):
         with self.lock:
-            self._check_handle(handle)
-            return zookeeper.CONNECTED_STATE
+            return self._check_handle(handle, False).state
 
     def create(self, handle, path, data, acl, flags=0):
         with self.lock:
@@ -165,26 +273,43 @@
             if name in node.children:
                 raise zookeeper.NodeExistsException()
             node.children[name] = newnode = Node(data)
-            newnode.acls = acl
+            newnode.acl = acl
             newnode.flags = flags
             node.children_changed(handle, zookeeper.CONNECTED_STATE, base)
             if flags & zookeeper.EPHEMERAL:
                 self.sessions[handle].add(path)
             return path
 
-    def delete(self, handle, path):
+    def acreate(self, handle, path, data, acl, flags=0, completion=None):
+        return self._doasync(completion, handle, 1,
+                            self.create, handle, path, data, acl, flags)
+
+    def _delete(self, handle, path, version=-1):
+        node = self._traverse(path)
+        if version != -1 and node.version != version:
+            raise zookeeper.BadVersionException('bad version')
+        if node.children:
+            raise zookeeper.NotEmptyException('not empty')
+        base, name = path.rsplit('/', 1)
+        bnode = self._traverse(base)
+        del bnode.children[name]
+        node.deleted(handle, zookeeper.CONNECTED_STATE, path)
+        bnode.children_changed(handle, zookeeper.CONNECTED_STATE, base)
+        if path in self.sessions[handle].nodes:
+            self.sessions[handle].remove(path)
+
+    def delete(self, handle, path, version=-1):
         with self.lock:
             self._check_handle(handle)
-            node = self._traverse(path)
-            base, name = path.rsplit('/', 1)
-            bnode = self._traverse(base)
-            del bnode.children[name]
-            node.deleted(handle, zookeeper.CONNECTED_STATE, path)
-            bnode.children_changed(handle, zookeeper.CONNECTED_STATE, base)
-            if path in self.sessions[handle]:
-                self.sessions[handle].remove(path)
+            self._delete(handle, path, version)
 
-    def exists(self, handle, path):
+    def adelete(self, handle, path, version=-1, completion=None):
+        return self._doasync(completion, handle, 0,
+                             self.delete, handle, path, version)
+
+    def exists(self, handle, path, watch=None):
+        if watch is not None:
+            raise TypeError('exists watch not supported')
         with self.lock:
             self._check_handle(handle)
             try:
@@ -193,6 +318,10 @@
             except zookeeper.NoNodeException:
                 return False
 
+    def aexists(self, handle, path, watch=None, completion=None):
+        return self._doasync(completion, handle, 1,
+                             self.exists, handle, path, watch)
+
     def get_children(self, handle, path, watch=None):
         with self.lock:
             self._check_handle(handle)
@@ -201,29 +330,49 @@
                 node.child_watchers += ((handle, watch), )
             return sorted(node.children)
 
+    def aget_children(self, handle, path, watch=None, completion=None):
+        return self._doasync(completion, handle, 1,
+                             self.get_children, handle, path, watch)
+
     def get(self, handle, path, watch=None):
         with self.lock:
             self._check_handle(handle)
             node = self._traverse(path)
             if watch:
                 node.watchers += ((handle, watch), )
-            return node.data, dict(
-                ephemeralOwner=(1 if node.flags & zookeeper.EPHEMERAL else 0),
-                )
+            return node.data, node.meta()
 
-    def set(self, handle, path, data):
+    def aget(self, handle, path, watch=None, completion=None):
+        return self._doasync(completion, handle, 2,
+                             self.get, handle, path, watch)
+
+    def set(self, handle, path, data, version=-1, async=False):
         with self.lock:
             self._check_handle(handle)
             node = self._traverse(path)
+            if version != -1 and node.version != version:
+                raise zookeeper.BadVersionException('bad version')
             node.data = data
             node.changed(handle, zookeeper.CONNECTED_STATE, path)
+            if async:
+                return node.meta()
+            else:
+                return 0
 
+    def aset(self, handle, path, data, version=-1, completion=None):
+        return self._doasync(completion, handle, 1,
+                             self.set, handle, path, data, version, True)
+
     def get_acl(self, handle, path):
         with self.lock:
             self._check_handle(handle)
             node = self._traverse(path)
-            return dict(aversion=node.aversion), node.acl
+            return node.meta(), node.acl
 
+    def aget_acl(self, handle, path, completion=None):
+        return self._doasync(completion, handle, 2,
+                             self.get_acl, handle, path)
+
     def set_acl(self, handle, path, aversion, acl):
         with self.lock:
             self._check_handle(handle)
@@ -233,27 +382,49 @@
             node.aversion += 1
             node.acl = acl
 
+            return 0
+
+    def aset_acl(self, handle, path, aversion, acl, completion=None):
+        return self._doasync(completion, handle, (1, 0),
+                             self.set_acl, handle, path, aversion, acl)
+
 class Node:
     watchers = child_watchers = ()
     flags = 0
-    aversion = 0
+    version = aversion = cversion = 0
     acl = zc.zk.OPEN_ACL_UNSAFE
 
+    def meta(self):
+        return dict(
+            version = self.version,
+            aversion = self.aversion,
+            cversion = self.cversion,
+            ctime = self.ctime,
+            mtime = self.mtime,
+            numChildren = len(self.children),
+            dataLength = len(self.data),
+            ephemeralOwner=(1 if self.flags & zookeeper.EPHEMERAL else 0),
+            )
+
     def __init__(self, data='', **children):
         self.data = data
         self.children = children
+        self.ctime = self.mtime = time.time()
 
     def children_changed(self, handle, state, path):
         watchers = self.child_watchers
         self.child_watchers = ()
         for h, w in watchers:
             w(h, zookeeper.CHILD_EVENT, state, path)
+        self.cversion += 1
 
     def changed(self, handle, state, path):
         watchers = self.watchers
         self.watchers = ()
         for h, w in watchers:
             w(h, zookeeper.CHANGED_EVENT, state, path)
+        self.version += 1
+        self.mtime = time.time()
 
     def deleted(self, handle, state, path):
         watchers = self.watchers

Modified: zc.zk/trunk/src/zc/zk/tests.py
===================================================================
--- zc.zk/trunk/src/zc/zk/tests.py	2011-12-08 17:07:14 UTC (rev 123634)
+++ zc.zk/trunk/src/zc/zk/tests.py	2011-12-08 18:39:41 UTC (rev 123635)
@@ -23,6 +23,7 @@
 import re
 import StringIO
 import sys
+import threading
 import time
 import zc.zk
 import zc.zk.testing
@@ -293,7 +294,7 @@
         self.assertEqual(dict(properties), data)
 
         @side_effect(set)
-        def _(handle, path_, data):
+        def _(handle, path_, data, version=-1):
             self.__set_data = json.loads(data)
             self.assertEqual((handle, path_), (0, path))
 
@@ -336,7 +337,7 @@
         self.assertEqual(dict(properties), dict(string_value='\n{xxx}\n'))
 
         @side_effect(set)
-        def _(handle, path_, data):
+        def _(handle, path_, data, version=-1):
             self.__set_data = data
             self.assertEqual((handle, path_), (0, path))
 
@@ -572,6 +573,7 @@
     >>> zk.set('/test', '{"b": 2}')
     3 {u'b': 2}
     4 zc.zk.Properties(0, /test)
+    0
 
 Hack data into the child watcher to verify it's cleared:
 
@@ -790,17 +792,306 @@
         /providers
     """
 
+def test_recovery_of_servers_on_session_reestablishment():
+    """
+
+First, a basic test:
+
+    >>> zk = zc.zk.ZooKeeper('zookeeper.example.com:2181')
+    >>> zk.register_server('/fooservice/providers', 'test')
+    >>> zk.get_children('/fooservice/providers')
+    ['test']
+
+    >>> ZooKeeper.sessions[zk.handle].disconnect()
+    >>> ZooKeeper.sessions[zk.handle].expire()
+
+    >>> zk.get_children('/fooservice/providers')
+    ['test']
+
+Now, some variations.
+
+If the node is deleted, we don't recreate it:
+
+    >>> zk.delete('/fooservice/providers/test')
+    >>> ZooKeeper.sessions[zk.handle].disconnect()
+    >>> ZooKeeper.sessions[zk.handle].expire()
+    >>> zk.get_children('/fooservice/providers')
+    []
+
+First, some non-standard data and acl:
+
+    >>> acl = [zc.zk.world_permission(3)]
+    >>> zk.register_server('/fooservice/providers', 'test', acl, a=1)
+    >>> zk.print_tree('/fooservice/providers')
+    /providers
+      /test
+        a = 1
+        pid = 362
+    >>> ZooKeeper.sessions[zk.handle].disconnect()
+    >>> ZooKeeper.sessions[zk.handle].expire()
+    >>> zk.print_tree('/fooservice/providers')
+    /providers
+      /test
+        a = 1
+        pid = 362
+
+    >>> zk.get_acl('/fooservice/providers/test')[1] == acl
+    True
+
+Delete again:
+
+    >>> zk.delete('/fooservice/providers/test')
+    >>> ZooKeeper.sessions[zk.handle].disconnect()
+    >>> ZooKeeper.sessions[zk.handle].expire()
+    >>> zk.get_children('/fooservice/providers')
+    []
+
+Let's use the low-level creation api:
+
+    >>> zk.create('/fooservice/providers/test', 'x', acl, zookeeper.EPHEMERAL)
+    '/fooservice/providers/test'
+
+    >>> ZooKeeper.sessions[zk.handle].disconnect()
+    >>> ZooKeeper.sessions[zk.handle].expire()
+    >>> zk.get_acl('/fooservice/providers/test')[1] == acl
+    True
+    >>> zk.get('/fooservice/providers/test')[0]
+    'x'
+
+We track changes:
+
+    >>> _ = zk.set('/fooservice/providers/test', 'y')
+    >>> acl2 = [zc.zk.world_permission(4)]
+    >>> _ = zk.set_acl('/fooservice/providers/test', 0, acl2)
+    >>> ZooKeeper.sessions[zk.handle].disconnect()
+    >>> ZooKeeper.sessions[zk.handle].expire()
+    >>> zk.get_acl('/fooservice/providers/test')[1] == acl2
+    True
+    >>> zk.get('/fooservice/providers/test')[0]
+    'y'
+
+Delete again:
+
+    >>> zk.delete('/fooservice/providers/test')
+    >>> ZooKeeper.sessions[zk.handle].disconnect()
+    >>> ZooKeeper.sessions[zk.handle].expire()
+    >>> zk.get_children('/fooservice/providers')
+    []
+
+Let's do it all asynchronously :)
+
+    >>> zk.acreate('/fooservice/providers/test', 'x', acl, zookeeper.EPHEMERAL,
+    ...             check_async(0))
+    0
+    >>> event.wait(1); assert_(event.is_set(), error=False)
+    >>> ZooKeeper.sessions[zk.handle].disconnect()
+    >>> ZooKeeper.sessions[zk.handle].expire()
+    >>> zk.get_acl('/fooservice/providers/test')[1] == acl
+    True
+    >>> zk.get('/fooservice/providers/test')[0]
+    'x'
+
+    >>> zk.aset('/fooservice/providers/test', 'y', -1, check_async(0))
+    0
+    >>> event.wait(1); assert_(event.is_set())
+    >>> acl2 = [zc.zk.world_permission(4)]
+    >>> _ = zk.aset_acl('/fooservice/providers/test', 0, acl2, check_async(0))
+    >>> event.wait(1); assert_(event.is_set())
+    >>> ZooKeeper.sessions[zk.handle].disconnect()
+    >>> ZooKeeper.sessions[zk.handle].expire()
+    >>> zk.get_acl('/fooservice/providers/test')[1] == acl2
+    True
+    >>> zk.get('/fooservice/providers/test')[0]
+    'y'
+
+    >>> _ = zk.adelete('/fooservice/providers/test', -1, check_async(0))
+    >>> event.wait(1); assert_(event.is_set())
+    >>> ZooKeeper.sessions[zk.handle].disconnect()
+    >>> ZooKeeper.sessions[zk.handle].expire()
+    >>> zk.get_children('/fooservice/providers')
+    []
+    """
+
+def test_set():
+    """
+    >>> zk = zc.zk.ZooKeeper('zookeeper.example.com:2181')
+    >>> zk.get('/')[0]
+    ''
+    >>> zk.set('/', 'a'); zk.get('/')[0]
+    0
+    'a'
+
+    >>> zk.set('/', 'b', 0)
+    Traceback (most recent call last):
+    ...
+    BadVersionException: bad version
+
+    >>> zk.set('/', 'b'); zk.get('/')[0]
+    0
+    'b'
+
+    >>> r = zk.aset('/', 'c', -1, check_async()); event.wait(1)
+    ... # doctest: +ELLIPSIS
+    async callback got (...
+    >>> r
+    0
+
+    >>> zk.get('/')[0]
+    'c'
+
+    >>> r = zk.aset('/', 'd', 0,
+    ...             check_async(expected_status=zookeeper.BADVERSION)
+    ...             ); event.wait(1)
+    async callback got (None,)
+
+    >>> r
+    0
+
+    >>> r = zk.aset('/', 'd', 3, check_async()); event.wait(1)
+    ... # doctest: +ELLIPSIS
+    async callback got (...
+    >>> r
+    0
+
+    >>> zk.get('/')[0]
+    'd'
+    """
+
+def test_delete():
+    """
+    >>> zk = zc.zk.ZooKeeper('zookeeper.example.com:2181')
+    >>> _ = zk.create('/test', '', zc.zk.OPEN_ACL_UNSAFE)
+
+Synchronous variations:
+
+    >>> _ = zk.create('/test/a', '', zc.zk.OPEN_ACL_UNSAFE)
+    >>> _ = zk.set('/test/a', '1')
+    >>> _ = zk.set('/test/a', '2')
+    >>> zk.delete('/test/a', 0)
+    Traceback (most recent call last):
+    ...
+    BadVersionException: bad version
+
+    >>> zk.get_children('/test')
+    ['a']
+    >>> zk.delete('/test/a', 2)
+    >>> zk.get_children('/test')
+    []
+
+    >>> _ = zk.create('/test/a', '', zc.zk.OPEN_ACL_UNSAFE)
+    >>> _ = zk.set('/test/a', '1')
+    >>> _ = zk.set('/test/a', '2')
+    >>> zk.delete('/test/a', -1)
+    >>> zk.get_children('/test')
+    []
+
+    >>> _ = zk.create('/test/a', '', zc.zk.OPEN_ACL_UNSAFE)
+    >>> _ = zk.set('/test/a', '1')
+    >>> _ = zk.set('/test/a', '2')
+    >>> zk.delete('/test/a')
+    >>> zk.get_children('/test')
+    []
+
+
+Asynchronous variations:
+
+    >>> _ = zk.create('/test/a', '', zc.zk.OPEN_ACL_UNSAFE)
+    >>> _ = zk.set('/test/a', '1')
+    >>> _ = zk.set('/test/a', '2')
+    >>> r = zk.adelete('/test/a', 0,
+    ...             check_async(expected_status=zookeeper.BADVERSION)
+    ...             ); event.wait(1)
+    async callback got ()
+    >>> r
+    0
+
+    >>> zk.get_children('/test')
+    ['a']
+    >>> r = zk.adelete('/test/a', 2, check_async()); event.wait(1)
+    async callback got ()
+    >>> r, zk.get_children('/test')
+    (0, [])
+
+    >>> _ = zk.create('/test/a', '', zc.zk.OPEN_ACL_UNSAFE)
+    >>> _ = zk.set('/test/a', '1')
+    >>> _ = zk.set('/test/a', '2')
+    >>> r = zk.adelete('/test/a', -1, check_async()); event.wait(1)
+    async callback got ()
+    >>> r, zk.get_children('/test')
+    (0, [])
+
+    >>> _ = zk.create('/test/a', '', zc.zk.OPEN_ACL_UNSAFE)
+    >>> _ = zk.set('/test/a', '1')
+    >>> _ = zk.set('/test/a', '2')
+    >>> r = zk.adelete('/test/a', completion=check_async()); event.wait(1)
+    async callback got ()
+    >>> r, zk.get_children('/test')
+    (0, [])
+
+    """
+
+def test_set_acl():
+    """
+    >>> zk = zc.zk.ZooKeeper('zookeeper.example.com:2181')
+    >>> _ = zk.create('/test', '', zc.zk.OPEN_ACL_UNSAFE)
+    >>> zk.get_acl('/test')[1] == zc.zk.OPEN_ACL_UNSAFE
+    True
+    >>> zk.set_acl('/test', 0, [zc.zk.world_permission(1)])
+    0
+    >>> zk.get_acl('/test')[1] == [zc.zk.world_permission(1)]
+    True
+
+    >>> zk.set_acl('/test', 0, [zc.zk.world_permission(1)])
+    Traceback (most recent call last):
+    ...
+    BadVersionException: bad version
+
+    >>> zk.set_acl('/test', 1, [zc.zk.world_permission(2)])
+    0
+    >>> zk.get_acl('/test')[1] == [zc.zk.world_permission(2)]
+    True
+
+    >>> r = zk.aset_acl('/test', 0, [zc.zk.world_permission(3)],
+    ...                 check_async(expected_status=zookeeper.BADVERSION)
+    ...                 ); event.wait(1)
+    async callback got ()
+    >>> r
+    0
+
+    >>> r = zk.aset_acl('/test', 2, [zc.zk.world_permission(3)],
+    ...                 check_async()); event.wait(1)
+    async callback got (0,)
+    >>> r
+    0
+    >>> zk.get_acl('/test')[1] == [zc.zk.world_permission(3)]
+    True
+    """
+
+event = threading.Event()
+def check_async(show=True, expected_status=0):
+    event.clear()
+    def check(handle, status, *args):
+        if show:
+            print 'async callback got', args
+        event.set()
+        zc.zk.testing.assert_(
+            status==expected_status,
+            "Bad cb status %s" % status,
+            error=False)
+    return check
+
 def test_suite():
+    checker = zope.testing.renormalizing.RENormalizing([
+        (re.compile('pid = \d+'), 'pid = 9999')
+        ])
     return unittest.TestSuite((
         unittest.makeSuite(Tests),
         doctest.DocTestSuite(
             setUp=zc.zk.testing.setUp, tearDown=zc.zk.testing.tearDown,
+            checker=checker,
             ),
         manuel.testing.TestSuite(
-            manuel.doctest.Manuel(
-                checker = zope.testing.renormalizing.RENormalizing([
-                    (re.compile('pid = \d+'), 'pid = 9999')
-                    ])) + manuel.capture.Manuel(),
+            manuel.doctest.Manuel(checker=checker) + manuel.capture.Manuel(),
             'README.txt',
             setUp=zc.zk.testing.setUp, tearDown=zc.zk.testing.tearDown,
             ),
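
The check_async helper in the diff above pairs a completion callback with a threading.Event so that a doctest can block until an asynchronous call has finished. The following is a minimal, self-contained sketch of that pattern, not the code from this revision. The names make_checker and fake_async_call are hypothetical, and the worker thread only stands in for the ZooKeeper client thread that would normally invoke the callback. Unlike the module-level event used in tests.py, this sketch creates one event per call, which is an assumption made here to keep the example independent.

```python
import threading


def make_checker(expected_status=0):
    """Return (callback, event, results) for a single asynchronous call."""
    event = threading.Event()
    results = []

    def callback(handle, status, *args):
        # Record the outcome before signalling, so that a waiter woken by
        # event.set() always finds the result already stored.
        results.append((status == expected_status, args))
        event.set()

    return callback, event, results


def fake_async_call(handle, completion):
    """Hypothetical stand-in for an asynchronous API such as aset."""
    worker = threading.Thread(target=completion, args=(handle, 0, 'payload'))
    worker.start()
    return 0  # the request was accepted; the result arrives via the callback


callback, event, results = make_checker()
rc = fake_async_call(1, callback)
event.wait(1)  # block for up to one second until the callback has run
```

After the wait, results holds one (status_ok, args) pair that the caller can inspect, which plays the role of the "async callback got ..." output in the doctests.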


