[Checkins] SVN: zc.zk/trunk/src/zc/zk/ Cleaned up handling of failed connections.
Jim Fulton
jim at zope.com
Fri Dec 9 20:32:19 UTC 2011
Log message for revision 123651:
Cleaned up handling of failed connections.
Changed:
U zc.zk/trunk/src/zc/zk/README.txt
U zc.zk/trunk/src/zc/zk/__init__.py
U zc.zk/trunk/src/zc/zk/testing.py
U zc.zk/trunk/src/zc/zk/tests.py
A zc.zk/trunk/src/zc/zk/wait_for_zookeeper.test
-=-
Modified: zc.zk/trunk/src/zc/zk/README.txt
===================================================================
--- zc.zk/trunk/src/zc/zk/README.txt 2011-12-09 14:46:56 UTC (rev 123650)
+++ zc.zk/trunk/src/zc/zk/README.txt 2011-12-09 20:32:18 UTC (rev 123651)
@@ -604,9 +604,25 @@
zc.zk.ZooKeeper
---------------
-``zc.zk.ZooKeeper(connection_string)``
+``zc.zk.ZooKeeper([connection_string[, session_timeout[, wait]]])``
Return a new instance given a ZooKeeper connection string.
+ The connection string defaults to '127.0.0.1:2181'.
+
+ If a session timeout (``session_timeout``) isn't specified, the
+ ZooKeeper server's default session timeout is used. If the
+ connection to ZooKeeper flaps, setting this to a higher value can
+ avoid having clients think a server has gone away, when it hasn't.
+ The downside of setting this to a higher value is that if a server
+ crashes, it will take longer for ZooKeeper to notice that it's
+ gone.
+
+ The ``wait`` flag indicates whether the constructor should wait
+ indefinitely for a connection to ZooKeeper. It defaults to False.
+
+ If ``wait`` is false and a connection can't be made within about
+ a second, a ``zc.zk.FailedConnect`` exception is raised.
+
``children(path)``
Return a `zc.zk.Children`_ for the path.
@@ -809,6 +825,8 @@
``parse_tree``, to support analysis and added a ``graphvis``
demonstration module to show how one might use tree representations
for system modeling.
+- Added a `zc.zk.ZooKeeper`_ ``wait`` constructor argument to wait
+ indefinitely for ZooKeeper to be available.
0.2.0 (2011-12-05)
------------------
Modified: zc.zk/trunk/src/zc/zk/__init__.py
===================================================================
--- zc.zk/trunk/src/zc/zk/__init__.py 2011-12-09 14:46:56 UTC (rev 123650)
+++ zc.zk/trunk/src/zc/zk/__init__.py 2011-12-09 20:32:18 UTC (rev 123651)
@@ -119,54 +119,64 @@
class ZooKeeper:
- def __init__(self, zkaddr=2181, zktimeout=None, timeout=1):
- if isinstance(zkaddr, int):
- zkaddr = "127.0.0.1:%s" % zkaddr
- self.timeout = timeout
- self.zkaddr = zkaddr
+ def __init__(self, connection_string="127.0.0.1:2181", session_timeout=None,
+ wait=False):
self.watches = WatchManager()
self.ephemeral = {}
- self.connected = threading.Event()
- if zktimeout:
- handle = zookeeper.init(zkaddr, self._watch_session, zktimeout)
- else:
- handle = zookeeper.init(zkaddr, self._watch_session)
- self.connected.wait(timeout)
- if not self.connected.is_set():
- zookeeper.close(handle)
- raise FailedConnect(zkaddr)
+ self.handle = None
- handle = None
- def _watch_session(self, handle, event_type, state, path):
- assert event_type == zookeeper.SESSION_EVENT
- assert not path
- if state == zookeeper.CONNECTED_STATE:
- if self.handle is None:
- self.handle = handle
- for watch in self.watches.clear():
- self._watch(watch, False)
- for path, data in self.ephemeral.items():
- zookeeper.create(self.handle, path, data['data'],
- data['acl'], data['flags'])
+ connected = self.connected = threading.Event()
+ def watch_session(handle, event_type, state, path):
+ assert event_type == zookeeper.SESSION_EVENT
+ assert not path
+ if state == zookeeper.CONNECTED_STATE:
+ if self.handle is None:
+ self.handle = handle
+ for watch in self.watches.clear():
+ self._watch(watch)
+ for path, data in self.ephemeral.items():
+ zookeeper.create(self.handle, path, data['data'],
+ data['acl'], data['flags'])
+ else:
+ assert handle == self.handle
+ connected.set()
+ logger.info('connected %s', handle)
+ elif state == zookeeper.CONNECTING_STATE:
+ connected.clear()
+ elif state == zookeeper.EXPIRED_SESSION_STATE:
+ connected.clear()
+ if self.handle is not None:
+ zookeeper.close(self.handle)
+ self.handle = None
+ init()
else:
- assert handle == self.handle
- self.connected.set()
- logger.info('connected %s', handle)
- elif state == zookeeper.CONNECTING_STATE:
- self.connected.clear()
- elif state == zookeeper.EXPIRED_SESSION_STATE:
- self.connected.clear()
- zookeeper.close(self.handle)
- self.handle = None
- zookeeper.init(self.zkaddr, self._watch_session)
+ logger.critical('unexpected session event %s %s', handle, state)
+
+ if session_timeout:
+ init = (lambda : zookeeper.init(connection_string, watch_session,
+ session_timeout)
+ )
else:
- logger.critical('unexpected session event %s %s', handle, state)
+ init = lambda : zookeeper.init(connection_string, watch_session)
+ handle = init()
+ connected.wait(1)
+ if not connected.is_set():
+ if wait:
+ while not connected.is_set():
+ # Keep waiting, logging each failed attempt to connect.
+ logger.critical("Can't connect to ZooKeeper at %r",
+ connection_string)
+ connected.wait(1)
+ else:
+ zookeeper.close(handle)
+ raise FailedConnect(connection_string)
+
+
def register_server(self, path, addr, acl=READ_ACL_UNSAFE, **kw):
kw['pid'] = os.getpid()
if not isinstance(addr, str):
addr = '%s:%s' % addr
- self.connected.wait(self.timeout)
path = self.resolve(path)
zc.zk.event.notify(RegisteringServer(addr, path, kw))
self.create(path + '/' + addr, encode(kw), acl, zookeeper.EPHEMERAL)
@@ -220,10 +230,8 @@
if path in self.ephemeral:
self.ephemeral[path]['acl'] = acl
- def _watch(self, watch, wait=True):
+ def _watch(self, watch):
event_type = watch.event_type
- if wait:
- self.connected.wait(self.timeout)
watch.real_path = real_path = self.resolve(watch.path)
key = event_type, real_path
if self.watches.add(key, watch):
@@ -294,7 +302,7 @@
logger.exception("%s path went away", watch)
watch._deleted()
else:
- self._watch(watch, False)
+ self._watch(watch)
def children(self, path):
return Children(self, path)
@@ -312,7 +320,6 @@
self._import_tree(path, parse_tree(text), acl, trim, dry_run, True)
def _import_tree(self, path, node, acl, trim, dry_run, top=False):
- self.connected.wait(self.timeout)
if not top:
new_children = set(node.children)
for name in self.get_children(path):
@@ -390,7 +397,6 @@
def export_tree(self, path='/', ephemeral=False, name=None):
output = []
out = output.append
- self.connected.wait(self.timeout)
def export_tree(path, indent, name=None):
children = self.get_children(path)
@@ -455,7 +461,6 @@
raise zookeeper.NoNodeException(path)
def _set(self, path, data):
- self.connected.wait(self.timeout)
return self.set(path, data)
@@ -469,7 +474,7 @@
def close(self):
zookeeper.close(self.handle)
- del self.handle
+ self.handle = None
@property
def state(self):
Modified: zc.zk/trunk/src/zc/zk/testing.py
===================================================================
--- zc.zk/trunk/src/zc/zk/testing.py 2011-12-09 14:46:56 UTC (rev 123650)
+++ zc.zk/trunk/src/zc/zk/testing.py 2011-12-09 20:32:18 UTC (rev 123651)
@@ -112,6 +112,10 @@
shown in your tests. In particularm ``zookeeper.create`` returns
the path created and the string returned is real, not virtual.
This node is cleaned up by the ``tearDown``.
+
+ A doctest can determine if it's running with a stub ZooKeeper by
+ checking whether the value of the ZooKeeper global variable is None.
+ A regular unit test can check the ZooKeeper test attribute.
"""
globs = getattr(test, 'globs', test.__dict__)
@@ -126,10 +130,11 @@
orig_init = zookeeper.init
cm = mock.patch('zookeeper.init')
m = cm.__enter__()
- def init(addr, watch=None):
- assert_(addr==connection_string,
- "%r != %r" % (addr, connection_string))
- return orig_init(real_zk+test_root, watch, 1000)
+ def init(addr, watch=None, session_timeout=1000):
+ if addr != connection_string:
+ return orig_init(addr, watch, session_timeout)
+ else:
+ return orig_init(real_zk+test_root, watch, session_timeout)
m.side_effect = init
teardowns.append(cm.__exit__)
@@ -192,7 +197,7 @@
class Session:
- def __init__(self, zk, handle, watch=None):
+ def __init__(self, zk, handle, watch=None, session_timeout=None):
self.zk = zk
self.handle = handle
self.nodes = set()
@@ -200,6 +205,7 @@
self.remove = self.nodes.remove
self.watch = watch
self.state = zookeeper.CONNECTING_STATE
+ self.session_timeout = session_timeout
def connect(self):
self.newstate(zookeeper.CONNECTED_STATE)
@@ -254,22 +260,31 @@
class ZooKeeper:
def __init__(self, connection_string, tree):
- self.connection_string = connection_string
+ self.connection_strings = set([connection_string])
self.root = tree
self.sessions = {}
self.lock = threading.RLock()
- self.connect_immediately = True
+ self.failed = {}
- def init(self, addr, watch=None):
+ def init(self, addr, watch=None, session_timeout=4000):
with self.lock:
- assert_(addr==self.connection_string, addr)
handle = 0
while handle in self.sessions:
handle += 1
- self.sessions[handle] = Session(self, handle, watch)
- if self.connect_immediately:
+ self.sessions[handle] = Session(
+ self, handle, watch, session_timeout)
+ if addr in self.connection_strings:
self.sessions[handle].connect()
+ else:
+ self.failed.setdefault(addr, set()).add(handle)
+ return handle
+ def _allow_connection(self, connection_string):
+ """Accept connection_string and connect sessions that failed on it."""
+ # Hold the lock like init does: init reads connection_strings and
+ # adds to failed under the lock, so unlocked updates here could let a
+ # handle be queued in failed after the pop and never connect.
+ with self.lock:
+ self.connection_strings.add(connection_string)
+ for handle in self.failed.pop(connection_string, ()):
+ if handle in self.sessions:
+ self.sessions[handle].connect()
+
def _check_handle(self, handle, checkstate=True):
try:
session = self.sessions[handle]
@@ -419,6 +434,10 @@
return self._doasync(completion, handle, 2,
self.get, handle, path, watch)
+ def recv_timeout(self, handle):
+ with self.lock:
+ return self._check_handle(handle, False).session_timeout
+
def set(self, handle, path, data, version=-1, async=False):
with self.lock:
self._check_handle(handle)
Modified: zc.zk/trunk/src/zc/zk/tests.py
===================================================================
--- zc.zk/trunk/src/zc/zk/tests.py 2011-12-09 14:46:56 UTC (rev 123650)
+++ zc.zk/trunk/src/zc/zk/tests.py 2011-12-09 20:32:18 UTC (rev 123651)
@@ -1006,6 +1006,24 @@
True
"""
+def connection_edge_cases():
+ """
+We can pass a session timeout, and it will be passed to ZooKeeper:
+
+ >>> zk = zc.zk.ZooKeeper('zookeeper.example.com:2181', 4242)
+ >>> zk.recv_timeout()
+ 4242
+ >>> zk.close()
+
+Connecting to an invalid address causes a FailedConnect to be raised:
+
+ >>> zc.zk.ZooKeeper('192.0.2.42:2181')
+ Traceback (most recent call last):
+ ...
+ FailedConnect: 192.0.2.42:2181
+
+ """
+
event = threading.Event()
def check_async(show=True, expected_status=0):
event.clear()
@@ -1047,6 +1065,7 @@
suite.addTest(unittest.makeSuite(LoggingTests))
suite.addTest(doctest.DocFileSuite(
'ephemeral_node_recovery_on_session_reestablishment.test',
+ 'wait_for_zookeeper.test',
setUp=setUpEphemeral_node_recovery_on_session_reestablishment,
tearDown=zc.zk.testing.tearDown,
checker=checker,
Added: zc.zk/trunk/src/zc/zk/wait_for_zookeeper.test
===================================================================
--- zc.zk/trunk/src/zc/zk/wait_for_zookeeper.test (rev 0)
+++ zc.zk/trunk/src/zc/zk/wait_for_zookeeper.test 2011-12-09 20:32:18 UTC (rev 123651)
@@ -0,0 +1,41 @@
+Normally, zc.zk.ZooKeeper raises an exception if it can't connect to
+ZooKeeper within a second. Some applications might want to wait, so
+zc.zk.ZooKeeper accepts a wait parameter that causes it to wait for a
+connection.
+
+ >>> zk = None
+ >>> import zc.zk, zc.thread, zookeeper, zope.testing.loggingsupport
+
+ >>> handler = zope.testing.loggingsupport.InstalledHandler('zc.zk')
+
+ >>> @zc.thread.Thread
+ ... def connect():
+ ... global zk
+ ... zk = zc.zk.ZooKeeper('Invalid', wait=True)
+
+We'll wait a while as it tries in vain to connect:
+
+ >>> wait_until((lambda : zk is not None), 4)
+ Traceback (most recent call last):
+ ...
+ AssertionError: timeout
+
+ >>> print handler # doctest: +ELLIPSIS
+ zc.zk CRITICAL
+ Can't connect to ZooKeeper at 'Invalid'
+ zc.zk CRITICAL
+ Can't connect to ZooKeeper at 'Invalid'
+ ...
+ >>> handler.uninstall()
+
+Now, we'll make the connection possible:
+
+ >>> ZooKeeper._allow_connection('Invalid')
+ >>> wait_until(lambda : zk is not None)
+
+ >>> zk.state == zookeeper.CONNECTED_STATE
+ True
+
+Yay!
+
+ >>> zk.close()
Property changes on: zc.zk/trunk/src/zc/zk/wait_for_zookeeper.test
___________________________________________________________________
Added: svn:eol-style
+ native
More information about the checkins
mailing list