[Checkins] SVN: zc.zk/trunk/src/zc/zk/testing.py - Ephemeral nodes weren't deleted when sessions were closed.

Jim Fulton jim at zope.com
Tue Dec 6 22:51:00 UTC 2011


Log message for revision 123612:
  - Ephemeral nodes weren't deleted when sessions were closed.
  - Watches registered by a session weren't removed when the session was closed.
  - Watches weren't called with the handle of the session that registered them.
  - Added a reentrant lock on the ZooKeeper emulator to hopefully reduce
    the chance of spurious errors in threaded tests.
  

Changed:
  U   zc.zk/trunk/src/zc/zk/testing.py

-=-
Modified: zc.zk/trunk/src/zc/zk/testing.py
===================================================================
--- zc.zk/trunk/src/zc/zk/testing.py	2011-12-06 21:38:39 UTC (rev 123611)
+++ zc.zk/trunk/src/zc/zk/testing.py	2011-12-06 22:51:00 UTC (rev 123612)
@@ -21,6 +21,7 @@
 """
 import json
 import mock
+import threading
 import time
 import zc.zk
 import zookeeper
@@ -28,7 +29,7 @@
 __all__ = ['assert_', 'setUp', 'tearDown']
 
 def assert_(cond, mess=''):
-    """A simple assertion function for use in tests.
+    """A simple assertion function for use in doctests.
     """
     if not cond:
         print 'assertion failed: ', mess
@@ -93,7 +94,9 @@
         zk.import_tree(tree)
         zk.close()
 
