[Checkins] SVN: zc.zk/trunk/src/zc/zk/ Cleaned up handling of failed connections.

Jim Fulton jim at zope.com
Fri Dec 9 20:32:19 UTC 2011


Log message for revision 123651:
  Cleaned up handling of failed connections.
  

Changed:
  U   zc.zk/trunk/src/zc/zk/README.txt
  U   zc.zk/trunk/src/zc/zk/__init__.py
  U   zc.zk/trunk/src/zc/zk/testing.py
  U   zc.zk/trunk/src/zc/zk/tests.py
  A   zc.zk/trunk/src/zc/zk/wait_for_zookeeper.test

-=-
Modified: zc.zk/trunk/src/zc/zk/README.txt
===================================================================
--- zc.zk/trunk/src/zc/zk/README.txt	2011-12-09 14:46:56 UTC (rev 123650)
+++ zc.zk/trunk/src/zc/zk/README.txt	2011-12-09 20:32:18 UTC (rev 123651)
@@ -604,9 +604,25 @@
 zc.zk.ZooKeeper
 ---------------
 
-``zc.zk.ZooKeeper(connection_string)``
+``zc.zk.ZooKeeper([connection_string[, session_timeout[, wait]]])``
     Return a new instance given a ZooKeeper connection string.
 
+    The connection string defaults to '127.0.0.1:2181'.
+
+    If a session timeout (``session_timeout``) isn't specified, the
+    ZooKeeper server's default session timeout is used.  If the
+    connection to ZooKeeper flaps, setting this to a higher value can
+    avoid having clients think a server has gone away, when it hasn't.
+    The downside of setting this to a higher value is that if a server
+    crashes, it will take longer for ZooKeeper to notice that it's
+    gone.
+
+    The ``wait`` flag indicates whether the constructor should wait
+    indefinitely for a connection to ZooKeeper.  It defaults to False.
+
+    If ``wait`` is false and a connection can't be made within one
+    second, a ``zc.zk.FailedConnect`` exception is raised.
+
 ``children(path)``
    Return a `zc.zk.Children`_ for the path.
 
@@ -809,6 +825,8 @@
   ``parse_tree``, to support analysis and added a ``graphvis``
   demonstration module to show how one might use tree representations
   for system modeling.
+- Added a `zc.zk.ZooKeeper`_ ``wait`` constructor argument to wait
+  indefinitely for ZooKeeper to be available.
 
 0.2.0 (2011-12-05)
 ------------------

Modified: zc.zk/trunk/src/zc/zk/__init__.py
===================================================================
--- zc.zk/trunk/src/zc/zk/__init__.py	2011-12-09 14:46:56 UTC (rev 123650)
+++ zc.zk/trunk/src/zc/zk/__init__.py	2011-12-09 20:32:18 UTC (rev 123651)
@@ -119,54 +119,64 @@
 
 class ZooKeeper:
 
-    def __init__(self, zkaddr=2181, zktimeout=None, timeout=1):
-        if isinstance(zkaddr, int):
-            zkaddr = "127.0.0.1:%s" % zkaddr
-        self.timeout = timeout
-        self.zkaddr = zkaddr
+    def __init__(self, connection_string="127.0.0.1:2181", session_timeout=None,
+                 wait=False):
         self.watches = WatchManager()
         self.ephemeral = {}
-        self.connected = threading.Event()
-        if zktimeout:
-            handle = zookeeper.init(zkaddr, self._watch_session, zktimeout)
-        else:
-            handle = zookeeper.init(zkaddr, self._watch_session)
-        self.connected.wait(timeout)
-        if not self.connected.is_set():
-            zookeeper.close(handle)
-            raise FailedConnect(zkaddr)
+        self.handle = None
 
-    handle = None
-    def _watch_session(self, handle, event_type, state, path):
-        assert event_type == zookeeper.SESSION_EVENT
-        assert not path
-        if state == zookeeper.CONNECTED_STATE:
-            if self.handle is None:
-                self.handle = handle
-                for watch in self.watches.clear():
-                    self._watch(watch, False)
-                for path, data in self.ephemeral.items():
-                    zookeeper.create(self.handle, path, data['data'],
-                                     data['acl'], data['flags'])
+        connected = self.connected = threading.Event()
+        def watch_session(handle, event_type, state, path):
+            assert event_type == zookeeper.SESSION_EVENT
+            assert not path
+            if state == zookeeper.CONNECTED_STATE:
+                if self.handle is None:
+                    self.handle = handle
+                    for watch in self.watches.clear():
+                        self._watch(watch)
+                    for path, data in self.ephemeral.items():
+                        zookeeper.create(self.handle, path, data['data'],
+                                         data['acl'], data['flags'])
+                else:
+                    assert handle == self.handle
+                connected.set()
+                logger.info('connected %s', handle)
+            elif state == zookeeper.CONNECTING_STATE:
+                connected.clear()
+            elif state == zookeeper.EXPIRED_SESSION_STATE:
+                connected.clear()
+                if self.handle is not None:
+                    zookeeper.close(self.handle)
+                self.handle = None
+                init()
             else:
