[Checkins] SVN: zc.zk/trunk/src/zc/zk/testing.py - Ephemeral nodes weren't deleted when sessions were closed.
Jim Fulton
jim at zope.com
Tue Dec 6 22:51:00 UTC 2011
Log message for revision 123612:
- Ephemeral nodes weren't deleted when sessions were closed.
- Session watches weren't cleaned up.
- Watches weren't called in a session-specific manner.
- Added a reentrant lock on the ZooKeeper emulator to hopefully reduce
the chance of spurious errors in threaded tests.
Changed:
U zc.zk/trunk/src/zc/zk/testing.py
-=-
Modified: zc.zk/trunk/src/zc/zk/testing.py
===================================================================
--- zc.zk/trunk/src/zc/zk/testing.py 2011-12-06 21:38:39 UTC (rev 123611)
+++ zc.zk/trunk/src/zc/zk/testing.py 2011-12-06 22:51:00 UTC (rev 123612)
@@ -21,6 +21,7 @@
"""
import json
import mock
+import threading
import time
import zc.zk
import zookeeper
@@ -28,7 +29,7 @@
__all__ = ['assert_', 'setUp', 'tearDown']
def assert_(cond, mess=''):
- """A simple assertion function for use in tests.
+ """A simple assertion function for use in doctests.
"""
if not cond:
print 'assertion failed: ', mess
@@ -93,7 +94,9 @@
zk.import_tree(tree)
zk.close()
- getattr(test, 'globs', test.__dict__)['zc.zk.testing'] = teardowns
+ globs = getattr(test, 'globs', test.__dict__)
+ globs['wait_until'] = wait_until
+ globs['zc.zk.testing'] = teardowns
def tearDown(test):
"""The matching tearDown for setUp.
@@ -109,105 +112,126 @@
def __init__(self, connection_string, tree):
self.connection_string = connection_string
self.root = tree
- self.sessions = set()
+ self.sessions = {}
+ self.lock = threading.RLock()
def init(self, addr, watch=None):
- assert_(addr==self.connection_string, addr)
- handle = 0
- while handle in self.sessions:
- handle += 1
- self.sessions.add(handle)
- if watch:
- watch(handle,
- zookeeper.SESSION_EVENT, zookeeper.CONNECTED_STATE, '')
+ with self.lock:
+ assert_(addr==self.connection_string, addr)
+ handle = 0
+ while handle in self.sessions:
+ handle += 1
+ self.sessions[handle] = set()
+ if watch:
+ watch(handle,
+ zookeeper.SESSION_EVENT, zookeeper.CONNECTED_STATE, '')
def _check_handle(self, handle):
- if handle not in self.sessions:
- raise zookeeper.ZooKeeperException('handle out of range')
+ with self.lock:
+ if handle not in self.sessions:
+ raise zookeeper.ZooKeeperException('handle out of range')
def _traverse(self, path):
- node = self.root
- for name in path.split('/')[1:]:
- if not name:
- continue
- try:
- node = node.children[name]
- except KeyError:
- raise zookeeper.NoNodeException('no node')
+ with self.lock:
+ node = self.root
+ for name in path.split('/')[1:]:
+ if not name:
+ continue
+ try:
+ node = node.children[name]
+ except KeyError:
+ raise zookeeper.NoNodeException('no node')
- return node
+ return node
def close(self, handle):
- self._check_handle(handle)
- self.sessions.remove(handle)
+ with self.lock:
+ self._check_handle(handle)
+ for path in list(self.sessions[handle]):
+ self.delete(handle, path)
+ del self.sessions[handle]
+ self.root.clear_watchers(handle)
def state(self, handle):
- self._check_handle(handle)
- return zookeeper.CONNECTED_STATE
+ with self.lock:
+ self._check_handle(handle)
+ return zookeeper.CONNECTED_STATE
def create(self, handle, path, data, acl, flags=0):
- self._check_handle(handle)
- base, name = path.rsplit('/', 1)
- node = self._traverse(base)
- if name in node.children:
- raise zookeeper.NodeExistsException()
- node.children[name] = newnode = Node(data)
- newnode.acls = acl
- newnode.flags = flags
- node.children_changed(handle, zookeeper.CONNECTED_STATE, base)
- return path
+ with self.lock:
+ self._check_handle(handle)
+ base, name = path.rsplit('/', 1)
+ node = self._traverse(base)
+ if name in node.children:
+ raise zookeeper.NodeExistsException()
+ node.children[name] = newnode = Node(data)
+ newnode.acls = acl
+ newnode.flags = flags
+ node.children_changed(handle, zookeeper.CONNECTED_STATE, base)
+ if flags & zookeeper.EPHEMERAL:
+ self.sessions[handle].add(path)
+ return path
def delete(self, handle, path):
- self._check_handle(handle)
- node = self._traverse(path)
- base, name = path.rsplit('/', 1)
- bnode = self._traverse(base)
- del bnode.children[name]
- node.deleted(handle, zookeeper.CONNECTED_STATE, path)
- bnode.children_changed(handle, zookeeper.CONNECTED_STATE, base)
+ with self.lock:
+ self._check_handle(handle)
+ node = self._traverse(path)
+ base, name = path.rsplit('/', 1)
+ bnode = self._traverse(base)
+ del bnode.children[name]
+ node.deleted(handle, zookeeper.CONNECTED_STATE, path)
+ bnode.children_changed(handle, zookeeper.CONNECTED_STATE, base)
+ if path in self.sessions[handle]:
+ self.sessions[handle].remove(path)
def exists(self, handle, path):
- self._check_handle(handle)
- try:
- self._traverse(path)
- return True
- except zookeeper.NoNodeException:
- return False
+ with self.lock:
+ self._check_handle(handle)
+ try:
+ self._traverse(path)
+ return True
+ except zookeeper.NoNodeException:
+ return False
def get_children(self, handle, path, watch=None):
- self._check_handle(handle)
- node = self._traverse(path)
- if watch:
- node.child_watchers += (watch, )
- return sorted(node.children)
+ with self.lock:
+ self._check_handle(handle)
+ node = self._traverse(path)
+ if watch:
+ node.child_watchers += ((handle, watch), )
+ return sorted(node.children)
def get(self, handle, path, watch=None):
- self._check_handle(handle)
- node = self._traverse(path)
- if watch:
- node.watchers += (watch, )
- return node.data, dict(
- ephemeralOwner=(1 if node.flags & zookeeper.EPHEMERAL else 0),
- )
+ with self.lock:
+ self._check_handle(handle)
+ node = self._traverse(path)
+ if watch:
+ node.watchers += ((handle, watch), )
+ return node.data, dict(
+ ephemeralOwner=(1 if node.flags & zookeeper.EPHEMERAL else 0),
+ )
def set(self, handle, path, data):
- self._check_handle(handle)
- node = self._traverse(path)
- node.data = data
- node.changed(handle, zookeeper.CONNECTED_STATE, path)
+ with self.lock:
+ self._check_handle(handle)
+ node = self._traverse(path)
+ node.data = data
+ node.changed(handle, zookeeper.CONNECTED_STATE, path)
def get_acl(self, handle, path):
- self._check_handle(handle)
- node = self._traverse(path)
- return dict(aversion=node.aversion), node.acl
+ with self.lock:
+ self._check_handle(handle)
+ node = self._traverse(path)
+ return dict(aversion=node.aversion), node.acl
def set_acl(self, handle, path, aversion, acl):
- self._check_handle(handle)
- node = self._traverse(path)
- if aversion != node.aversion:
- raise zookeeper.BadVersionException("bad version")
- node.aversion += 1
- node.acl = acl
+ with self.lock:
+ self._check_handle(handle)
+ node = self._traverse(path)
+ if aversion != node.aversion:
+ raise zookeeper.BadVersionException("bad version")
+ node.aversion += 1
+ node.acl = acl
class Node:
watchers = child_watchers = ()
@@ -222,21 +246,33 @@
def children_changed(self, handle, state, path):
watchers = self.child_watchers
self.child_watchers = ()
- for w in watchers:
- w(handle, zookeeper.CHILD_EVENT, state, path)
+ for h, w in watchers:
+ w(h, zookeeper.CHILD_EVENT, state, path)
def changed(self, handle, state, path):
watchers = self.watchers
self.watchers = ()
- for w in watchers:
- w(handle, zookeeper.CHANGED_EVENT, state, path)
+ for h, w in watchers:
+ w(h, zookeeper.CHANGED_EVENT, state, path)
def deleted(self, handle, state, path):
watchers = self.watchers
self.watchers = ()
- for w in watchers:
- w(handle, zookeeper.DELETED_EVENT, state, path)
+ for h, w in watchers:
+ w(h, zookeeper.DELETED_EVENT, state, path)
watchers = self.child_watchers
self.watchers = ()
- for w in watchers:
- w(handle, zookeeper.DELETED_EVENT, state, path)
+ for h, w in watchers:
+ w(h, zookeeper.DELETED_EVENT, state, path)
+
+ def clear_watchers(self, handle):
+ self.watchers = tuple(
+ (h, w) for (h, w) in self.watchers
+ if h != handle
+ )
+ self.child_watchers = tuple(
+ (h, w) for (h, w) in self.child_watchers
+ if h != handle
+ )
+ for child in self.children.itervalues():
+ child.clear_watchers(handle)
More information about the checkins
mailing list