-    getattr(test, 'globs', test.__dict__)['zc.zk.testing'] = teardowns
+    globs = getattr(test, 'globs', test.__dict__)
+    globs['wait_until'] = wait_until
+    globs['zc.zk.testing'] = teardowns
 
 def tearDown(test):
     """The matching tearDown for setUp.
@@ -109,105 +112,126 @@
     def __init__(self, connection_string, tree):
         self.connection_string = connection_string
         self.root = tree
-        self.sessions = set()
+        self.sessions = {}
+        self.lock = threading.RLock()
 
     def init(self, addr, watch=None):
-        assert_(addr==self.connection_string, addr)
-        handle = 0
-        while handle in self.sessions:
-            handle += 1
-        self.sessions.add(handle)
-        if watch:
-            watch(handle,
-                  zookeeper.SESSION_EVENT, zookeeper.CONNECTED_STATE, '')
+        with self.lock:
+            assert_(addr==self.connection_string, addr)
+            handle = 0
+            while handle in self.sessions:
+                handle += 1
+            self.sessions[handle] = set()
+            if watch:
+                watch(handle,
+                      zookeeper.SESSION_EVENT, zookeeper.CONNECTED_STATE, '')
 
     def _check_handle(self, handle):
-        if handle not in self.sessions:
-            raise zookeeper.ZooKeeperException('handle out of range')
+        with self.lock:
+            if handle not in self.sessions:
+                raise zookeeper.ZooKeeperException('handle out of range')
 
     def _traverse(self, path):
-        node = self.root
-        for name in path.split('/')[1:]:
-            if not name:
-                continue
-            try:
-                node = node.children[name]
-            except KeyError:
-                raise zookeeper.NoNodeException('no node')
+        with self.lock:
+            node = self.root
+            for name in path.split('/')[1:]:
+                if not name:
+                    continue
+                try:
+                    node = node.children[name]
+                except KeyError:
+                    raise zookeeper.NoNodeException('no node')
 
-        return node
+            return node
 
     def close(self, handle):
-        self._check_handle(handle)
-        self.sessions.remove(handle)
+        with self.lock:
+            self._check_handle(handle)
+            for path in list(self.sessions[handle]):
+                self.delete(handle, path)
+            del self.sessions[handle]
+            self.root.clear_watchers(handle)
 
     def state(self, handle):
-        self._check_handle(handle)
-        return zookeeper.CONNECTED_STATE
+        with self.lock:
+            self._check_handle(handle)
+            return zookeeper.CONNECTED_STATE
 
     def create(self, handle, path, data, acl, flags=0):
-        self._check_handle(handle)
-        base, name = path.rsplit('/', 1)
-        node = self._traverse(base)
-        if name in node.children:
-            raise zookeeper.NodeExistsException()
-        node.children[name] = newnode = Node(data)
-        newnode.acls = acl
-        newnode.flags = flags
-        node.children_changed(handle, zookeeper.CONNECTED_STATE, base)
-        return path
+        with self.lock:
+            self._check_handle(handle)
+            base, name = path.rsplit('/', 1)
+            node = self._traverse(base)
+            if name in node.children:
+                raise zookeeper.NodeExistsException()
+            node.children[name] = newnode = Node(data)
+            newnode.acls = acl
+            newnode.flags = flags
+            node.children_changed(handle, zookeeper.CONNECTED_STATE, base)
+            if flags & zookeeper.EPHEMERAL:
+                self.sessions[handle].add(path)
+            return path
 
     def delete(self, handle, path):
-        self._check_handle(handle)
-        node = self._traverse(path)
-        base, name = path.rsplit('/', 1)
-        bnode = self._traverse(base)
-        del bnode.children[name]
-        node.deleted(handle, zookeeper.CONNECTED_STATE, path)
-        bnode.children_changed(handle, zookeeper.CONNECTED_STATE, base)
+        with self.lock:
+            self._check_handle(handle)
+            node = self._traverse(path)
+            base, name = path.rsplit('/', 1)
+            bnode = self._traverse(base)
+            del bnode.children[name]
+            node.deleted(handle, zookeeper.CONNECTED_STATE, path)
+            bnode.children_changed(handle, zookeeper.CONNECTED_STATE, base)
+            if path in self.sessions[handle]:
+                self.sessions[handle].remove(path)
 
     def exists(self, handle, path):
-        self._check_handle(handle)
-        try:
-            self._traverse(path)
-            return True
-        except zookeeper.NoNodeException:
-            return False
+        with self.lock:
+            self._check_handle(handle)
+            try:
+                self._traverse(path)
+                return True
+            except zookeeper.NoNodeException:
+                return False
 
     def get_children(self, handle, path, watch=None):
-        self._check_handle(handle)
-        node = self._traverse(path)
-        if watch:
-            node.child_watchers += (watch, )
-        return sorted(node.children)
+        with self.lock:
+            self._check_handle(handle)
+            node = self._traverse(path)
+            if watch:
+                node.child_watchers += ((handle, watch), )
+            return sorted(node.children)
 
     def get(self, handle, path, watch=None):
-        self._check_handle(handle)
-        node = self._traverse(path)
-        if watch:
-            node.watchers += (watch, )
-        return node.data, dict(
-            ephemeralOwner=(1 if node.flags & zookeeper.EPHEMERAL else 0),
-            )
+        with self.lock:
+            self._check_handle(handle)
+            node = self._traverse(path)
+            if watch:
+                node.watchers += ((handle, watch), )
+            return node.data, dict(
+                ephemeralOwner=(1 if node.flags & zookeeper.EPHEMERAL else 0),
+                )
 
     def set(self, handle, path, data):
-        self._check_handle(handle)
-        node = self._traverse(path)
-        node.data = data
-        node.changed(handle, zookeeper.CONNECTED_STATE, path)
+        with self.lock:
+            self._check_handle(handle)
+            node = self._traverse(path)
+            node.data = data
+            node.changed(handle, zookeeper.CONNECTED_STATE, path)
 
     def get_acl(self, handle, path):
-        self._check_handle(handle)
-        node = self._traverse(path)
-        return dict(aversion=node.aversion), node.acl
+        with self.lock:
+            self._check_handle(handle)
+            node = self._traverse(path)
+            return dict(aversion=node.aversion), node.acl
 
     def set_acl(self, handle, path, aversion, acl):
-        self._check_handle(handle)
-        node = self._traverse(path)
-        if aversion != node.aversion:
-            raise zookeeper.BadVersionException("bad version")
-        node.aversion += 1
-        node.acl = acl
+        with self.lock:
+            self._check_handle(handle)
+            node = self._traverse(path)
+            if aversion != node.aversion:
+                raise zookeeper.BadVersionException("bad version")
+            node.aversion += 1
+            node.acl = acl
 
 class Node:
     watchers = child_watchers = ()
@@ -222,21 +246,33 @@
     def children_changed(self, handle, state, path):
         watchers = self.child_watchers
         self.child_watchers = ()
-        for w in watchers:
-            w(handle, zookeeper.CHILD_EVENT, state, path)
+        for h, w in watchers:
+            w(h, zookeeper.CHILD_EVENT, state, path)
 
     def changed(self, handle, state, path):
         watchers = self.watchers
         self.watchers = ()
-        for w in watchers:
-            w(handle, zookeeper.CHANGED_EVENT, state, path)
+        for h, w in watchers:
+            w(h, zookeeper.CHANGED_EVENT, state, path)
 
     def deleted(self, handle, state, path):
         watchers = self.watchers
         self.watchers = ()
-        for w in watchers:
-            w(handle, zookeeper.DELETED_EVENT, state, path)
+        for h, w in watchers:
+            w(h, zookeeper.DELETED_EVENT, state, path)
         watchers = self.child_watchers
         self.watchers = ()
-        for w in watchers:
-            w(handle, zookeeper.DELETED_EVENT, state, path)
+        for h, w in watchers:
+            w(h, zookeeper.DELETED_EVENT, state, path)
+
+    def clear_watchers(self, handle):
+        self.watchers = tuple(
+            (h, w) for (h, w) in self.watchers
+            if h != handle
+            )
+        self.child_watchers = tuple(
+            (h, w) for (h, w) in self.child_watchers
+            if h != handle
+            )
+        for child in self.children.itervalues():
+            child.clear_watchers(handle)



More information about the checkins mailing list