-                assert handle == self.handle
-            self.connected.set()
-            logger.info('connected %s', handle)
-        elif state == zookeeper.CONNECTING_STATE:
-            self.connected.clear()
-        elif state == zookeeper.EXPIRED_SESSION_STATE:
-            self.connected.clear()
-            zookeeper.close(self.handle)
-            self.handle = None
-            zookeeper.init(self.zkaddr, self._watch_session)
+                logger.critical('unexpected session event %s %s', handle, state)
+
+        if session_timeout:
+            init = (lambda : zookeeper.init(connection_string, watch_session,
+                                            session_timeout)
+                    )
         else:
-            logger.critical('unexpected session event %s %s', handle, state)
+            init = lambda : zookeeper.init(connection_string, watch_session)
 
+        handle = init()
+        connected.wait(1)
+        if not connected.is_set():
+            if wait:
+                while not connected.is_set():
+                    print 'whimper'
+                    logger.critical("Can't connect to ZooKeeper at %r",
+                                    connection_string)
+                    connected.wait(1)
+            else:
+                zookeeper.close(handle)
+                raise FailedConnect(connection_string)
+
+
     def register_server(self, path, addr, acl=READ_ACL_UNSAFE, **kw):
         kw['pid'] = os.getpid()
         if not isinstance(addr, str):
             addr = '%s:%s' % addr
-        self.connected.wait(self.timeout)
         path = self.resolve(path)
         zc.zk.event.notify(RegisteringServer(addr, path, kw))
         self.create(path + '/' + addr, encode(kw), acl, zookeeper.EPHEMERAL)
@@ -220,10 +230,8 @@
         if path in self.ephemeral:
             self.ephemeral[path]['acl'] = acl
 
-    def _watch(self, watch, wait=True):
+    def _watch(self, watch):
         event_type = watch.event_type
-        if wait:
-            self.connected.wait(self.timeout)
         watch.real_path = real_path = self.resolve(watch.path)
         key = event_type, real_path
         if self.watches.add(key, watch):
@@ -294,7 +302,7 @@
                 logger.exception("%s path went away", watch)
                 watch._deleted()
             else:
-                self._watch(watch, False)
+                self._watch(watch)
 
     def children(self, path):
         return Children(self, path)
@@ -312,7 +320,6 @@
         self._import_tree(path, parse_tree(text), acl, trim, dry_run, True)
 
     def _import_tree(self, path, node, acl, trim, dry_run, top=False):
-        self.connected.wait(self.timeout)
         if not top:
             new_children = set(node.children)
             for name in self.get_children(path):
@@ -390,7 +397,6 @@
     def export_tree(self, path='/', ephemeral=False, name=None):
         output = []
         out = output.append
-        self.connected.wait(self.timeout)
 
         def export_tree(path, indent, name=None):
             children = self.get_children(path)
@@ -455,7 +461,6 @@
             raise zookeeper.NoNodeException(path)
 
     def _set(self, path, data):
-        self.connected.wait(self.timeout)
         return self.set(path, data)
 
 
@@ -469,7 +474,7 @@
 
     def close(self):
         zookeeper.close(self.handle)
-        del self.handle
+        self.handle = None
 
     @property
     def state(self):

Modified: zc.zk/trunk/src/zc/zk/testing.py
===================================================================
--- zc.zk/trunk/src/zc/zk/testing.py	2011-12-09 14:46:56 UTC (rev 123650)
+++ zc.zk/trunk/src/zc/zk/testing.py	2011-12-09 20:32:18 UTC (rev 123651)
@@ -112,6 +112,10 @@
     shown in your tests. In particularm ``zookeeper.create`` returns
     the path created and the string returned is real, not virtual.
     This node is cleaned up by the ``tearDown``.
