[Checkins] SVN: zc.zk/trunk/src/zc/zk/ ZooKeeper node data and child watchers are called on session

Jim Fulton jim at zope.com
Fri Jan 6 21:06:27 UTC 2012


Log message for revision 123974:
  ZooKeeper node data and child watchers are called on session
  expiry.  This was unexpected.  The data and child handler
  functions now handle these events more gracefully.
  

Changed:
  U   zc.zk/trunk/src/zc/zk/README.txt
  U   zc.zk/trunk/src/zc/zk/__init__.py
  U   zc.zk/trunk/src/zc/zk/disconnectiontests.py
  U   zc.zk/trunk/src/zc/zk/testing.py

-=-
Modified: zc.zk/trunk/src/zc/zk/README.txt
===================================================================
--- zc.zk/trunk/src/zc/zk/README.txt	2012-01-06 19:41:06 UTC (rev 123973)
+++ zc.zk/trunk/src/zc/zk/README.txt	2012-01-06 21:06:27 UTC (rev 123974)
@@ -989,6 +989,15 @@
 Change History
 ==============
 
+0.5.1 (2012-01-06)
+------------------
+
+- Fixed bug:
+
+  - ZooKeeper node data and child watchers are called on session
+    expiry.  This was unexpected.  The data and child handler
+    functions now handle these events more gracefully.
+
 0.5.1 (2012-01-04)
 ------------------
 

Modified: zc.zk/trunk/src/zc/zk/__init__.py
===================================================================
--- zc.zk/trunk/src/zc/zk/__init__.py	2012-01-06 19:41:06 UTC (rev 123973)
+++ zc.zk/trunk/src/zc/zk/__init__.py	2012-01-06 21:06:27 UTC (rev 123974)
@@ -304,6 +304,15 @@
         zkfunc = getattr(zookeeper, self.__zkfuncs[event_type])
 
         def handler(h, t, state, p, reraise=False):
+
+            if state != zookeeper.CONNECTED_STATE:
+                # This can happen if we get disconnected or a session expires.
+                # When we reconnect, we should restablish the watchers.
+                logger.warning(
+                    "Node watcher event %r with non-connected state, %r",
+                    t, state)
+                return
+
             try:
                 assert h == self.handle
                 assert state == zookeeper.CONNECTED_STATE

Modified: zc.zk/trunk/src/zc/zk/disconnectiontests.py
===================================================================
--- zc.zk/trunk/src/zc/zk/disconnectiontests.py	2012-01-06 19:41:06 UTC (rev 123973)
+++ zc.zk/trunk/src/zc/zk/disconnectiontests.py	2012-01-06 21:06:27 UTC (rev 123974)
@@ -18,6 +18,11 @@
 # server, which we can't control (or at least don't want to work hard
 # enough to control).
 
+from pprint import pprint
+import zc.zk
+import zookeeper
+import zope.testing.loggingsupport
+
 def wait_for_zookeeper():
     """
     Normally, zc.zk.ZooKeeper raises an exception if it can't connect to
@@ -26,7 +31,7 @@
     connection.
 
     >>> zk = None
-    >>> import zc.zk, zc.thread, zookeeper, zope.testing.loggingsupport
+    >>> import zc.thread
 
     >>> handler = zope.testing.loggingsupport.InstalledHandler('zc.zk')
 
@@ -63,3 +68,67 @@
     >>> zk.close()
     """
 
+def settion_timeout_with_child_and_data_watchers():
+    """
+
+Set up a session with some watchers:
+
+    >>> zk = zc.zk.ZooKeeper('zookeeper.example.com:2181')
+
+    >>> handler = zope.testing.loggingsupport.InstalledHandler('zc.zk')
+
+    >>> properties = zk.properties('/fooservice')
+    >>> @properties
+    ... def changed(a):
+    ...     print 'properties changed', a is properties
+    properties changed True
+
+    >>> pprint(dict(properties), width=60)
+    {u'database': u'/databases/foomain',
+     u'favorite_color': u'red',
+     u'threads': 1}
+
+    >>> children = zk.children('/fooservice')
+    >>> @children
+    ... def changed(a):
+    ...     print 'children changed', a is children
+    children changed True
+
+    >>> sorted(children)
+    ['providers']
+
+Now, we'll expire the session:
+
+    >>> handler.clear()
+    >>> ZooKeeper.sessions[zk.handle].disconnect()
+    >>> ZooKeeper.sessions[zk.handle].expire()
+    children changed True
+    properties changed True
+
+(Note that we got the handlers called when we reestablished the new
+ session.  This is important as the data may have changed between the
+ old and new session.)
+
+Now, if we make changes, they'll be properly reflected:
+
+    >>> _ = zk.set('/fooservice', '{"a": 1}')
+    properties changed True
+
+    >>> dict(properties)
+    {u'a': 1}
+
+    >>> zk.register_server('/fooservice', 'x')
+    children changed True
+
+    >>> sorted(children)
+    ['providers', 'x']
+
+    >>> print handler
+    zc.zk WARNING
+      Node watcher event -1 with non-connected state, -112
+    zc.zk WARNING
+      Node watcher event -1 with non-connected state, -112
+    zc.zk INFO
+      connected 0
+
+    """

Modified: zc.zk/trunk/src/zc/zk/testing.py
===================================================================
--- zc.zk/trunk/src/zc/zk/testing.py	2012-01-06 19:41:06 UTC (rev 123973)
+++ zc.zk/trunk/src/zc/zk/testing.py	2012-01-06 21:06:27 UTC (rev 123974)
@@ -201,7 +201,7 @@
     def __init__(self, zk, handle, watch=None, session_timeout=None):
         self.zk = zk
         self.handle = handle
-        self.nodes = set()
+        self.nodes = set() # ephemeral nodes
         self.add = self.nodes.add
         self.remove = self.nodes.remove
         self.watch = watch
@@ -215,7 +215,8 @@
         self.newstate(zookeeper.CONNECTING_STATE)
 
     def expire(self):
-        self.zk._clear_session(self)
+        self.zk._clear_session(
+            self, zookeeper.SESSION_EVENT, zookeeper.EXPIRED_SESSION_STATE)
         self.newstate(zookeeper.EXPIRED_SESSION_STATE)
 
     def newstate(self, state):
@@ -309,9 +310,9 @@
 
         return node
 
-    def _clear_session(self, session):
+    def _clear_session(self, session, event=None, state=None):
         with self.lock:
-            self.root.clear_watchers(session.handle)
+            self.root.clear_watchers(session.handle, event, state)
             for path in list(session.nodes):
                 self._delete(session.handle, path)
 
@@ -535,7 +536,15 @@
         for h, w in watchers:
             w(h, zookeeper.DELETED_EVENT, state, path)
 
-    def clear_watchers(self, handle):
+    def clear_watchers(self, handle, event, state, path='/'):
+        if state is not None:
+            for (h, w) in self.watchers:
+                if h == handle:
+                    w(h, event, state, path)
+            for (h, w) in self.child_watchers:
+                if h == handle:
+                    w(h, event, state, path)
+
         self.watchers = tuple(
             (h, w) for (h, w) in self.watchers
             if h != handle
@@ -544,5 +553,5 @@
             (h, w) for (h, w) in self.child_watchers
             if h != handle
             )
-        for child in self.children.itervalues():
-            child.clear_watchers(handle)
+        for name, child in self.children.items():
+            child.clear_watchers(handle, event, state, path + '/' + name)



More information about the checkins mailing list