+
+    A doctest can determine if it's running with a stub ZooKeeper by
+    checking whether the value of the ZooKeeper global variable is None.
+    A regular unit test can check the ZooKeeper test attribute.
     """
 
     globs = getattr(test, 'globs', test.__dict__)
@@ -126,10 +130,11 @@
         orig_init = zookeeper.init
         cm = mock.patch('zookeeper.init')
         m = cm.__enter__()
-        def init(addr, watch=None):
-            assert_(addr==connection_string,
-                    "%r != %r" % (addr, connection_string))
-            return orig_init(real_zk+test_root, watch, 1000)
+        def init(addr, watch=None, session_timeout=1000):
+            if addr != connection_string:
+                return orig_init(addr, watch, session_timeout)
+            else:
+                return orig_init(real_zk+test_root, watch, session_timeout)
         m.side_effect = init
         teardowns.append(cm.__exit__)
 
@@ -192,7 +197,7 @@
 
 class Session:
 
-    def __init__(self, zk, handle, watch=None):
+    def __init__(self, zk, handle, watch=None, session_timeout=None):
         self.zk = zk
         self.handle = handle
         self.nodes = set()
@@ -200,6 +205,7 @@
         self.remove = self.nodes.remove
         self.watch = watch
         self.state = zookeeper.CONNECTING_STATE
+        self.session_timeout = session_timeout
 
     def connect(self):
         self.newstate(zookeeper.CONNECTED_STATE)
@@ -254,22 +260,31 @@
 class ZooKeeper:
 
     def __init__(self, connection_string, tree):
-        self.connection_string = connection_string
+        self.connection_strings = set([connection_string])
         self.root = tree
         self.sessions = {}
         self.lock = threading.RLock()
-        self.connect_immediately = True
+        self.failed = {}
 
-    def init(self, addr, watch=None):
+    def init(self, addr, watch=None, session_timeout=4000):
         with self.lock:
-            assert_(addr==self.connection_string, addr)
             handle = 0
             while handle in self.sessions:
                 handle += 1
-            self.sessions[handle] = Session(self, handle, watch)
-            if self.connect_immediately:
+            self.sessions[handle] = Session(
+                self, handle, watch, session_timeout)
+            if addr in self.connection_strings:
                 self.sessions[handle].connect()
+            else:
+                self.failed.setdefault(addr, set()).add(handle)
+            return handle
 
+    def _allow_connection(self, connection_string):
+        self.connection_strings.add(connection_string)
+        for handle in self.failed.pop(connection_string, ()):
+            if handle in self.sessions:
+                self.sessions[handle].connect()
+
     def _check_handle(self, handle, checkstate=True):
         try:
             session = self.sessions[handle]
@@ -419,6 +434,10 @@
         return self._doasync(completion, handle, 2,
                              self.get, handle, path, watch)
 
+    def recv_timeout(self, handle):
+        with self.lock:
+            return self._check_handle(handle, False).session_timeout
+
     def set(self, handle, path, data, version=-1, async=False):
         with self.lock:
             self._check_handle(handle)

Modified: zc.zk/trunk/src/zc/zk/tests.py
===================================================================
--- zc.zk/trunk/src/zc/zk/tests.py	2011-12-09 14:46:56 UTC (rev 123650)
+++ zc.zk/trunk/src/zc/zk/tests.py	2011-12-09 20:32:18 UTC (rev 123651)
@@ -1006,6 +1006,24 @@
     True
     """
 
+def connection_edge_cases():
+    """
+We can pass a session timeout, and it will be passed to ZooKeeper:
+
+    >>> zk = zc.zk.ZooKeeper('zookeeper.example.com:2181', 4242)
+    >>> zk.recv_timeout()
+    4242
+    >>> zk.close()
+
+Connecting to an invalid address causes a FailedConnect to be raised:
+
+    >>> zc.zk.ZooKeeper('192.0.2.42:2181')
+    Traceback (most recent call last):
+    ...
+    FailedConnect: 192.0.2.42:2181
+
+    """
+
 event = threading.Event()
 def check_async(show=True, expected_status=0):
     event.clear()
@@ -1047,6 +1065,7 @@
         suite.addTest(unittest.makeSuite(LoggingTests))
         suite.addTest(doctest.DocFileSuite(
             'ephemeral_node_recovery_on_session_reestablishment.test',
+            'wait_for_zookeeper.test',
             setUp=setUpEphemeral_node_recovery_on_session_reestablishment,
             tearDown=zc.zk.testing.tearDown,
             checker=checker,

Added: zc.zk/trunk/src/zc/zk/wait_for_zookeeper.test
===================================================================
--- zc.zk/trunk/src/zc/zk/wait_for_zookeeper.test	                        (rev 0)
+++ zc.zk/trunk/src/zc/zk/wait_for_zookeeper.test	2011-12-09 20:32:18 UTC (rev 123651)
@@ -0,0 +1,41 @@
+Normally, zc.zk.ZooKeeper raises an exception if it can't connect to
+ZooKeeper in a second.  Some applications might want to wait, so
+zc.zk.ZooKeeper accepts a wait parameter that causes it to wait for a
+connection.
+
+    >>> zk = None
+    >>> import zc.zk, zc.thread, zookeeper, zope.testing.loggingsupport
+
+    >>> handler = zope.testing.loggingsupport.InstalledHandler('zc.zk')
+
+    >>> @zc.thread.Thread
+    ... def connect():
+    ...     global zk
+    ...     zk = zc.zk.ZooKeeper('Invalid', wait=True)
+
+We'll wait a while as it tries in vain to connect:
+
+    >>> wait_until((lambda : zk is not None), 4)
+    Traceback (most recent call last):
+    ...
+    AssertionError: timeout
+
+    >>> print handler # doctest: +ELLIPSIS
+    zc.zk CRITICAL
+      Can't connect to ZooKeeper at 'Invalid'
+    zc.zk CRITICAL
+      Can't connect to ZooKeeper at 'Invalid'
+    ...
+    >>> handler.uninstall()
+
+Now, we'll make the connection possible:
+
+    >>> ZooKeeper._allow_connection('Invalid')
+    >>> wait_until(lambda : zk is not None)
+
+    >>> zk.state == zookeeper.CONNECTED_STATE
+    True
+
+Yay!
+
+    >>> zk.close()


Property changes on: zc.zk/trunk/src/zc/zk/wait_for_zookeeper.test
___________________________________________________________________
Added: svn:eol-style
   + native



More information about the checkins mailing list